rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::{
  11    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  12    task,
  13};
  14use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  15use collections::{HashMap, HashSet};
  16use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  17use log::{as_debug, as_display};
  18use rpc::{
  19    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  20    Connection, ConnectionId, Peer, TypedEnvelope,
  21};
  22use sha1::{Digest as _, Sha1};
  23use std::{
  24    any::TypeId,
  25    future::Future,
  26    marker::PhantomData,
  27    ops::{Deref, DerefMut},
  28    rc::Rc,
  29    sync::Arc,
  30    time::{Duration, Instant},
  31};
  32use store::{Store, Worktree};
  33use surf::StatusCode;
  34use tide::{
  35    http::headers::{HeaderName, CONNECTION, UPGRADE},
  36    Request, Response,
  37};
  38use time::OffsetDateTime;
  39use util::ResultExt;
  40
  41type MessageHandler = Box<
  42    dyn Send
  43        + Sync
  44        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  45>;
  46
  47pub struct Server {
  48    peer: Arc<Peer>,
  49    store: RwLock<Store>,
  50    app_state: Arc<AppState>,
  51    handlers: HashMap<TypeId, MessageHandler>,
  52    notifications: Option<mpsc::UnboundedSender<()>>,
  53}
  54
  55pub trait Executor: Send + Clone {
  56    type Timer: Send + Future;
  57    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  58    fn timer(&self, duration: Duration) -> Self::Timer;
  59}
  60
  61#[derive(Clone)]
  62pub struct RealExecutor;
  63
  64const MESSAGE_COUNT_PER_PAGE: usize = 100;
  65const MAX_MESSAGE_LEN: usize = 1024;
  66
  67struct StoreReadGuard<'a> {
  68    guard: RwLockReadGuard<'a, Store>,
  69    _not_send: PhantomData<Rc<()>>,
  70}
  71
  72struct StoreWriteGuard<'a> {
  73    guard: RwLockWriteGuard<'a, Store>,
  74    _not_send: PhantomData<Rc<()>>,
  75}
  76
  77impl Server {
  78    pub fn new(
  79        app_state: Arc<AppState>,
  80        peer: Arc<Peer>,
  81        notifications: Option<mpsc::UnboundedSender<()>>,
  82    ) -> Arc<Self> {
  83        let mut server = Self {
  84            peer,
  85            app_state,
  86            store: Default::default(),
  87            handlers: Default::default(),
  88            notifications,
  89        };
  90
  91        server
  92            .add_request_handler(Server::ping)
  93            .add_request_handler(Server::register_project)
  94            .add_message_handler(Server::unregister_project)
  95            .add_request_handler(Server::share_project)
  96            .add_message_handler(Server::unshare_project)
  97            .add_request_handler(Server::join_project)
  98            .add_message_handler(Server::leave_project)
  99            .add_request_handler(Server::register_worktree)
 100            .add_message_handler(Server::unregister_worktree)
 101            .add_request_handler(Server::update_worktree)
 102            .add_message_handler(Server::start_language_server)
 103            .add_message_handler(Server::update_language_server)
 104            .add_message_handler(Server::update_diagnostic_summary)
 105            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 106            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 107            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 108            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 109            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 110            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 111            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 112            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 113            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 114            .add_request_handler(
 115                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 116            )
 117            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 118            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 119            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 120            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 121            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 122            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 123            .add_request_handler(Server::update_buffer)
 124            .add_message_handler(Server::update_buffer_file)
 125            .add_message_handler(Server::buffer_reloaded)
 126            .add_message_handler(Server::buffer_saved)
 127            .add_request_handler(Server::save_buffer)
 128            .add_request_handler(Server::get_channels)
 129            .add_request_handler(Server::get_users)
 130            .add_request_handler(Server::join_channel)
 131            .add_message_handler(Server::leave_channel)
 132            .add_request_handler(Server::send_channel_message)
 133            .add_request_handler(Server::follow)
 134            .add_message_handler(Server::unfollow)
 135            .add_message_handler(Server::update_followers)
 136            .add_request_handler(Server::get_channel_messages);
 137
 138        Arc::new(server)
 139    }
 140
 141    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 142    where
 143        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 144        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 145        M: EnvelopedMessage,
 146    {
 147        let prev_handler = self.handlers.insert(
 148            TypeId::of::<M>(),
 149            Box::new(move |server, envelope| {
 150                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 151                (handler)(server, *envelope).boxed()
 152            }),
 153        );
 154        if prev_handler.is_some() {
 155            panic!("registered a handler for the same message twice");
 156        }
 157        self
 158    }
 159
 160    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 161    where
 162        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 163        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 164        M: RequestMessage,
 165    {
 166        self.add_message_handler(move |server, envelope| {
 167            let receipt = envelope.receipt();
 168            let response = (handler)(server.clone(), envelope);
 169            async move {
 170                match response.await {
 171                    Ok(response) => {
 172                        server.peer.respond(receipt, response)?;
 173                        Ok(())
 174                    }
 175                    Err(error) => {
 176                        server.peer.respond_with_error(
 177                            receipt,
 178                            proto::Error {
 179                                message: error.to_string(),
 180                            },
 181                        )?;
 182                        Err(error)
 183                    }
 184                }
 185            }
 186        })
 187    }
 188
 189    pub fn handle_connection<E: Executor>(
 190        self: &Arc<Self>,
 191        connection: Connection,
 192        addr: String,
 193        user_id: UserId,
 194        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 195        executor: E,
 196    ) -> impl Future<Output = ()> {
 197        let mut this = self.clone();
 198        async move {
 199            let (connection_id, handle_io, mut incoming_rx) = this
 200                .peer
 201                .add_connection(connection, {
 202                    let executor = executor.clone();
 203                    move |duration| {
 204                        let timer = executor.timer(duration);
 205                        async move {
 206                            timer.await;
 207                        }
 208                    }
 209                })
 210                .await;
 211
 212            if let Some(send_connection_id) = send_connection_id.as_mut() {
 213                let _ = send_connection_id.send(connection_id).await;
 214            }
 215
 216            this.state_mut()
 217                .await
 218                .add_connection(connection_id, user_id);
 219            this.update_contacts_for_users(&[user_id]).await;
 220
 221            let handle_io = handle_io.fuse();
 222            futures::pin_mut!(handle_io);
 223            loop {
 224                let next_message = incoming_rx.next().fuse();
 225                futures::pin_mut!(next_message);
 226                futures::select_biased! {
 227                    result = handle_io => {
 228                        if let Err(err) = result {
 229                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 230                        }
 231                        break;
 232                    }
 233                    message = next_message => {
 234                        if let Some(message) = message {
 235                            let start_time = Instant::now();
 236                            let type_name = message.payload_type_name();
 237                            log::info!(connection_id = connection_id.0, type_name = type_name; "rpc message received");
 238                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 239                                let notifications = this.notifications.clone();
 240                                let is_background = message.is_background();
 241                                let handle_message = (handler)(this.clone(), message);
 242                                let handle_message = async move {
 243                                    if let Err(err) = handle_message.await {
 244                                        log::error!(connection_id = connection_id.0, type = type_name, error = as_display!(err); "rpc message error");
 245                                    } else {
 246                                        log::info!(connection_id = connection_id.0, type = type_name, duration = as_debug!(start_time.elapsed()); "rpc message handled");
 247                                    }
 248                                    if let Some(mut notifications) = notifications {
 249                                        let _ = notifications.send(()).await;
 250                                    }
 251                                };
 252                                if is_background {
 253                                    executor.spawn_detached(handle_message);
 254                                } else {
 255                                    handle_message.await;
 256                                }
 257                            } else {
 258                                log::warn!("unhandled message: {}", type_name);
 259                            }
 260                        } else {
 261                            log::info!(address = as_debug!(addr); "rpc connection closed");
 262                            break;
 263                        }
 264                    }
 265                }
 266            }
 267
 268            if let Err(err) = this.sign_out(connection_id).await {
 269                log::error!("error signing out connection {:?} - {:?}", addr, err);
 270            }
 271        }
 272    }
 273
 274    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 275        self.peer.disconnect(connection_id);
 276        let removed_connection = self.state_mut().await.remove_connection(connection_id)?;
 277
 278        for (project_id, project) in removed_connection.hosted_projects {
 279            if let Some(share) = project.share {
 280                broadcast(
 281                    connection_id,
 282                    share.guests.keys().copied().collect(),
 283                    |conn_id| {
 284                        self.peer
 285                            .send(conn_id, proto::UnshareProject { project_id })
 286                    },
 287                );
 288            }
 289        }
 290
 291        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 292            broadcast(connection_id, peer_ids, |conn_id| {
 293                self.peer.send(
 294                    conn_id,
 295                    proto::RemoveProjectCollaborator {
 296                        project_id,
 297                        peer_id: connection_id.0,
 298                    },
 299                )
 300            });
 301        }
 302
 303        self.update_contacts_for_users(removed_connection.contact_ids.iter())
 304            .await;
 305        Ok(())
 306    }
 307
 308    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 309        Ok(proto::Ack {})
 310    }
 311
 312    async fn register_project(
 313        mut self: Arc<Server>,
 314        request: TypedEnvelope<proto::RegisterProject>,
 315    ) -> tide::Result<proto::RegisterProjectResponse> {
 316        let project_id = {
 317            let mut state = self.state_mut().await;
 318            let user_id = state.user_id_for_connection(request.sender_id)?;
 319            state.register_project(request.sender_id, user_id)
 320        };
 321        Ok(proto::RegisterProjectResponse { project_id })
 322    }
 323
 324    async fn unregister_project(
 325        mut self: Arc<Server>,
 326        request: TypedEnvelope<proto::UnregisterProject>,
 327    ) -> tide::Result<()> {
 328        let project = self
 329            .state_mut()
 330            .await
 331            .unregister_project(request.payload.project_id, request.sender_id)?;
 332        self.update_contacts_for_users(project.authorized_user_ids().iter())
 333            .await;
 334        Ok(())
 335    }
 336
 337    async fn share_project(
 338        mut self: Arc<Server>,
 339        request: TypedEnvelope<proto::ShareProject>,
 340    ) -> tide::Result<proto::Ack> {
 341        self.state_mut()
 342            .await
 343            .share_project(request.payload.project_id, request.sender_id);
 344        Ok(proto::Ack {})
 345    }
 346
 347    async fn unshare_project(
 348        mut self: Arc<Server>,
 349        request: TypedEnvelope<proto::UnshareProject>,
 350    ) -> tide::Result<()> {
 351        let project_id = request.payload.project_id;
 352        let project = self
 353            .state_mut()
 354            .await
 355            .unshare_project(project_id, request.sender_id)?;
 356
 357        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 358            self.peer
 359                .send(conn_id, proto::UnshareProject { project_id })
 360        });
 361        self.update_contacts_for_users(&project.authorized_user_ids)
 362            .await;
 363        Ok(())
 364    }
 365
 366    async fn join_project(
 367        mut self: Arc<Server>,
 368        request: TypedEnvelope<proto::JoinProject>,
 369    ) -> tide::Result<proto::JoinProjectResponse> {
 370        let project_id = request.payload.project_id;
 371
 372        let user_id = self
 373            .state()
 374            .await
 375            .user_id_for_connection(request.sender_id)?;
 376        let (response, connection_ids, contact_user_ids) = self
 377            .state_mut()
 378            .await
 379            .join_project(request.sender_id, user_id, project_id)
 380            .and_then(|joined| {
 381                let share = joined.project.share()?;
 382                let peer_count = share.guests.len();
 383                let mut collaborators = Vec::with_capacity(peer_count);
 384                collaborators.push(proto::Collaborator {
 385                    peer_id: joined.project.host_connection_id.0,
 386                    replica_id: 0,
 387                    user_id: joined.project.host_user_id.to_proto(),
 388                });
 389                let worktrees = share
 390                    .worktrees
 391                    .iter()
 392                    .filter_map(|(id, shared_worktree)| {
 393                        let worktree = joined.project.worktrees.get(&id)?;
 394                        Some(proto::Worktree {
 395                            id: *id,
 396                            root_name: worktree.root_name.clone(),
 397                            entries: shared_worktree.entries.values().cloned().collect(),
 398                            diagnostic_summaries: shared_worktree
 399                                .diagnostic_summaries
 400                                .values()
 401                                .cloned()
 402                                .collect(),
 403                            visible: worktree.visible,
 404                        })
 405                    })
 406                    .collect();
 407                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 408                    if *peer_conn_id != request.sender_id {
 409                        collaborators.push(proto::Collaborator {
 410                            peer_id: peer_conn_id.0,
 411                            replica_id: *peer_replica_id as u32,
 412                            user_id: peer_user_id.to_proto(),
 413                        });
 414                    }
 415                }
 416                let response = proto::JoinProjectResponse {
 417                    worktrees,
 418                    replica_id: joined.replica_id as u32,
 419                    collaborators,
 420                    language_servers: joined.project.language_servers.clone(),
 421                };
 422                let connection_ids = joined.project.connection_ids();
 423                let contact_user_ids = joined.project.authorized_user_ids();
 424                Ok((response, connection_ids, contact_user_ids))
 425            })?;
 426
 427        broadcast(request.sender_id, connection_ids, |conn_id| {
 428            self.peer.send(
 429                conn_id,
 430                proto::AddProjectCollaborator {
 431                    project_id,
 432                    collaborator: Some(proto::Collaborator {
 433                        peer_id: request.sender_id.0,
 434                        replica_id: response.replica_id,
 435                        user_id: user_id.to_proto(),
 436                    }),
 437                },
 438            )
 439        });
 440        self.update_contacts_for_users(&contact_user_ids).await;
 441        Ok(response)
 442    }
 443
 444    async fn leave_project(
 445        mut self: Arc<Server>,
 446        request: TypedEnvelope<proto::LeaveProject>,
 447    ) -> tide::Result<()> {
 448        let sender_id = request.sender_id;
 449        let project_id = request.payload.project_id;
 450        let worktree = self
 451            .state_mut()
 452            .await
 453            .leave_project(sender_id, project_id)?;
 454
 455        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 456            self.peer.send(
 457                conn_id,
 458                proto::RemoveProjectCollaborator {
 459                    project_id,
 460                    peer_id: sender_id.0,
 461                },
 462            )
 463        });
 464        self.update_contacts_for_users(&worktree.authorized_user_ids)
 465            .await;
 466
 467        Ok(())
 468    }
 469
 470    async fn register_worktree(
 471        mut self: Arc<Server>,
 472        request: TypedEnvelope<proto::RegisterWorktree>,
 473    ) -> tide::Result<proto::Ack> {
 474        let host_user_id = self
 475            .state()
 476            .await
 477            .user_id_for_connection(request.sender_id)?;
 478
 479        let mut contact_user_ids = HashSet::default();
 480        contact_user_ids.insert(host_user_id);
 481        for github_login in &request.payload.authorized_logins {
 482            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 483            contact_user_ids.insert(contact_user_id);
 484        }
 485
 486        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 487        let guest_connection_ids;
 488        {
 489            let mut state = self.state_mut().await;
 490            guest_connection_ids = state
 491                .read_project(request.payload.project_id, request.sender_id)?
 492                .guest_connection_ids();
 493            state.register_worktree(
 494                request.payload.project_id,
 495                request.payload.worktree_id,
 496                request.sender_id,
 497                Worktree {
 498                    authorized_user_ids: contact_user_ids.clone(),
 499                    root_name: request.payload.root_name.clone(),
 500                    visible: request.payload.visible,
 501                },
 502            )?;
 503        }
 504        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 505            self.peer
 506                .forward_send(request.sender_id, connection_id, request.payload.clone())
 507        });
 508        self.update_contacts_for_users(&contact_user_ids).await;
 509        Ok(proto::Ack {})
 510    }
 511
 512    async fn unregister_worktree(
 513        mut self: Arc<Server>,
 514        request: TypedEnvelope<proto::UnregisterWorktree>,
 515    ) -> tide::Result<()> {
 516        let project_id = request.payload.project_id;
 517        let worktree_id = request.payload.worktree_id;
 518        let (worktree, guest_connection_ids) = self.state_mut().await.unregister_worktree(
 519            project_id,
 520            worktree_id,
 521            request.sender_id,
 522        )?;
 523        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 524            self.peer.send(
 525                conn_id,
 526                proto::UnregisterWorktree {
 527                    project_id,
 528                    worktree_id,
 529                },
 530            )
 531        });
 532        self.update_contacts_for_users(&worktree.authorized_user_ids)
 533            .await;
 534        Ok(())
 535    }
 536
 537    async fn update_worktree(
 538        mut self: Arc<Server>,
 539        request: TypedEnvelope<proto::UpdateWorktree>,
 540    ) -> tide::Result<proto::Ack> {
 541        let connection_ids = self.state_mut().await.update_worktree(
 542            request.sender_id,
 543            request.payload.project_id,
 544            request.payload.worktree_id,
 545            &request.payload.removed_entries,
 546            &request.payload.updated_entries,
 547        )?;
 548
 549        broadcast(request.sender_id, connection_ids, |connection_id| {
 550            self.peer
 551                .forward_send(request.sender_id, connection_id, request.payload.clone())
 552        });
 553
 554        Ok(proto::Ack {})
 555    }
 556
 557    async fn update_diagnostic_summary(
 558        mut self: Arc<Server>,
 559        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 560    ) -> tide::Result<()> {
 561        let summary = request
 562            .payload
 563            .summary
 564            .clone()
 565            .ok_or_else(|| anyhow!("invalid summary"))?;
 566        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 567            request.payload.project_id,
 568            request.payload.worktree_id,
 569            request.sender_id,
 570            summary,
 571        )?;
 572
 573        broadcast(request.sender_id, receiver_ids, |connection_id| {
 574            self.peer
 575                .forward_send(request.sender_id, connection_id, request.payload.clone())
 576        });
 577        Ok(())
 578    }
 579
 580    async fn start_language_server(
 581        mut self: Arc<Server>,
 582        request: TypedEnvelope<proto::StartLanguageServer>,
 583    ) -> tide::Result<()> {
 584        let receiver_ids = self.state_mut().await.start_language_server(
 585            request.payload.project_id,
 586            request.sender_id,
 587            request
 588                .payload
 589                .server
 590                .clone()
 591                .ok_or_else(|| anyhow!("invalid language server"))?,
 592        )?;
 593        broadcast(request.sender_id, receiver_ids, |connection_id| {
 594            self.peer
 595                .forward_send(request.sender_id, connection_id, request.payload.clone())
 596        });
 597        Ok(())
 598    }
 599
 600    async fn update_language_server(
 601        self: Arc<Server>,
 602        request: TypedEnvelope<proto::UpdateLanguageServer>,
 603    ) -> tide::Result<()> {
 604        let receiver_ids = self
 605            .state()
 606            .await
 607            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 608        broadcast(request.sender_id, receiver_ids, |connection_id| {
 609            self.peer
 610                .forward_send(request.sender_id, connection_id, request.payload.clone())
 611        });
 612        Ok(())
 613    }
 614
 615    async fn forward_project_request<T>(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<T>,
 618    ) -> tide::Result<T::Response>
 619    where
 620        T: EntityMessage + RequestMessage,
 621    {
 622        let host_connection_id = self
 623            .state()
 624            .await
 625            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 626            .host_connection_id;
 627        Ok(self
 628            .peer
 629            .forward_request(request.sender_id, host_connection_id, request.payload)
 630            .await?)
 631    }
 632
 633    async fn save_buffer(
 634        self: Arc<Server>,
 635        request: TypedEnvelope<proto::SaveBuffer>,
 636    ) -> tide::Result<proto::BufferSaved> {
 637        let host = self
 638            .state()
 639            .await
 640            .read_project(request.payload.project_id, request.sender_id)?
 641            .host_connection_id;
 642        let response = self
 643            .peer
 644            .forward_request(request.sender_id, host, request.payload.clone())
 645            .await?;
 646
 647        let mut guests = self
 648            .state()
 649            .await
 650            .read_project(request.payload.project_id, request.sender_id)?
 651            .connection_ids();
 652        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 653        broadcast(host, guests, |conn_id| {
 654            self.peer.forward_send(host, conn_id, response.clone())
 655        });
 656
 657        Ok(response)
 658    }
 659
 660    async fn update_buffer(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::UpdateBuffer>,
 663    ) -> tide::Result<proto::Ack> {
 664        let receiver_ids = self
 665            .state()
 666            .await
 667            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 668        broadcast(request.sender_id, receiver_ids, |connection_id| {
 669            self.peer
 670                .forward_send(request.sender_id, connection_id, request.payload.clone())
 671        });
 672        Ok(proto::Ack {})
 673    }
 674
 675    async fn update_buffer_file(
 676        self: Arc<Server>,
 677        request: TypedEnvelope<proto::UpdateBufferFile>,
 678    ) -> tide::Result<()> {
 679        let receiver_ids = self
 680            .state()
 681            .await
 682            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 683        broadcast(request.sender_id, receiver_ids, |connection_id| {
 684            self.peer
 685                .forward_send(request.sender_id, connection_id, request.payload.clone())
 686        });
 687        Ok(())
 688    }
 689
 690    async fn buffer_reloaded(
 691        self: Arc<Server>,
 692        request: TypedEnvelope<proto::BufferReloaded>,
 693    ) -> tide::Result<()> {
 694        let receiver_ids = self
 695            .state()
 696            .await
 697            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 698        broadcast(request.sender_id, receiver_ids, |connection_id| {
 699            self.peer
 700                .forward_send(request.sender_id, connection_id, request.payload.clone())
 701        });
 702        Ok(())
 703    }
 704
 705    async fn buffer_saved(
 706        self: Arc<Server>,
 707        request: TypedEnvelope<proto::BufferSaved>,
 708    ) -> tide::Result<()> {
 709        let receiver_ids = self
 710            .state()
 711            .await
 712            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 713        broadcast(request.sender_id, receiver_ids, |connection_id| {
 714            self.peer
 715                .forward_send(request.sender_id, connection_id, request.payload.clone())
 716        });
 717        Ok(())
 718    }
 719
 720    async fn follow(
 721        self: Arc<Self>,
 722        request: TypedEnvelope<proto::Follow>,
 723    ) -> tide::Result<proto::FollowResponse> {
 724        let leader_id = ConnectionId(request.payload.leader_id);
 725        let follower_id = request.sender_id;
 726        if !self
 727            .state()
 728            .await
 729            .project_connection_ids(request.payload.project_id, follower_id)?
 730            .contains(&leader_id)
 731        {
 732            Err(anyhow!("no such peer"))?;
 733        }
 734        let mut response = self
 735            .peer
 736            .forward_request(request.sender_id, leader_id, request.payload)
 737            .await?;
 738        response
 739            .views
 740            .retain(|view| view.leader_id != Some(follower_id.0));
 741        Ok(response)
 742    }
 743
 744    async fn unfollow(
 745        self: Arc<Self>,
 746        request: TypedEnvelope<proto::Unfollow>,
 747    ) -> tide::Result<()> {
 748        let leader_id = ConnectionId(request.payload.leader_id);
 749        if !self
 750            .state()
 751            .await
 752            .project_connection_ids(request.payload.project_id, request.sender_id)?
 753            .contains(&leader_id)
 754        {
 755            Err(anyhow!("no such peer"))?;
 756        }
 757        self.peer
 758            .forward_send(request.sender_id, leader_id, request.payload)?;
 759        Ok(())
 760    }
 761
 762    async fn update_followers(
 763        self: Arc<Self>,
 764        request: TypedEnvelope<proto::UpdateFollowers>,
 765    ) -> tide::Result<()> {
 766        let connection_ids = self
 767            .state()
 768            .await
 769            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 770        let leader_id = request
 771            .payload
 772            .variant
 773            .as_ref()
 774            .and_then(|variant| match variant {
 775                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 776                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 777                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 778            });
 779        for follower_id in &request.payload.follower_ids {
 780            let follower_id = ConnectionId(*follower_id);
 781            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 782                self.peer
 783                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 784            }
 785        }
 786        Ok(())
 787    }
 788
 789    async fn get_channels(
 790        self: Arc<Server>,
 791        request: TypedEnvelope<proto::GetChannels>,
 792    ) -> tide::Result<proto::GetChannelsResponse> {
 793        let user_id = self
 794            .state()
 795            .await
 796            .user_id_for_connection(request.sender_id)?;
 797        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 798        Ok(proto::GetChannelsResponse {
 799            channels: channels
 800                .into_iter()
 801                .map(|chan| proto::Channel {
 802                    id: chan.id.to_proto(),
 803                    name: chan.name,
 804                })
 805                .collect(),
 806        })
 807    }
 808
 809    async fn get_users(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::GetUsers>,
 812    ) -> tide::Result<proto::GetUsersResponse> {
 813        let user_ids = request
 814            .payload
 815            .user_ids
 816            .into_iter()
 817            .map(UserId::from_proto)
 818            .collect();
 819        let users = self
 820            .app_state
 821            .db
 822            .get_users_by_ids(user_ids)
 823            .await?
 824            .into_iter()
 825            .map(|user| proto::User {
 826                id: user.id.to_proto(),
 827                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 828                github_login: user.github_login,
 829            })
 830            .collect();
 831        Ok(proto::GetUsersResponse { users })
 832    }
 833
 834    async fn update_contacts_for_users<'a>(
 835        self: &Arc<Server>,
 836        user_ids: impl IntoIterator<Item = &'a UserId>,
 837    ) {
 838        let state = self.state().await;
 839        for user_id in user_ids {
 840            let contacts = state.contacts_for_user(*user_id);
 841            for connection_id in state.connection_ids_for_user(*user_id) {
 842                self.peer
 843                    .send(
 844                        connection_id,
 845                        proto::UpdateContacts {
 846                            contacts: contacts.clone(),
 847                        },
 848                    )
 849                    .log_err();
 850            }
 851        }
 852    }
 853
 854    async fn join_channel(
 855        mut self: Arc<Self>,
 856        request: TypedEnvelope<proto::JoinChannel>,
 857    ) -> tide::Result<proto::JoinChannelResponse> {
 858        let user_id = self
 859            .state()
 860            .await
 861            .user_id_for_connection(request.sender_id)?;
 862        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 863        if !self
 864            .app_state
 865            .db
 866            .can_user_access_channel(user_id, channel_id)
 867            .await?
 868        {
 869            Err(anyhow!("access denied"))?;
 870        }
 871
 872        self.state_mut()
 873            .await
 874            .join_channel(request.sender_id, channel_id);
 875        let messages = self
 876            .app_state
 877            .db
 878            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 879            .await?
 880            .into_iter()
 881            .map(|msg| proto::ChannelMessage {
 882                id: msg.id.to_proto(),
 883                body: msg.body,
 884                timestamp: msg.sent_at.unix_timestamp() as u64,
 885                sender_id: msg.sender_id.to_proto(),
 886                nonce: Some(msg.nonce.as_u128().into()),
 887            })
 888            .collect::<Vec<_>>();
 889        Ok(proto::JoinChannelResponse {
 890            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 891            messages,
 892        })
 893    }
 894
 895    async fn leave_channel(
 896        mut self: Arc<Self>,
 897        request: TypedEnvelope<proto::LeaveChannel>,
 898    ) -> tide::Result<()> {
 899        let user_id = self
 900            .state()
 901            .await
 902            .user_id_for_connection(request.sender_id)?;
 903        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 904        if !self
 905            .app_state
 906            .db
 907            .can_user_access_channel(user_id, channel_id)
 908            .await?
 909        {
 910            Err(anyhow!("access denied"))?;
 911        }
 912
 913        self.state_mut()
 914            .await
 915            .leave_channel(request.sender_id, channel_id);
 916
 917        Ok(())
 918    }
 919
 920    async fn send_channel_message(
 921        self: Arc<Self>,
 922        request: TypedEnvelope<proto::SendChannelMessage>,
 923    ) -> tide::Result<proto::SendChannelMessageResponse> {
 924        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 925        let user_id;
 926        let connection_ids;
 927        {
 928            let state = self.state().await;
 929            user_id = state.user_id_for_connection(request.sender_id)?;
 930            connection_ids = state.channel_connection_ids(channel_id)?;
 931        }
 932
 933        // Validate the message body.
 934        let body = request.payload.body.trim().to_string();
 935        if body.len() > MAX_MESSAGE_LEN {
 936            return Err(anyhow!("message is too long"))?;
 937        }
 938        if body.is_empty() {
 939            return Err(anyhow!("message can't be blank"))?;
 940        }
 941
 942        let timestamp = OffsetDateTime::now_utc();
 943        let nonce = request
 944            .payload
 945            .nonce
 946            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 947
 948        let message_id = self
 949            .app_state
 950            .db
 951            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 952            .await?
 953            .to_proto();
 954        let message = proto::ChannelMessage {
 955            sender_id: user_id.to_proto(),
 956            id: message_id,
 957            body,
 958            timestamp: timestamp.unix_timestamp() as u64,
 959            nonce: Some(nonce),
 960        };
 961        broadcast(request.sender_id, connection_ids, |conn_id| {
 962            self.peer.send(
 963                conn_id,
 964                proto::ChannelMessageSent {
 965                    channel_id: channel_id.to_proto(),
 966                    message: Some(message.clone()),
 967                },
 968            )
 969        });
 970        Ok(proto::SendChannelMessageResponse {
 971            message: Some(message),
 972        })
 973    }
 974
 975    async fn get_channel_messages(
 976        self: Arc<Self>,
 977        request: TypedEnvelope<proto::GetChannelMessages>,
 978    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 979        let user_id = self
 980            .state()
 981            .await
 982            .user_id_for_connection(request.sender_id)?;
 983        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 984        if !self
 985            .app_state
 986            .db
 987            .can_user_access_channel(user_id, channel_id)
 988            .await?
 989        {
 990            Err(anyhow!("access denied"))?;
 991        }
 992
 993        let messages = self
 994            .app_state
 995            .db
 996            .get_channel_messages(
 997                channel_id,
 998                MESSAGE_COUNT_PER_PAGE,
 999                Some(MessageId::from_proto(request.payload.before_message_id)),
1000            )
1001            .await?
1002            .into_iter()
1003            .map(|msg| proto::ChannelMessage {
1004                id: msg.id.to_proto(),
1005                body: msg.body,
1006                timestamp: msg.sent_at.unix_timestamp() as u64,
1007                sender_id: msg.sender_id.to_proto(),
1008                nonce: Some(msg.nonce.as_u128().into()),
1009            })
1010            .collect::<Vec<_>>();
1011
1012        Ok(proto::GetChannelMessagesResponse {
1013            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1014            messages,
1015        })
1016    }
1017
1018    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1019        #[cfg(test)]
1020        async_std::task::yield_now().await;
1021        let guard = self.store.read().await;
1022        #[cfg(test)]
1023        async_std::task::yield_now().await;
1024        StoreReadGuard {
1025            guard,
1026            _not_send: PhantomData,
1027        }
1028    }
1029
1030    async fn state_mut<'a>(self: &'a mut Arc<Self>) -> StoreWriteGuard<'a> {
1031        #[cfg(test)]
1032        async_std::task::yield_now().await;
1033        let guard = self.store.write().await;
1034        #[cfg(test)]
1035        async_std::task::yield_now().await;
1036        StoreWriteGuard {
1037            guard,
1038            _not_send: PhantomData,
1039        }
1040    }
1041}
1042
1043impl<'a> Deref for StoreReadGuard<'a> {
1044    type Target = Store;
1045
1046    fn deref(&self) -> &Self::Target {
1047        &*self.guard
1048    }
1049}
1050
1051impl<'a> Deref for StoreWriteGuard<'a> {
1052    type Target = Store;
1053
1054    fn deref(&self) -> &Self::Target {
1055        &*self.guard
1056    }
1057}
1058
1059impl<'a> DerefMut for StoreWriteGuard<'a> {
1060    fn deref_mut(&mut self) -> &mut Self::Target {
1061        &mut *self.guard
1062    }
1063}
1064
1065impl<'a> Drop for StoreWriteGuard<'a> {
1066    fn drop(&mut self) {
1067        #[cfg(test)]
1068        self.check_invariants();
1069    }
1070}
1071
1072impl Executor for RealExecutor {
1073    type Timer = Timer;
1074
1075    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1076        task::spawn(future);
1077    }
1078
1079    fn timer(&self, duration: Duration) -> Self::Timer {
1080        Timer::after(duration)
1081    }
1082}
1083
1084fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1085where
1086    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1087{
1088    for receiver_id in receiver_ids {
1089        if receiver_id != sender_id {
1090            f(receiver_id).log_err();
1091        }
1092    }
1093}
1094
1095pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1096    let server = Server::new(app.state().clone(), rpc.clone(), None);
1097    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1098        let server = server.clone();
1099        async move {
1100            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1101
1102            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1103            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1104            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1105            let client_protocol_version: Option<u32> = request
1106                .header("X-Zed-Protocol-Version")
1107                .and_then(|v| v.as_str().parse().ok());
1108
1109            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1110                return Ok(Response::new(StatusCode::UpgradeRequired));
1111            }
1112
1113            let header = match request.header("Sec-Websocket-Key") {
1114                Some(h) => h.as_str(),
1115                None => return Err(anyhow!("expected sec-websocket-key"))?,
1116            };
1117
1118            let user_id = process_auth_header(&request).await?;
1119
1120            let mut response = Response::new(StatusCode::SwitchingProtocols);
1121            response.insert_header(UPGRADE, "websocket");
1122            response.insert_header(CONNECTION, "Upgrade");
1123            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1124            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1125            response.insert_header("Sec-Websocket-Version", "13");
1126
1127            let http_res: &mut tide::http::Response = response.as_mut();
1128            let upgrade_receiver = http_res.recv_upgrade().await;
1129            let addr = request.remote().unwrap_or("unknown").to_string();
1130            task::spawn(async move {
1131                if let Some(stream) = upgrade_receiver.await {
1132                    server
1133                        .handle_connection(
1134                            Connection::new(
1135                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1136                            ),
1137                            addr,
1138                            user_id,
1139                            None,
1140                            RealExecutor,
1141                        )
1142                        .await;
1143                }
1144            });
1145
1146            Ok(response)
1147        }
1148    });
1149}
1150
1151fn header_contains_ignore_case<T>(
1152    request: &tide::Request<T>,
1153    header_name: HeaderName,
1154    value: &str,
1155) -> bool {
1156    request
1157        .header(header_name)
1158        .map(|h| {
1159            h.as_str()
1160                .split(',')
1161                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1162        })
1163        .unwrap_or(false)
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168    use super::*;
1169    use crate::{
1170        auth,
1171        db::{tests::TestDb, UserId},
1172        github, AppState, Config,
1173    };
1174    use ::rpc::Peer;
1175    use client::{
1176        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1177        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1178    };
1179    use collections::BTreeMap;
1180    use editor::{
1181        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1182        ToOffset, ToggleCodeActions, Undo,
1183    };
1184    use gpui::{
1185        executor::{self, Deterministic},
1186        geometry::vector::vec2f,
1187        ModelHandle, TestAppContext, ViewHandle,
1188    };
1189    use language::{
1190        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1191        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1192    };
1193    use lsp::{self, FakeLanguageServer};
1194    use parking_lot::Mutex;
1195    use project::{
1196        fs::{FakeFs, Fs as _},
1197        search::SearchQuery,
1198        worktree::WorktreeHandle,
1199        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1200    };
1201    use rand::prelude::*;
1202    use rpc::PeerId;
1203    use serde_json::json;
1204    use settings::Settings;
1205    use sqlx::types::time::OffsetDateTime;
1206    use std::{
1207        env,
1208        ops::Deref,
1209        path::{Path, PathBuf},
1210        rc::Rc,
1211        sync::{
1212            atomic::{AtomicBool, Ordering::SeqCst},
1213            Arc,
1214        },
1215        time::Duration,
1216    };
1217    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1218
1219    #[cfg(test)]
1220    #[ctor::ctor]
1221    fn init_logger() {
1222        if std::env::var("RUST_LOG").is_ok() {
1223            env_logger::init();
1224        }
1225    }
1226
1227    #[gpui::test(iterations = 10)]
1228    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1229        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1230        let lang_registry = Arc::new(LanguageRegistry::test());
1231        let fs = FakeFs::new(cx_a.background());
1232        cx_a.foreground().forbid_parking();
1233
1234        // Connect to a server as 2 clients.
1235        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1236        let client_a = server.create_client(cx_a, "user_a").await;
1237        let client_b = server.create_client(cx_b, "user_b").await;
1238
1239        // Share a project as client A
1240        fs.insert_tree(
1241            "/a",
1242            json!({
1243                ".zed.toml": r#"collaborators = ["user_b"]"#,
1244                "a.txt": "a-contents",
1245                "b.txt": "b-contents",
1246            }),
1247        )
1248        .await;
1249        let project_a = cx_a.update(|cx| {
1250            Project::local(
1251                client_a.clone(),
1252                client_a.user_store.clone(),
1253                lang_registry.clone(),
1254                fs.clone(),
1255                cx,
1256            )
1257        });
1258        let (worktree_a, _) = project_a
1259            .update(cx_a, |p, cx| {
1260                p.find_or_create_local_worktree("/a", true, cx)
1261            })
1262            .await
1263            .unwrap();
1264        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1265        worktree_a
1266            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1267            .await;
1268        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1269        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1270
1271        // Join that project as client B
1272        let project_b = Project::remote(
1273            project_id,
1274            client_b.clone(),
1275            client_b.user_store.clone(),
1276            lang_registry.clone(),
1277            fs.clone(),
1278            &mut cx_b.to_async(),
1279        )
1280        .await
1281        .unwrap();
1282
1283        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1284            assert_eq!(
1285                project
1286                    .collaborators()
1287                    .get(&client_a.peer_id)
1288                    .unwrap()
1289                    .user
1290                    .github_login,
1291                "user_a"
1292            );
1293            project.replica_id()
1294        });
1295        project_a
1296            .condition(&cx_a, |tree, _| {
1297                tree.collaborators()
1298                    .get(&client_b.peer_id)
1299                    .map_or(false, |collaborator| {
1300                        collaborator.replica_id == replica_id_b
1301                            && collaborator.user.github_login == "user_b"
1302                    })
1303            })
1304            .await;
1305
1306        // Open the same file as client B and client A.
1307        let buffer_b = project_b
1308            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1309            .await
1310            .unwrap();
1311        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1312        project_a.read_with(cx_a, |project, cx| {
1313            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1314        });
1315        let buffer_a = project_a
1316            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1317            .await
1318            .unwrap();
1319
1320        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1321
1322        // TODO
1323        // // Create a selection set as client B and see that selection set as client A.
1324        // buffer_a
1325        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1326        //     .await;
1327
1328        // Edit the buffer as client B and see that edit as client A.
1329        editor_b.update(cx_b, |editor, cx| {
1330            editor.handle_input(&Input("ok, ".into()), cx)
1331        });
1332        buffer_a
1333            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1334            .await;
1335
1336        // TODO
1337        // // Remove the selection set as client B, see those selections disappear as client A.
1338        cx_b.update(move |_| drop(editor_b));
1339        // buffer_a
1340        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1341        //     .await;
1342
1343        // Dropping the client B's project removes client B from client A's collaborators.
1344        cx_b.update(move |_| drop(project_b));
1345        project_a
1346            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1347            .await;
1348    }
1349
1350    #[gpui::test(iterations = 10)]
1351    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1352        let lang_registry = Arc::new(LanguageRegistry::test());
1353        let fs = FakeFs::new(cx_a.background());
1354        cx_a.foreground().forbid_parking();
1355
1356        // Connect to a server as 2 clients.
1357        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1358        let client_a = server.create_client(cx_a, "user_a").await;
1359        let client_b = server.create_client(cx_b, "user_b").await;
1360
1361        // Share a project as client A
1362        fs.insert_tree(
1363            "/a",
1364            json!({
1365                ".zed.toml": r#"collaborators = ["user_b"]"#,
1366                "a.txt": "a-contents",
1367                "b.txt": "b-contents",
1368            }),
1369        )
1370        .await;
1371        let project_a = cx_a.update(|cx| {
1372            Project::local(
1373                client_a.clone(),
1374                client_a.user_store.clone(),
1375                lang_registry.clone(),
1376                fs.clone(),
1377                cx,
1378            )
1379        });
1380        let (worktree_a, _) = project_a
1381            .update(cx_a, |p, cx| {
1382                p.find_or_create_local_worktree("/a", true, cx)
1383            })
1384            .await
1385            .unwrap();
1386        worktree_a
1387            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1388            .await;
1389        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1390        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1391        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1392        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1393
1394        // Join that project as client B
1395        let project_b = Project::remote(
1396            project_id,
1397            client_b.clone(),
1398            client_b.user_store.clone(),
1399            lang_registry.clone(),
1400            fs.clone(),
1401            &mut cx_b.to_async(),
1402        )
1403        .await
1404        .unwrap();
1405        project_b
1406            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1407            .await
1408            .unwrap();
1409
1410        // Unshare the project as client A
1411        project_a.update(cx_a, |project, cx| project.unshare(cx));
1412        project_b
1413            .condition(cx_b, |project, _| project.is_read_only())
1414            .await;
1415        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1416        cx_b.update(|_| {
1417            drop(project_b);
1418        });
1419
1420        // Share the project again and ensure guests can still join.
1421        project_a
1422            .update(cx_a, |project, cx| project.share(cx))
1423            .await
1424            .unwrap();
1425        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1426
1427        let project_b2 = Project::remote(
1428            project_id,
1429            client_b.clone(),
1430            client_b.user_store.clone(),
1431            lang_registry.clone(),
1432            fs.clone(),
1433            &mut cx_b.to_async(),
1434        )
1435        .await
1436        .unwrap();
1437        project_b2
1438            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1439            .await
1440            .unwrap();
1441    }
1442
1443    #[gpui::test(iterations = 10)]
1444    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1445        let lang_registry = Arc::new(LanguageRegistry::test());
1446        let fs = FakeFs::new(cx_a.background());
1447        cx_a.foreground().forbid_parking();
1448
1449        // Connect to a server as 2 clients.
1450        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1451        let client_a = server.create_client(cx_a, "user_a").await;
1452        let client_b = server.create_client(cx_b, "user_b").await;
1453
1454        // Share a project as client A
1455        fs.insert_tree(
1456            "/a",
1457            json!({
1458                ".zed.toml": r#"collaborators = ["user_b"]"#,
1459                "a.txt": "a-contents",
1460                "b.txt": "b-contents",
1461            }),
1462        )
1463        .await;
1464        let project_a = cx_a.update(|cx| {
1465            Project::local(
1466                client_a.clone(),
1467                client_a.user_store.clone(),
1468                lang_registry.clone(),
1469                fs.clone(),
1470                cx,
1471            )
1472        });
1473        let (worktree_a, _) = project_a
1474            .update(cx_a, |p, cx| {
1475                p.find_or_create_local_worktree("/a", true, cx)
1476            })
1477            .await
1478            .unwrap();
1479        worktree_a
1480            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1481            .await;
1482        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1483        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1484        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1485        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1486
1487        // Join that project as client B
1488        let project_b = Project::remote(
1489            project_id,
1490            client_b.clone(),
1491            client_b.user_store.clone(),
1492            lang_registry.clone(),
1493            fs.clone(),
1494            &mut cx_b.to_async(),
1495        )
1496        .await
1497        .unwrap();
1498        project_b
1499            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1500            .await
1501            .unwrap();
1502
1503        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1504        server.disconnect_client(client_a.current_user_id(cx_a));
1505        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1506        project_a
1507            .condition(cx_a, |project, _| project.collaborators().is_empty())
1508            .await;
1509        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1510        project_b
1511            .condition(cx_b, |project, _| project.is_read_only())
1512            .await;
1513        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1514        cx_b.update(|_| {
1515            drop(project_b);
1516        });
1517
1518        // Await reconnection
1519        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1520
1521        // Share the project again and ensure guests can still join.
1522        project_a
1523            .update(cx_a, |project, cx| project.share(cx))
1524            .await
1525            .unwrap();
1526        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1527
1528        let project_b2 = Project::remote(
1529            project_id,
1530            client_b.clone(),
1531            client_b.user_store.clone(),
1532            lang_registry.clone(),
1533            fs.clone(),
1534            &mut cx_b.to_async(),
1535        )
1536        .await
1537        .unwrap();
1538        project_b2
1539            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1540            .await
1541            .unwrap();
1542    }
1543
1544    #[gpui::test(iterations = 10)]
1545    async fn test_propagate_saves_and_fs_changes(
1546        cx_a: &mut TestAppContext,
1547        cx_b: &mut TestAppContext,
1548        cx_c: &mut TestAppContext,
1549    ) {
1550        let lang_registry = Arc::new(LanguageRegistry::test());
1551        let fs = FakeFs::new(cx_a.background());
1552        cx_a.foreground().forbid_parking();
1553
1554        // Connect to a server as 3 clients.
1555        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1556        let client_a = server.create_client(cx_a, "user_a").await;
1557        let client_b = server.create_client(cx_b, "user_b").await;
1558        let client_c = server.create_client(cx_c, "user_c").await;
1559
1560        // Share a worktree as client A.
1561        fs.insert_tree(
1562            "/a",
1563            json!({
1564                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1565                "file1": "",
1566                "file2": ""
1567            }),
1568        )
1569        .await;
1570        let project_a = cx_a.update(|cx| {
1571            Project::local(
1572                client_a.clone(),
1573                client_a.user_store.clone(),
1574                lang_registry.clone(),
1575                fs.clone(),
1576                cx,
1577            )
1578        });
1579        let (worktree_a, _) = project_a
1580            .update(cx_a, |p, cx| {
1581                p.find_or_create_local_worktree("/a", true, cx)
1582            })
1583            .await
1584            .unwrap();
1585        worktree_a
1586            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1587            .await;
1588        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1589        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1590        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1591
1592        // Join that worktree as clients B and C.
1593        let project_b = Project::remote(
1594            project_id,
1595            client_b.clone(),
1596            client_b.user_store.clone(),
1597            lang_registry.clone(),
1598            fs.clone(),
1599            &mut cx_b.to_async(),
1600        )
1601        .await
1602        .unwrap();
1603        let project_c = Project::remote(
1604            project_id,
1605            client_c.clone(),
1606            client_c.user_store.clone(),
1607            lang_registry.clone(),
1608            fs.clone(),
1609            &mut cx_c.to_async(),
1610        )
1611        .await
1612        .unwrap();
1613        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1614        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1615
1616        // Open and edit a buffer as both guests B and C.
1617        let buffer_b = project_b
1618            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1619            .await
1620            .unwrap();
1621        let buffer_c = project_c
1622            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1623            .await
1624            .unwrap();
1625        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1626        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1627
1628        // Open and edit that buffer as the host.
1629        let buffer_a = project_a
1630            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1631            .await
1632            .unwrap();
1633
1634        buffer_a
1635            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1636            .await;
1637        buffer_a.update(cx_a, |buf, cx| {
1638            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1639        });
1640
1641        // Wait for edits to propagate
1642        buffer_a
1643            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1644            .await;
1645        buffer_b
1646            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1647            .await;
1648        buffer_c
1649            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1650            .await;
1651
1652        // Edit the buffer as the host and concurrently save as guest B.
1653        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1654        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1655        save_b.await.unwrap();
1656        assert_eq!(
1657            fs.load("/a/file1".as_ref()).await.unwrap(),
1658            "hi-a, i-am-c, i-am-b, i-am-a"
1659        );
1660        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1661        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1662        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1663
1664        worktree_a.flush_fs_events(cx_a).await;
1665
1666        // Make changes on host's file system, see those changes on guest worktrees.
1667        fs.rename(
1668            "/a/file1".as_ref(),
1669            "/a/file1-renamed".as_ref(),
1670            Default::default(),
1671        )
1672        .await
1673        .unwrap();
1674
1675        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1676            .await
1677            .unwrap();
1678        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1679
1680        worktree_a
1681            .condition(&cx_a, |tree, _| {
1682                tree.paths()
1683                    .map(|p| p.to_string_lossy())
1684                    .collect::<Vec<_>>()
1685                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1686            })
1687            .await;
1688        worktree_b
1689            .condition(&cx_b, |tree, _| {
1690                tree.paths()
1691                    .map(|p| p.to_string_lossy())
1692                    .collect::<Vec<_>>()
1693                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1694            })
1695            .await;
1696        worktree_c
1697            .condition(&cx_c, |tree, _| {
1698                tree.paths()
1699                    .map(|p| p.to_string_lossy())
1700                    .collect::<Vec<_>>()
1701                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1702            })
1703            .await;
1704
1705        // Ensure buffer files are updated as well.
1706        buffer_a
1707            .condition(&cx_a, |buf, _| {
1708                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1709            })
1710            .await;
1711        buffer_b
1712            .condition(&cx_b, |buf, _| {
1713                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1714            })
1715            .await;
1716        buffer_c
1717            .condition(&cx_c, |buf, _| {
1718                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1719            })
1720            .await;
1721    }
1722
1723    #[gpui::test(iterations = 10)]
1724    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1725        cx_a.foreground().forbid_parking();
1726        let lang_registry = Arc::new(LanguageRegistry::test());
1727        let fs = FakeFs::new(cx_a.background());
1728
1729        // Connect to a server as 2 clients.
1730        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1731        let client_a = server.create_client(cx_a, "user_a").await;
1732        let client_b = server.create_client(cx_b, "user_b").await;
1733
1734        // Share a project as client A
1735        fs.insert_tree(
1736            "/dir",
1737            json!({
1738                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1739                "a.txt": "a-contents",
1740            }),
1741        )
1742        .await;
1743
1744        let project_a = cx_a.update(|cx| {
1745            Project::local(
1746                client_a.clone(),
1747                client_a.user_store.clone(),
1748                lang_registry.clone(),
1749                fs.clone(),
1750                cx,
1751            )
1752        });
1753        let (worktree_a, _) = project_a
1754            .update(cx_a, |p, cx| {
1755                p.find_or_create_local_worktree("/dir", true, cx)
1756            })
1757            .await
1758            .unwrap();
1759        worktree_a
1760            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1761            .await;
1762        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1763        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1764        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1765
1766        // Join that project as client B
1767        let project_b = Project::remote(
1768            project_id,
1769            client_b.clone(),
1770            client_b.user_store.clone(),
1771            lang_registry.clone(),
1772            fs.clone(),
1773            &mut cx_b.to_async(),
1774        )
1775        .await
1776        .unwrap();
1777
1778        // Open a buffer as client B
1779        let buffer_b = project_b
1780            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1781            .await
1782            .unwrap();
1783
1784        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1785        buffer_b.read_with(cx_b, |buf, _| {
1786            assert!(buf.is_dirty());
1787            assert!(!buf.has_conflict());
1788        });
1789
1790        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1791        buffer_b
1792            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1793            .await;
1794        buffer_b.read_with(cx_b, |buf, _| {
1795            assert!(!buf.has_conflict());
1796        });
1797
1798        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1799        buffer_b.read_with(cx_b, |buf, _| {
1800            assert!(buf.is_dirty());
1801            assert!(!buf.has_conflict());
1802        });
1803    }
1804
1805    #[gpui::test(iterations = 10)]
1806    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1807        cx_a.foreground().forbid_parking();
1808        let lang_registry = Arc::new(LanguageRegistry::test());
1809        let fs = FakeFs::new(cx_a.background());
1810
1811        // Connect to a server as 2 clients.
1812        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1813        let client_a = server.create_client(cx_a, "user_a").await;
1814        let client_b = server.create_client(cx_b, "user_b").await;
1815
1816        // Share a project as client A
1817        fs.insert_tree(
1818            "/dir",
1819            json!({
1820                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1821                "a.txt": "a-contents",
1822            }),
1823        )
1824        .await;
1825
1826        let project_a = cx_a.update(|cx| {
1827            Project::local(
1828                client_a.clone(),
1829                client_a.user_store.clone(),
1830                lang_registry.clone(),
1831                fs.clone(),
1832                cx,
1833            )
1834        });
1835        let (worktree_a, _) = project_a
1836            .update(cx_a, |p, cx| {
1837                p.find_or_create_local_worktree("/dir", true, cx)
1838            })
1839            .await
1840            .unwrap();
1841        worktree_a
1842            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1843            .await;
1844        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1845        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1846        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1847
1848        // Join that project as client B
1849        let project_b = Project::remote(
1850            project_id,
1851            client_b.clone(),
1852            client_b.user_store.clone(),
1853            lang_registry.clone(),
1854            fs.clone(),
1855            &mut cx_b.to_async(),
1856        )
1857        .await
1858        .unwrap();
1859        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1860
1861        // Open a buffer as client B
1862        let buffer_b = project_b
1863            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1864            .await
1865            .unwrap();
1866        buffer_b.read_with(cx_b, |buf, _| {
1867            assert!(!buf.is_dirty());
1868            assert!(!buf.has_conflict());
1869        });
1870
1871        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1872            .await
1873            .unwrap();
1874        buffer_b
1875            .condition(&cx_b, |buf, _| {
1876                buf.text() == "new contents" && !buf.is_dirty()
1877            })
1878            .await;
1879        buffer_b.read_with(cx_b, |buf, _| {
1880            assert!(!buf.has_conflict());
1881        });
1882    }
1883
1884    #[gpui::test(iterations = 10)]
1885    async fn test_editing_while_guest_opens_buffer(
1886        cx_a: &mut TestAppContext,
1887        cx_b: &mut TestAppContext,
1888    ) {
1889        cx_a.foreground().forbid_parking();
1890        let lang_registry = Arc::new(LanguageRegistry::test());
1891        let fs = FakeFs::new(cx_a.background());
1892
1893        // Connect to a server as 2 clients.
1894        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1895        let client_a = server.create_client(cx_a, "user_a").await;
1896        let client_b = server.create_client(cx_b, "user_b").await;
1897
1898        // Share a project as client A
1899        fs.insert_tree(
1900            "/dir",
1901            json!({
1902                ".zed.toml": r#"collaborators = ["user_b"]"#,
1903                "a.txt": "a-contents",
1904            }),
1905        )
1906        .await;
1907        let project_a = cx_a.update(|cx| {
1908            Project::local(
1909                client_a.clone(),
1910                client_a.user_store.clone(),
1911                lang_registry.clone(),
1912                fs.clone(),
1913                cx,
1914            )
1915        });
1916        let (worktree_a, _) = project_a
1917            .update(cx_a, |p, cx| {
1918                p.find_or_create_local_worktree("/dir", true, cx)
1919            })
1920            .await
1921            .unwrap();
1922        worktree_a
1923            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1924            .await;
1925        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1926        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1927        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1928
1929        // Join that project as client B
1930        let project_b = Project::remote(
1931            project_id,
1932            client_b.clone(),
1933            client_b.user_store.clone(),
1934            lang_registry.clone(),
1935            fs.clone(),
1936            &mut cx_b.to_async(),
1937        )
1938        .await
1939        .unwrap();
1940
1941        // Open a buffer as client A
1942        let buffer_a = project_a
1943            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1944            .await
1945            .unwrap();
1946
1947        // Start opening the same buffer as client B
1948        let buffer_b = cx_b
1949            .background()
1950            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1951
1952        // Edit the buffer as client A while client B is still opening it.
1953        cx_b.background().simulate_random_delay().await;
1954        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1955        cx_b.background().simulate_random_delay().await;
1956        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1957
1958        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1959        let buffer_b = buffer_b.await.unwrap();
1960        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1961    }
1962
1963    #[gpui::test(iterations = 10)]
1964    async fn test_leaving_worktree_while_opening_buffer(
1965        cx_a: &mut TestAppContext,
1966        cx_b: &mut TestAppContext,
1967    ) {
1968        cx_a.foreground().forbid_parking();
1969        let lang_registry = Arc::new(LanguageRegistry::test());
1970        let fs = FakeFs::new(cx_a.background());
1971
1972        // Connect to a server as 2 clients.
1973        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1974        let client_a = server.create_client(cx_a, "user_a").await;
1975        let client_b = server.create_client(cx_b, "user_b").await;
1976
1977        // Share a project as client A
1978        fs.insert_tree(
1979            "/dir",
1980            json!({
1981                ".zed.toml": r#"collaborators = ["user_b"]"#,
1982                "a.txt": "a-contents",
1983            }),
1984        )
1985        .await;
1986        let project_a = cx_a.update(|cx| {
1987            Project::local(
1988                client_a.clone(),
1989                client_a.user_store.clone(),
1990                lang_registry.clone(),
1991                fs.clone(),
1992                cx,
1993            )
1994        });
1995        let (worktree_a, _) = project_a
1996            .update(cx_a, |p, cx| {
1997                p.find_or_create_local_worktree("/dir", true, cx)
1998            })
1999            .await
2000            .unwrap();
2001        worktree_a
2002            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2003            .await;
2004        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2005        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2006        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2007
2008        // Join that project as client B
2009        let project_b = Project::remote(
2010            project_id,
2011            client_b.clone(),
2012            client_b.user_store.clone(),
2013            lang_registry.clone(),
2014            fs.clone(),
2015            &mut cx_b.to_async(),
2016        )
2017        .await
2018        .unwrap();
2019
2020        // See that a guest has joined as client A.
2021        project_a
2022            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2023            .await;
2024
2025        // Begin opening a buffer as client B, but leave the project before the open completes.
2026        let buffer_b = cx_b
2027            .background()
2028            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2029        cx_b.update(|_| drop(project_b));
2030        drop(buffer_b);
2031
2032        // See that the guest has left.
2033        project_a
2034            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035            .await;
2036    }
2037
2038    #[gpui::test(iterations = 10)]
2039    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2040        cx_a.foreground().forbid_parking();
2041        let lang_registry = Arc::new(LanguageRegistry::test());
2042        let fs = FakeFs::new(cx_a.background());
2043
2044        // Connect to a server as 2 clients.
2045        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2046        let client_a = server.create_client(cx_a, "user_a").await;
2047        let client_b = server.create_client(cx_b, "user_b").await;
2048
2049        // Share a project as client A
2050        fs.insert_tree(
2051            "/a",
2052            json!({
2053                ".zed.toml": r#"collaborators = ["user_b"]"#,
2054                "a.txt": "a-contents",
2055                "b.txt": "b-contents",
2056            }),
2057        )
2058        .await;
2059        let project_a = cx_a.update(|cx| {
2060            Project::local(
2061                client_a.clone(),
2062                client_a.user_store.clone(),
2063                lang_registry.clone(),
2064                fs.clone(),
2065                cx,
2066            )
2067        });
2068        let (worktree_a, _) = project_a
2069            .update(cx_a, |p, cx| {
2070                p.find_or_create_local_worktree("/a", true, cx)
2071            })
2072            .await
2073            .unwrap();
2074        worktree_a
2075            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2076            .await;
2077        let project_id = project_a
2078            .update(cx_a, |project, _| project.next_remote_id())
2079            .await;
2080        project_a
2081            .update(cx_a, |project, cx| project.share(cx))
2082            .await
2083            .unwrap();
2084
2085        // Join that project as client B
2086        let _project_b = Project::remote(
2087            project_id,
2088            client_b.clone(),
2089            client_b.user_store.clone(),
2090            lang_registry.clone(),
2091            fs.clone(),
2092            &mut cx_b.to_async(),
2093        )
2094        .await
2095        .unwrap();
2096
2097        // Client A sees that a guest has joined.
2098        project_a
2099            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2100            .await;
2101
2102        // Drop client B's connection and ensure client A observes client B leaving the project.
2103        client_b.disconnect(&cx_b.to_async()).unwrap();
2104        project_a
2105            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2106            .await;
2107
2108        // Rejoin the project as client B
2109        let _project_b = Project::remote(
2110            project_id,
2111            client_b.clone(),
2112            client_b.user_store.clone(),
2113            lang_registry.clone(),
2114            fs.clone(),
2115            &mut cx_b.to_async(),
2116        )
2117        .await
2118        .unwrap();
2119
2120        // Client A sees that a guest has re-joined.
2121        project_a
2122            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2123            .await;
2124
2125        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2126        client_b.wait_for_current_user(cx_b).await;
2127        server.disconnect_client(client_b.current_user_id(cx_b));
2128        cx_a.foreground().advance_clock(Duration::from_secs(3));
2129        project_a
2130            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2131            .await;
2132    }
2133
2134    #[gpui::test(iterations = 10)]
2135    async fn test_collaborating_with_diagnostics(
2136        cx_a: &mut TestAppContext,
2137        cx_b: &mut TestAppContext,
2138    ) {
2139        cx_a.foreground().forbid_parking();
2140        let lang_registry = Arc::new(LanguageRegistry::test());
2141        let fs = FakeFs::new(cx_a.background());
2142
2143        // Set up a fake language server.
2144        let mut language = Language::new(
2145            LanguageConfig {
2146                name: "Rust".into(),
2147                path_suffixes: vec!["rs".to_string()],
2148                ..Default::default()
2149            },
2150            Some(tree_sitter_rust::language()),
2151        );
2152        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2153        lang_registry.add(Arc::new(language));
2154
2155        // Connect to a server as 2 clients.
2156        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2157        let client_a = server.create_client(cx_a, "user_a").await;
2158        let client_b = server.create_client(cx_b, "user_b").await;
2159
2160        // Share a project as client A
2161        fs.insert_tree(
2162            "/a",
2163            json!({
2164                ".zed.toml": r#"collaborators = ["user_b"]"#,
2165                "a.rs": "let one = two",
2166                "other.rs": "",
2167            }),
2168        )
2169        .await;
2170        let project_a = cx_a.update(|cx| {
2171            Project::local(
2172                client_a.clone(),
2173                client_a.user_store.clone(),
2174                lang_registry.clone(),
2175                fs.clone(),
2176                cx,
2177            )
2178        });
2179        let (worktree_a, _) = project_a
2180            .update(cx_a, |p, cx| {
2181                p.find_or_create_local_worktree("/a", true, cx)
2182            })
2183            .await
2184            .unwrap();
2185        worktree_a
2186            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2187            .await;
2188        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2189        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2190        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2191
2192        // Cause the language server to start.
2193        let _ = cx_a
2194            .background()
2195            .spawn(project_a.update(cx_a, |project, cx| {
2196                project.open_buffer(
2197                    ProjectPath {
2198                        worktree_id,
2199                        path: Path::new("other.rs").into(),
2200                    },
2201                    cx,
2202                )
2203            }))
2204            .await
2205            .unwrap();
2206
2207        // Simulate a language server reporting errors for a file.
2208        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2209        fake_language_server
2210            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2211            .await;
2212        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2213            lsp::PublishDiagnosticsParams {
2214                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2215                version: None,
2216                diagnostics: vec![lsp::Diagnostic {
2217                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2218                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2219                    message: "message 1".to_string(),
2220                    ..Default::default()
2221                }],
2222            },
2223        );
2224
2225        // Wait for server to see the diagnostics update.
2226        server
2227            .condition(|store| {
2228                let worktree = store
2229                    .project(project_id)
2230                    .unwrap()
2231                    .share
2232                    .as_ref()
2233                    .unwrap()
2234                    .worktrees
2235                    .get(&worktree_id.to_proto())
2236                    .unwrap();
2237
2238                !worktree.diagnostic_summaries.is_empty()
2239            })
2240            .await;
2241
2242        // Join the worktree as client B.
2243        let project_b = Project::remote(
2244            project_id,
2245            client_b.clone(),
2246            client_b.user_store.clone(),
2247            lang_registry.clone(),
2248            fs.clone(),
2249            &mut cx_b.to_async(),
2250        )
2251        .await
2252        .unwrap();
2253
2254        project_b.read_with(cx_b, |project, cx| {
2255            assert_eq!(
2256                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2257                &[(
2258                    ProjectPath {
2259                        worktree_id,
2260                        path: Arc::from(Path::new("a.rs")),
2261                    },
2262                    DiagnosticSummary {
2263                        error_count: 1,
2264                        warning_count: 0,
2265                        ..Default::default()
2266                    },
2267                )]
2268            )
2269        });
2270
2271        // Simulate a language server reporting more errors for a file.
2272        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2273            lsp::PublishDiagnosticsParams {
2274                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2275                version: None,
2276                diagnostics: vec![
2277                    lsp::Diagnostic {
2278                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2279                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2280                        message: "message 1".to_string(),
2281                        ..Default::default()
2282                    },
2283                    lsp::Diagnostic {
2284                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2285                        range: lsp::Range::new(
2286                            lsp::Position::new(0, 10),
2287                            lsp::Position::new(0, 13),
2288                        ),
2289                        message: "message 2".to_string(),
2290                        ..Default::default()
2291                    },
2292                ],
2293            },
2294        );
2295
2296        // Client b gets the updated summaries
2297        project_b
2298            .condition(&cx_b, |project, cx| {
2299                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2300                    == &[(
2301                        ProjectPath {
2302                            worktree_id,
2303                            path: Arc::from(Path::new("a.rs")),
2304                        },
2305                        DiagnosticSummary {
2306                            error_count: 1,
2307                            warning_count: 1,
2308                            ..Default::default()
2309                        },
2310                    )]
2311            })
2312            .await;
2313
2314        // Open the file with the errors on client B. They should be present.
2315        let buffer_b = cx_b
2316            .background()
2317            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2318            .await
2319            .unwrap();
2320
2321        buffer_b.read_with(cx_b, |buffer, _| {
2322            assert_eq!(
2323                buffer
2324                    .snapshot()
2325                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2326                    .map(|entry| entry)
2327                    .collect::<Vec<_>>(),
2328                &[
2329                    DiagnosticEntry {
2330                        range: Point::new(0, 4)..Point::new(0, 7),
2331                        diagnostic: Diagnostic {
2332                            group_id: 0,
2333                            message: "message 1".to_string(),
2334                            severity: lsp::DiagnosticSeverity::ERROR,
2335                            is_primary: true,
2336                            ..Default::default()
2337                        }
2338                    },
2339                    DiagnosticEntry {
2340                        range: Point::new(0, 10)..Point::new(0, 13),
2341                        diagnostic: Diagnostic {
2342                            group_id: 1,
2343                            severity: lsp::DiagnosticSeverity::WARNING,
2344                            message: "message 2".to_string(),
2345                            is_primary: true,
2346                            ..Default::default()
2347                        }
2348                    }
2349                ]
2350            );
2351        });
2352    }
2353
2354    #[gpui::test(iterations = 10)]
2355    async fn test_collaborating_with_completion(
2356        cx_a: &mut TestAppContext,
2357        cx_b: &mut TestAppContext,
2358    ) {
2359        cx_a.foreground().forbid_parking();
2360        let lang_registry = Arc::new(LanguageRegistry::test());
2361        let fs = FakeFs::new(cx_a.background());
2362
2363        // Set up a fake language server.
2364        let mut language = Language::new(
2365            LanguageConfig {
2366                name: "Rust".into(),
2367                path_suffixes: vec!["rs".to_string()],
2368                ..Default::default()
2369            },
2370            Some(tree_sitter_rust::language()),
2371        );
2372        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2373            capabilities: lsp::ServerCapabilities {
2374                completion_provider: Some(lsp::CompletionOptions {
2375                    trigger_characters: Some(vec![".".to_string()]),
2376                    ..Default::default()
2377                }),
2378                ..Default::default()
2379            },
2380            ..Default::default()
2381        });
2382        lang_registry.add(Arc::new(language));
2383
2384        // Connect to a server as 2 clients.
2385        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2386        let client_a = server.create_client(cx_a, "user_a").await;
2387        let client_b = server.create_client(cx_b, "user_b").await;
2388
2389        // Share a project as client A
2390        fs.insert_tree(
2391            "/a",
2392            json!({
2393                ".zed.toml": r#"collaborators = ["user_b"]"#,
2394                "main.rs": "fn main() { a }",
2395                "other.rs": "",
2396            }),
2397        )
2398        .await;
2399        let project_a = cx_a.update(|cx| {
2400            Project::local(
2401                client_a.clone(),
2402                client_a.user_store.clone(),
2403                lang_registry.clone(),
2404                fs.clone(),
2405                cx,
2406            )
2407        });
2408        let (worktree_a, _) = project_a
2409            .update(cx_a, |p, cx| {
2410                p.find_or_create_local_worktree("/a", true, cx)
2411            })
2412            .await
2413            .unwrap();
2414        worktree_a
2415            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2416            .await;
2417        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2418        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2419        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2420
2421        // Join the worktree as client B.
2422        let project_b = Project::remote(
2423            project_id,
2424            client_b.clone(),
2425            client_b.user_store.clone(),
2426            lang_registry.clone(),
2427            fs.clone(),
2428            &mut cx_b.to_async(),
2429        )
2430        .await
2431        .unwrap();
2432
2433        // Open a file in an editor as the guest.
2434        let buffer_b = project_b
2435            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2436            .await
2437            .unwrap();
2438        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2439        let editor_b = cx_b.add_view(window_b, |cx| {
2440            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2441        });
2442
2443        let fake_language_server = fake_language_servers.next().await.unwrap();
2444        buffer_b
2445            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2446            .await;
2447
2448        // Type a completion trigger character as the guest.
2449        editor_b.update(cx_b, |editor, cx| {
2450            editor.select_ranges([13..13], None, cx);
2451            editor.handle_input(&Input(".".into()), cx);
2452            cx.focus(&editor_b);
2453        });
2454
2455        // Receive a completion request as the host's language server.
2456        // Return some completions from the host's language server.
2457        cx_a.foreground().start_waiting();
2458        fake_language_server
2459            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2460                assert_eq!(
2461                    params.text_document_position.text_document.uri,
2462                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2463                );
2464                assert_eq!(
2465                    params.text_document_position.position,
2466                    lsp::Position::new(0, 14),
2467                );
2468
2469                Ok(Some(lsp::CompletionResponse::Array(vec![
2470                    lsp::CompletionItem {
2471                        label: "first_method(…)".into(),
2472                        detail: Some("fn(&mut self, B) -> C".into()),
2473                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2474                            new_text: "first_method($1)".to_string(),
2475                            range: lsp::Range::new(
2476                                lsp::Position::new(0, 14),
2477                                lsp::Position::new(0, 14),
2478                            ),
2479                        })),
2480                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2481                        ..Default::default()
2482                    },
2483                    lsp::CompletionItem {
2484                        label: "second_method(…)".into(),
2485                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2486                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2487                            new_text: "second_method()".to_string(),
2488                            range: lsp::Range::new(
2489                                lsp::Position::new(0, 14),
2490                                lsp::Position::new(0, 14),
2491                            ),
2492                        })),
2493                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2494                        ..Default::default()
2495                    },
2496                ])))
2497            })
2498            .next()
2499            .await
2500            .unwrap();
2501        cx_a.foreground().finish_waiting();
2502
2503        // Open the buffer on the host.
2504        let buffer_a = project_a
2505            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2506            .await
2507            .unwrap();
2508        buffer_a
2509            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2510            .await;
2511
2512        // Confirm a completion on the guest.
2513        editor_b
2514            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2515            .await;
2516        editor_b.update(cx_b, |editor, cx| {
2517            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2518            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2519        });
2520
2521        // Return a resolved completion from the host's language server.
2522        // The resolved completion has an additional text edit.
2523        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2524            |params, _| async move {
2525                assert_eq!(params.label, "first_method(…)");
2526                Ok(lsp::CompletionItem {
2527                    label: "first_method(…)".into(),
2528                    detail: Some("fn(&mut self, B) -> C".into()),
2529                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2530                        new_text: "first_method($1)".to_string(),
2531                        range: lsp::Range::new(
2532                            lsp::Position::new(0, 14),
2533                            lsp::Position::new(0, 14),
2534                        ),
2535                    })),
2536                    additional_text_edits: Some(vec![lsp::TextEdit {
2537                        new_text: "use d::SomeTrait;\n".to_string(),
2538                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2539                    }]),
2540                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2541                    ..Default::default()
2542                })
2543            },
2544        );
2545
2546        // The additional edit is applied.
2547        buffer_a
2548            .condition(&cx_a, |buffer, _| {
2549                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2550            })
2551            .await;
2552        buffer_b
2553            .condition(&cx_b, |buffer, _| {
2554                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2555            })
2556            .await;
2557    }
2558
2559    #[gpui::test(iterations = 10)]
2560    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2561        cx_a.foreground().forbid_parking();
2562        let lang_registry = Arc::new(LanguageRegistry::test());
2563        let fs = FakeFs::new(cx_a.background());
2564
2565        // Connect to a server as 2 clients.
2566        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2567        let client_a = server.create_client(cx_a, "user_a").await;
2568        let client_b = server.create_client(cx_b, "user_b").await;
2569
2570        // Share a project as client A
2571        fs.insert_tree(
2572            "/a",
2573            json!({
2574                ".zed.toml": r#"collaborators = ["user_b"]"#,
2575                "a.rs": "let one = 1;",
2576            }),
2577        )
2578        .await;
2579        let project_a = cx_a.update(|cx| {
2580            Project::local(
2581                client_a.clone(),
2582                client_a.user_store.clone(),
2583                lang_registry.clone(),
2584                fs.clone(),
2585                cx,
2586            )
2587        });
2588        let (worktree_a, _) = project_a
2589            .update(cx_a, |p, cx| {
2590                p.find_or_create_local_worktree("/a", true, cx)
2591            })
2592            .await
2593            .unwrap();
2594        worktree_a
2595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2596            .await;
2597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2600        let buffer_a = project_a
2601            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2602            .await
2603            .unwrap();
2604
2605        // Join the worktree as client B.
2606        let project_b = Project::remote(
2607            project_id,
2608            client_b.clone(),
2609            client_b.user_store.clone(),
2610            lang_registry.clone(),
2611            fs.clone(),
2612            &mut cx_b.to_async(),
2613        )
2614        .await
2615        .unwrap();
2616
2617        let buffer_b = cx_b
2618            .background()
2619            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2620            .await
2621            .unwrap();
2622        buffer_b.update(cx_b, |buffer, cx| {
2623            buffer.edit([4..7], "six", cx);
2624            buffer.edit([10..11], "6", cx);
2625            assert_eq!(buffer.text(), "let six = 6;");
2626            assert!(buffer.is_dirty());
2627            assert!(!buffer.has_conflict());
2628        });
2629        buffer_a
2630            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
2631            .await;
2632
2633        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
2634            .await
2635            .unwrap();
2636        buffer_a
2637            .condition(cx_a, |buffer, _| buffer.has_conflict())
2638            .await;
2639        buffer_b
2640            .condition(cx_b, |buffer, _| buffer.has_conflict())
2641            .await;
2642
2643        project_b
2644            .update(cx_b, |project, cx| {
2645                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
2646            })
2647            .await
2648            .unwrap();
2649        buffer_a.read_with(cx_a, |buffer, _| {
2650            assert_eq!(buffer.text(), "let seven = 7;");
2651            assert!(!buffer.is_dirty());
2652            assert!(!buffer.has_conflict());
2653        });
2654        buffer_b.read_with(cx_b, |buffer, _| {
2655            assert_eq!(buffer.text(), "let seven = 7;");
2656            assert!(!buffer.is_dirty());
2657            assert!(!buffer.has_conflict());
2658        });
2659
2660        buffer_a.update(cx_a, |buffer, cx| {
2661            // Undoing on the host is a no-op when the reload was initiated by the guest.
2662            buffer.undo(cx);
2663            assert_eq!(buffer.text(), "let seven = 7;");
2664            assert!(!buffer.is_dirty());
2665            assert!(!buffer.has_conflict());
2666        });
2667        buffer_b.update(cx_b, |buffer, cx| {
2668            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
2669            buffer.undo(cx);
2670            assert_eq!(buffer.text(), "let six = 6;");
2671            assert!(buffer.is_dirty());
2672            assert!(!buffer.has_conflict());
2673        });
2674    }
2675
2676    #[gpui::test(iterations = 10)]
2677    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2678        cx_a.foreground().forbid_parking();
2679        let lang_registry = Arc::new(LanguageRegistry::test());
2680        let fs = FakeFs::new(cx_a.background());
2681
2682        // Set up a fake language server.
2683        let mut language = Language::new(
2684            LanguageConfig {
2685                name: "Rust".into(),
2686                path_suffixes: vec!["rs".to_string()],
2687                ..Default::default()
2688            },
2689            Some(tree_sitter_rust::language()),
2690        );
2691        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2692        lang_registry.add(Arc::new(language));
2693
2694        // Connect to a server as 2 clients.
2695        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2696        let client_a = server.create_client(cx_a, "user_a").await;
2697        let client_b = server.create_client(cx_b, "user_b").await;
2698
2699        // Share a project as client A
2700        fs.insert_tree(
2701            "/a",
2702            json!({
2703                ".zed.toml": r#"collaborators = ["user_b"]"#,
2704                "a.rs": "let one = two",
2705            }),
2706        )
2707        .await;
2708        let project_a = cx_a.update(|cx| {
2709            Project::local(
2710                client_a.clone(),
2711                client_a.user_store.clone(),
2712                lang_registry.clone(),
2713                fs.clone(),
2714                cx,
2715            )
2716        });
2717        let (worktree_a, _) = project_a
2718            .update(cx_a, |p, cx| {
2719                p.find_or_create_local_worktree("/a", true, cx)
2720            })
2721            .await
2722            .unwrap();
2723        worktree_a
2724            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2725            .await;
2726        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2727        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2729
2730        // Join the worktree as client B.
2731        let project_b = Project::remote(
2732            project_id,
2733            client_b.clone(),
2734            client_b.user_store.clone(),
2735            lang_registry.clone(),
2736            fs.clone(),
2737            &mut cx_b.to_async(),
2738        )
2739        .await
2740        .unwrap();
2741
2742        let buffer_b = cx_b
2743            .background()
2744            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2745            .await
2746            .unwrap();
2747
2748        let fake_language_server = fake_language_servers.next().await.unwrap();
2749        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
2750            Ok(Some(vec![
2751                lsp::TextEdit {
2752                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2753                    new_text: "h".to_string(),
2754                },
2755                lsp::TextEdit {
2756                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2757                    new_text: "y".to_string(),
2758                },
2759            ]))
2760        });
2761
2762        project_b
2763            .update(cx_b, |project, cx| {
2764                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2765            })
2766            .await
2767            .unwrap();
2768        assert_eq!(
2769            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2770            "let honey = two"
2771        );
2772    }
2773
2774    #[gpui::test(iterations = 10)]
2775    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2776        cx_a.foreground().forbid_parking();
2777        let lang_registry = Arc::new(LanguageRegistry::test());
2778        let fs = FakeFs::new(cx_a.background());
2779        fs.insert_tree(
2780            "/root-1",
2781            json!({
2782                ".zed.toml": r#"collaborators = ["user_b"]"#,
2783                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2784            }),
2785        )
2786        .await;
2787        fs.insert_tree(
2788            "/root-2",
2789            json!({
2790                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2791            }),
2792        )
2793        .await;
2794
2795        // Set up a fake language server.
2796        let mut language = Language::new(
2797            LanguageConfig {
2798                name: "Rust".into(),
2799                path_suffixes: vec!["rs".to_string()],
2800                ..Default::default()
2801            },
2802            Some(tree_sitter_rust::language()),
2803        );
2804        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2805        lang_registry.add(Arc::new(language));
2806
2807        // Connect to a server as 2 clients.
2808        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2809        let client_a = server.create_client(cx_a, "user_a").await;
2810        let client_b = server.create_client(cx_b, "user_b").await;
2811
2812        // Share a project as client A
2813        let project_a = cx_a.update(|cx| {
2814            Project::local(
2815                client_a.clone(),
2816                client_a.user_store.clone(),
2817                lang_registry.clone(),
2818                fs.clone(),
2819                cx,
2820            )
2821        });
2822        let (worktree_a, _) = project_a
2823            .update(cx_a, |p, cx| {
2824                p.find_or_create_local_worktree("/root-1", true, cx)
2825            })
2826            .await
2827            .unwrap();
2828        worktree_a
2829            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2830            .await;
2831        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2832        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2833        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2834
2835        // Join the worktree as client B.
2836        let project_b = Project::remote(
2837            project_id,
2838            client_b.clone(),
2839            client_b.user_store.clone(),
2840            lang_registry.clone(),
2841            fs.clone(),
2842            &mut cx_b.to_async(),
2843        )
2844        .await
2845        .unwrap();
2846
2847        // Open the file on client B.
2848        let buffer_b = cx_b
2849            .background()
2850            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2851            .await
2852            .unwrap();
2853
2854        // Request the definition of a symbol as the guest.
2855        let fake_language_server = fake_language_servers.next().await.unwrap();
2856        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2857            |_, _| async move {
2858                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2859                    lsp::Location::new(
2860                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2861                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2862                    ),
2863                )))
2864            },
2865        );
2866
2867        let definitions_1 = project_b
2868            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2869            .await
2870            .unwrap();
2871        cx_b.read(|cx| {
2872            assert_eq!(definitions_1.len(), 1);
2873            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2874            let target_buffer = definitions_1[0].buffer.read(cx);
2875            assert_eq!(
2876                target_buffer.text(),
2877                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2878            );
2879            assert_eq!(
2880                definitions_1[0].range.to_point(target_buffer),
2881                Point::new(0, 6)..Point::new(0, 9)
2882            );
2883        });
2884
2885        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2886        // the previous call to `definition`.
2887        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
2888            |_, _| async move {
2889                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
2890                    lsp::Location::new(
2891                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2892                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2893                    ),
2894                )))
2895            },
2896        );
2897
2898        let definitions_2 = project_b
2899            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2900            .await
2901            .unwrap();
2902        cx_b.read(|cx| {
2903            assert_eq!(definitions_2.len(), 1);
2904            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2905            let target_buffer = definitions_2[0].buffer.read(cx);
2906            assert_eq!(
2907                target_buffer.text(),
2908                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2909            );
2910            assert_eq!(
2911                definitions_2[0].range.to_point(target_buffer),
2912                Point::new(1, 6)..Point::new(1, 11)
2913            );
2914        });
2915        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2916    }
2917
2918    #[gpui::test(iterations = 10)]
2919    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2920        cx_a.foreground().forbid_parking();
2921        let lang_registry = Arc::new(LanguageRegistry::test());
2922        let fs = FakeFs::new(cx_a.background());
2923        fs.insert_tree(
2924            "/root-1",
2925            json!({
2926                ".zed.toml": r#"collaborators = ["user_b"]"#,
2927                "one.rs": "const ONE: usize = 1;",
2928                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2929            }),
2930        )
2931        .await;
2932        fs.insert_tree(
2933            "/root-2",
2934            json!({
2935                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2936            }),
2937        )
2938        .await;
2939
2940        // Set up a fake language server.
2941        let mut language = Language::new(
2942            LanguageConfig {
2943                name: "Rust".into(),
2944                path_suffixes: vec!["rs".to_string()],
2945                ..Default::default()
2946            },
2947            Some(tree_sitter_rust::language()),
2948        );
2949        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2950        lang_registry.add(Arc::new(language));
2951
2952        // Connect to a server as 2 clients.
2953        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2954        let client_a = server.create_client(cx_a, "user_a").await;
2955        let client_b = server.create_client(cx_b, "user_b").await;
2956
2957        // Share a project as client A
2958        let project_a = cx_a.update(|cx| {
2959            Project::local(
2960                client_a.clone(),
2961                client_a.user_store.clone(),
2962                lang_registry.clone(),
2963                fs.clone(),
2964                cx,
2965            )
2966        });
2967        let (worktree_a, _) = project_a
2968            .update(cx_a, |p, cx| {
2969                p.find_or_create_local_worktree("/root-1", true, cx)
2970            })
2971            .await
2972            .unwrap();
2973        worktree_a
2974            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975            .await;
2976        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2979
2980        // Join the worktree as client B.
2981        let project_b = Project::remote(
2982            project_id,
2983            client_b.clone(),
2984            client_b.user_store.clone(),
2985            lang_registry.clone(),
2986            fs.clone(),
2987            &mut cx_b.to_async(),
2988        )
2989        .await
2990        .unwrap();
2991
2992        // Open the file on client B.
2993        let buffer_b = cx_b
2994            .background()
2995            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2996            .await
2997            .unwrap();
2998
2999        // Request references to a symbol as the guest.
3000        let fake_language_server = fake_language_servers.next().await.unwrap();
3001        fake_language_server.handle_request::<lsp::request::References, _, _>(
3002            |params, _| async move {
3003                assert_eq!(
3004                    params.text_document_position.text_document.uri.as_str(),
3005                    "file:///root-1/one.rs"
3006                );
3007                Ok(Some(vec![
3008                    lsp::Location {
3009                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3010                        range: lsp::Range::new(
3011                            lsp::Position::new(0, 24),
3012                            lsp::Position::new(0, 27),
3013                        ),
3014                    },
3015                    lsp::Location {
3016                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3017                        range: lsp::Range::new(
3018                            lsp::Position::new(0, 35),
3019                            lsp::Position::new(0, 38),
3020                        ),
3021                    },
3022                    lsp::Location {
3023                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3024                        range: lsp::Range::new(
3025                            lsp::Position::new(0, 37),
3026                            lsp::Position::new(0, 40),
3027                        ),
3028                    },
3029                ]))
3030            },
3031        );
3032
3033        let references = project_b
3034            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3035            .await
3036            .unwrap();
3037        cx_b.read(|cx| {
3038            assert_eq!(references.len(), 3);
3039            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3040
3041            let two_buffer = references[0].buffer.read(cx);
3042            let three_buffer = references[2].buffer.read(cx);
3043            assert_eq!(
3044                two_buffer.file().unwrap().path().as_ref(),
3045                Path::new("two.rs")
3046            );
3047            assert_eq!(references[1].buffer, references[0].buffer);
3048            assert_eq!(
3049                three_buffer.file().unwrap().full_path(cx),
3050                Path::new("three.rs")
3051            );
3052
3053            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3054            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3055            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3056        });
3057    }
3058
3059    #[gpui::test(iterations = 10)]
3060    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3061        cx_a.foreground().forbid_parking();
3062        let lang_registry = Arc::new(LanguageRegistry::test());
3063        let fs = FakeFs::new(cx_a.background());
3064        fs.insert_tree(
3065            "/root-1",
3066            json!({
3067                ".zed.toml": r#"collaborators = ["user_b"]"#,
3068                "a": "hello world",
3069                "b": "goodnight moon",
3070                "c": "a world of goo",
3071                "d": "world champion of clown world",
3072            }),
3073        )
3074        .await;
3075        fs.insert_tree(
3076            "/root-2",
3077            json!({
3078                "e": "disney world is fun",
3079            }),
3080        )
3081        .await;
3082
3083        // Connect to a server as 2 clients.
3084        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085        let client_a = server.create_client(cx_a, "user_a").await;
3086        let client_b = server.create_client(cx_b, "user_b").await;
3087
3088        // Share a project as client A
3089        let project_a = cx_a.update(|cx| {
3090            Project::local(
3091                client_a.clone(),
3092                client_a.user_store.clone(),
3093                lang_registry.clone(),
3094                fs.clone(),
3095                cx,
3096            )
3097        });
3098        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3099
3100        let (worktree_1, _) = project_a
3101            .update(cx_a, |p, cx| {
3102                p.find_or_create_local_worktree("/root-1", true, cx)
3103            })
3104            .await
3105            .unwrap();
3106        worktree_1
3107            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3108            .await;
3109        let (worktree_2, _) = project_a
3110            .update(cx_a, |p, cx| {
3111                p.find_or_create_local_worktree("/root-2", true, cx)
3112            })
3113            .await
3114            .unwrap();
3115        worktree_2
3116            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3117            .await;
3118
3119        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3120
3121        // Join the worktree as client B.
3122        let project_b = Project::remote(
3123            project_id,
3124            client_b.clone(),
3125            client_b.user_store.clone(),
3126            lang_registry.clone(),
3127            fs.clone(),
3128            &mut cx_b.to_async(),
3129        )
3130        .await
3131        .unwrap();
3132
3133        let results = project_b
3134            .update(cx_b, |project, cx| {
3135                project.search(SearchQuery::text("world", false, false), cx)
3136            })
3137            .await
3138            .unwrap();
3139
3140        let mut ranges_by_path = results
3141            .into_iter()
3142            .map(|(buffer, ranges)| {
3143                buffer.read_with(cx_b, |buffer, cx| {
3144                    let path = buffer.file().unwrap().full_path(cx);
3145                    let offset_ranges = ranges
3146                        .into_iter()
3147                        .map(|range| range.to_offset(buffer))
3148                        .collect::<Vec<_>>();
3149                    (path, offset_ranges)
3150                })
3151            })
3152            .collect::<Vec<_>>();
3153        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3154
3155        assert_eq!(
3156            ranges_by_path,
3157            &[
3158                (PathBuf::from("root-1/a"), vec![6..11]),
3159                (PathBuf::from("root-1/c"), vec![2..7]),
3160                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3161                (PathBuf::from("root-2/e"), vec![7..12]),
3162            ]
3163        );
3164    }
3165
3166    #[gpui::test(iterations = 10)]
3167    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3168        cx_a.foreground().forbid_parking();
3169        let lang_registry = Arc::new(LanguageRegistry::test());
3170        let fs = FakeFs::new(cx_a.background());
3171        fs.insert_tree(
3172            "/root-1",
3173            json!({
3174                ".zed.toml": r#"collaborators = ["user_b"]"#,
3175                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3176            }),
3177        )
3178        .await;
3179
3180        // Set up a fake language server.
3181        let mut language = Language::new(
3182            LanguageConfig {
3183                name: "Rust".into(),
3184                path_suffixes: vec!["rs".to_string()],
3185                ..Default::default()
3186            },
3187            Some(tree_sitter_rust::language()),
3188        );
3189        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3190        lang_registry.add(Arc::new(language));
3191
3192        // Connect to a server as 2 clients.
3193        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3194        let client_a = server.create_client(cx_a, "user_a").await;
3195        let client_b = server.create_client(cx_b, "user_b").await;
3196
3197        // Share a project as client A
3198        let project_a = cx_a.update(|cx| {
3199            Project::local(
3200                client_a.clone(),
3201                client_a.user_store.clone(),
3202                lang_registry.clone(),
3203                fs.clone(),
3204                cx,
3205            )
3206        });
3207        let (worktree_a, _) = project_a
3208            .update(cx_a, |p, cx| {
3209                p.find_or_create_local_worktree("/root-1", true, cx)
3210            })
3211            .await
3212            .unwrap();
3213        worktree_a
3214            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3215            .await;
3216        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3217        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3218        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3219
3220        // Join the worktree as client B.
3221        let project_b = Project::remote(
3222            project_id,
3223            client_b.clone(),
3224            client_b.user_store.clone(),
3225            lang_registry.clone(),
3226            fs.clone(),
3227            &mut cx_b.to_async(),
3228        )
3229        .await
3230        .unwrap();
3231
3232        // Open the file on client B.
3233        let buffer_b = cx_b
3234            .background()
3235            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3236            .await
3237            .unwrap();
3238
3239        // Request document highlights as the guest.
3240        let fake_language_server = fake_language_servers.next().await.unwrap();
3241        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3242            |params, _| async move {
3243                assert_eq!(
3244                    params
3245                        .text_document_position_params
3246                        .text_document
3247                        .uri
3248                        .as_str(),
3249                    "file:///root-1/main.rs"
3250                );
3251                assert_eq!(
3252                    params.text_document_position_params.position,
3253                    lsp::Position::new(0, 34)
3254                );
3255                Ok(Some(vec![
3256                    lsp::DocumentHighlight {
3257                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3258                        range: lsp::Range::new(
3259                            lsp::Position::new(0, 10),
3260                            lsp::Position::new(0, 16),
3261                        ),
3262                    },
3263                    lsp::DocumentHighlight {
3264                        kind: Some(lsp::DocumentHighlightKind::READ),
3265                        range: lsp::Range::new(
3266                            lsp::Position::new(0, 32),
3267                            lsp::Position::new(0, 38),
3268                        ),
3269                    },
3270                    lsp::DocumentHighlight {
3271                        kind: Some(lsp::DocumentHighlightKind::READ),
3272                        range: lsp::Range::new(
3273                            lsp::Position::new(0, 41),
3274                            lsp::Position::new(0, 47),
3275                        ),
3276                    },
3277                ]))
3278            },
3279        );
3280
3281        let highlights = project_b
3282            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3283            .await
3284            .unwrap();
3285        buffer_b.read_with(cx_b, |buffer, _| {
3286            let snapshot = buffer.snapshot();
3287
3288            let highlights = highlights
3289                .into_iter()
3290                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3291                .collect::<Vec<_>>();
3292            assert_eq!(
3293                highlights,
3294                &[
3295                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3296                    (lsp::DocumentHighlightKind::READ, 32..38),
3297                    (lsp::DocumentHighlightKind::READ, 41..47)
3298                ]
3299            )
3300        });
3301    }
3302
3303    #[gpui::test(iterations = 10)]
3304    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3305        cx_a.foreground().forbid_parking();
3306        let lang_registry = Arc::new(LanguageRegistry::test());
3307        let fs = FakeFs::new(cx_a.background());
3308        fs.insert_tree(
3309            "/code",
3310            json!({
3311                "crate-1": {
3312                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3313                    "one.rs": "const ONE: usize = 1;",
3314                },
3315                "crate-2": {
3316                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3317                },
3318                "private": {
3319                    "passwords.txt": "the-password",
3320                }
3321            }),
3322        )
3323        .await;
3324
3325        // Set up a fake language server.
3326        let mut language = Language::new(
3327            LanguageConfig {
3328                name: "Rust".into(),
3329                path_suffixes: vec!["rs".to_string()],
3330                ..Default::default()
3331            },
3332            Some(tree_sitter_rust::language()),
3333        );
3334        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3335        lang_registry.add(Arc::new(language));
3336
3337        // Connect to a server as 2 clients.
3338        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3339        let client_a = server.create_client(cx_a, "user_a").await;
3340        let client_b = server.create_client(cx_b, "user_b").await;
3341
3342        // Share a project as client A
3343        let project_a = cx_a.update(|cx| {
3344            Project::local(
3345                client_a.clone(),
3346                client_a.user_store.clone(),
3347                lang_registry.clone(),
3348                fs.clone(),
3349                cx,
3350            )
3351        });
3352        let (worktree_a, _) = project_a
3353            .update(cx_a, |p, cx| {
3354                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3355            })
3356            .await
3357            .unwrap();
3358        worktree_a
3359            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3360            .await;
3361        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3362        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3363        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3364
3365        // Join the worktree as client B.
3366        let project_b = Project::remote(
3367            project_id,
3368            client_b.clone(),
3369            client_b.user_store.clone(),
3370            lang_registry.clone(),
3371            fs.clone(),
3372            &mut cx_b.to_async(),
3373        )
3374        .await
3375        .unwrap();
3376
3377        // Cause the language server to start.
3378        let _buffer = cx_b
3379            .background()
3380            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3381            .await
3382            .unwrap();
3383
3384        let fake_language_server = fake_language_servers.next().await.unwrap();
3385        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3386            |_, _| async move {
3387                #[allow(deprecated)]
3388                Ok(Some(vec![lsp::SymbolInformation {
3389                    name: "TWO".into(),
3390                    location: lsp::Location {
3391                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3392                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3393                    },
3394                    kind: lsp::SymbolKind::CONSTANT,
3395                    tags: None,
3396                    container_name: None,
3397                    deprecated: None,
3398                }]))
3399            },
3400        );
3401
3402        // Request the definition of a symbol as the guest.
3403        let symbols = project_b
3404            .update(cx_b, |p, cx| p.symbols("two", cx))
3405            .await
3406            .unwrap();
3407        assert_eq!(symbols.len(), 1);
3408        assert_eq!(symbols[0].name, "TWO");
3409
3410        // Open one of the returned symbols.
3411        let buffer_b_2 = project_b
3412            .update(cx_b, |project, cx| {
3413                project.open_buffer_for_symbol(&symbols[0], cx)
3414            })
3415            .await
3416            .unwrap();
3417        buffer_b_2.read_with(cx_b, |buffer, _| {
3418            assert_eq!(
3419                buffer.file().unwrap().path().as_ref(),
3420                Path::new("../crate-2/two.rs")
3421            );
3422        });
3423
3424        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3425        let mut fake_symbol = symbols[0].clone();
3426        fake_symbol.path = Path::new("/code/secrets").into();
3427        let error = project_b
3428            .update(cx_b, |project, cx| {
3429                project.open_buffer_for_symbol(&fake_symbol, cx)
3430            })
3431            .await
3432            .unwrap_err();
3433        assert!(error.to_string().contains("invalid symbol signature"));
3434    }
3435
3436    #[gpui::test(iterations = 10)]
3437    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3438        cx_a: &mut TestAppContext,
3439        cx_b: &mut TestAppContext,
3440        mut rng: StdRng,
3441    ) {
3442        cx_a.foreground().forbid_parking();
3443        let lang_registry = Arc::new(LanguageRegistry::test());
3444        let fs = FakeFs::new(cx_a.background());
3445        fs.insert_tree(
3446            "/root",
3447            json!({
3448                ".zed.toml": r#"collaborators = ["user_b"]"#,
3449                "a.rs": "const ONE: usize = b::TWO;",
3450                "b.rs": "const TWO: usize = 2",
3451            }),
3452        )
3453        .await;
3454
3455        // Set up a fake language server.
3456        let mut language = Language::new(
3457            LanguageConfig {
3458                name: "Rust".into(),
3459                path_suffixes: vec!["rs".to_string()],
3460                ..Default::default()
3461            },
3462            Some(tree_sitter_rust::language()),
3463        );
3464        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3465        lang_registry.add(Arc::new(language));
3466
3467        // Connect to a server as 2 clients.
3468        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3469        let client_a = server.create_client(cx_a, "user_a").await;
3470        let client_b = server.create_client(cx_b, "user_b").await;
3471
3472        // Share a project as client A
3473        let project_a = cx_a.update(|cx| {
3474            Project::local(
3475                client_a.clone(),
3476                client_a.user_store.clone(),
3477                lang_registry.clone(),
3478                fs.clone(),
3479                cx,
3480            )
3481        });
3482
3483        let (worktree_a, _) = project_a
3484            .update(cx_a, |p, cx| {
3485                p.find_or_create_local_worktree("/root", true, cx)
3486            })
3487            .await
3488            .unwrap();
3489        worktree_a
3490            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3491            .await;
3492        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3493        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3494        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3495
3496        // Join the worktree as client B.
3497        let project_b = Project::remote(
3498            project_id,
3499            client_b.clone(),
3500            client_b.user_store.clone(),
3501            lang_registry.clone(),
3502            fs.clone(),
3503            &mut cx_b.to_async(),
3504        )
3505        .await
3506        .unwrap();
3507
3508        let buffer_b1 = cx_b
3509            .background()
3510            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3511            .await
3512            .unwrap();
3513
3514        let fake_language_server = fake_language_servers.next().await.unwrap();
3515        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3516            |_, _| async move {
3517                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3518                    lsp::Location::new(
3519                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3520                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3521                    ),
3522                )))
3523            },
3524        );
3525
3526        let definitions;
3527        let buffer_b2;
3528        if rng.gen() {
3529            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3530            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3531        } else {
3532            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3533            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3534        }
3535
3536        let buffer_b2 = buffer_b2.await.unwrap();
3537        let definitions = definitions.await.unwrap();
3538        assert_eq!(definitions.len(), 1);
3539        assert_eq!(definitions[0].buffer, buffer_b2);
3540    }
3541
3542    #[gpui::test(iterations = 10)]
3543    async fn test_collaborating_with_code_actions(
3544        cx_a: &mut TestAppContext,
3545        cx_b: &mut TestAppContext,
3546    ) {
3547        cx_a.foreground().forbid_parking();
3548        let lang_registry = Arc::new(LanguageRegistry::test());
3549        let fs = FakeFs::new(cx_a.background());
3550        cx_b.update(|cx| editor::init(cx));
3551
3552        // Set up a fake language server.
3553        let mut language = Language::new(
3554            LanguageConfig {
3555                name: "Rust".into(),
3556                path_suffixes: vec!["rs".to_string()],
3557                ..Default::default()
3558            },
3559            Some(tree_sitter_rust::language()),
3560        );
3561        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3562        lang_registry.add(Arc::new(language));
3563
3564        // Connect to a server as 2 clients.
3565        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3566        let client_a = server.create_client(cx_a, "user_a").await;
3567        let client_b = server.create_client(cx_b, "user_b").await;
3568
3569        // Share a project as client A
3570        fs.insert_tree(
3571            "/a",
3572            json!({
3573                ".zed.toml": r#"collaborators = ["user_b"]"#,
3574                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3575                "other.rs": "pub fn foo() -> usize { 4 }",
3576            }),
3577        )
3578        .await;
3579        let project_a = cx_a.update(|cx| {
3580            Project::local(
3581                client_a.clone(),
3582                client_a.user_store.clone(),
3583                lang_registry.clone(),
3584                fs.clone(),
3585                cx,
3586            )
3587        });
3588        let (worktree_a, _) = project_a
3589            .update(cx_a, |p, cx| {
3590                p.find_or_create_local_worktree("/a", true, cx)
3591            })
3592            .await
3593            .unwrap();
3594        worktree_a
3595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596            .await;
3597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601        // Join the worktree as client B.
3602        let project_b = Project::remote(
3603            project_id,
3604            client_b.clone(),
3605            client_b.user_store.clone(),
3606            lang_registry.clone(),
3607            fs.clone(),
3608            &mut cx_b.to_async(),
3609        )
3610        .await
3611        .unwrap();
3612        let mut params = cx_b.update(WorkspaceParams::test);
3613        params.languages = lang_registry.clone();
3614        params.client = client_b.client.clone();
3615        params.user_store = client_b.user_store.clone();
3616        params.project = project_b;
3617
3618        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3619        let editor_b = workspace_b
3620            .update(cx_b, |workspace, cx| {
3621                workspace.open_path((worktree_id, "main.rs"), cx)
3622            })
3623            .await
3624            .unwrap()
3625            .downcast::<Editor>()
3626            .unwrap();
3627
3628        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3629        fake_language_server
3630            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3631                assert_eq!(
3632                    params.text_document.uri,
3633                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3634                );
3635                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3636                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3637                Ok(None)
3638            })
3639            .next()
3640            .await;
3641
3642        // Move cursor to a location that contains code actions.
3643        editor_b.update(cx_b, |editor, cx| {
3644            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3645            cx.focus(&editor_b);
3646        });
3647
3648        fake_language_server
3649            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
3650                assert_eq!(
3651                    params.text_document.uri,
3652                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3653                );
3654                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3655                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3656
3657                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
3658                    lsp::CodeAction {
3659                        title: "Inline into all callers".to_string(),
3660                        edit: Some(lsp::WorkspaceEdit {
3661                            changes: Some(
3662                                [
3663                                    (
3664                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3665                                        vec![lsp::TextEdit::new(
3666                                            lsp::Range::new(
3667                                                lsp::Position::new(1, 22),
3668                                                lsp::Position::new(1, 34),
3669                                            ),
3670                                            "4".to_string(),
3671                                        )],
3672                                    ),
3673                                    (
3674                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3675                                        vec![lsp::TextEdit::new(
3676                                            lsp::Range::new(
3677                                                lsp::Position::new(0, 0),
3678                                                lsp::Position::new(0, 27),
3679                                            ),
3680                                            "".to_string(),
3681                                        )],
3682                                    ),
3683                                ]
3684                                .into_iter()
3685                                .collect(),
3686                            ),
3687                            ..Default::default()
3688                        }),
3689                        data: Some(json!({
3690                            "codeActionParams": {
3691                                "range": {
3692                                    "start": {"line": 1, "column": 31},
3693                                    "end": {"line": 1, "column": 31},
3694                                }
3695                            }
3696                        })),
3697                        ..Default::default()
3698                    },
3699                )]))
3700            })
3701            .next()
3702            .await;
3703
3704        // Toggle code actions and wait for them to display.
3705        editor_b.update(cx_b, |editor, cx| {
3706            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3707        });
3708        editor_b
3709            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3710            .await;
3711
3712        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3713
3714        // Confirming the code action will trigger a resolve request.
3715        let confirm_action = workspace_b
3716            .update(cx_b, |workspace, cx| {
3717                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3718            })
3719            .unwrap();
3720        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
3721            |_, _| async move {
3722                Ok(lsp::CodeAction {
3723                    title: "Inline into all callers".to_string(),
3724                    edit: Some(lsp::WorkspaceEdit {
3725                        changes: Some(
3726                            [
3727                                (
3728                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3729                                    vec![lsp::TextEdit::new(
3730                                        lsp::Range::new(
3731                                            lsp::Position::new(1, 22),
3732                                            lsp::Position::new(1, 34),
3733                                        ),
3734                                        "4".to_string(),
3735                                    )],
3736                                ),
3737                                (
3738                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
3739                                    vec![lsp::TextEdit::new(
3740                                        lsp::Range::new(
3741                                            lsp::Position::new(0, 0),
3742                                            lsp::Position::new(0, 27),
3743                                        ),
3744                                        "".to_string(),
3745                                    )],
3746                                ),
3747                            ]
3748                            .into_iter()
3749                            .collect(),
3750                        ),
3751                        ..Default::default()
3752                    }),
3753                    ..Default::default()
3754                })
3755            },
3756        );
3757
3758        // After the action is confirmed, an editor containing both modified files is opened.
3759        confirm_action.await.unwrap();
3760        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3761            workspace
3762                .active_item(cx)
3763                .unwrap()
3764                .downcast::<Editor>()
3765                .unwrap()
3766        });
3767        code_action_editor.update(cx_b, |editor, cx| {
3768            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3769            editor.undo(&Undo, cx);
3770            assert_eq!(
3771                editor.text(cx),
3772                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3773            );
3774            editor.redo(&Redo, cx);
3775            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3776        });
3777    }
3778
3779    #[gpui::test(iterations = 10)]
3780    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3781        cx_a.foreground().forbid_parking();
3782        let lang_registry = Arc::new(LanguageRegistry::test());
3783        let fs = FakeFs::new(cx_a.background());
3784        cx_b.update(|cx| editor::init(cx));
3785
3786        // Set up a fake language server.
3787        let mut language = Language::new(
3788            LanguageConfig {
3789                name: "Rust".into(),
3790                path_suffixes: vec!["rs".to_string()],
3791                ..Default::default()
3792            },
3793            Some(tree_sitter_rust::language()),
3794        );
3795        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3796        lang_registry.add(Arc::new(language));
3797
3798        // Connect to a server as 2 clients.
3799        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3800        let client_a = server.create_client(cx_a, "user_a").await;
3801        let client_b = server.create_client(cx_b, "user_b").await;
3802
3803        // Share a project as client A
3804        fs.insert_tree(
3805            "/dir",
3806            json!({
3807                ".zed.toml": r#"collaborators = ["user_b"]"#,
3808                "one.rs": "const ONE: usize = 1;",
3809                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3810            }),
3811        )
3812        .await;
3813        let project_a = cx_a.update(|cx| {
3814            Project::local(
3815                client_a.clone(),
3816                client_a.user_store.clone(),
3817                lang_registry.clone(),
3818                fs.clone(),
3819                cx,
3820            )
3821        });
3822        let (worktree_a, _) = project_a
3823            .update(cx_a, |p, cx| {
3824                p.find_or_create_local_worktree("/dir", true, cx)
3825            })
3826            .await
3827            .unwrap();
3828        worktree_a
3829            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3830            .await;
3831        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3832        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3833        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3834
3835        // Join the worktree as client B.
3836        let project_b = Project::remote(
3837            project_id,
3838            client_b.clone(),
3839            client_b.user_store.clone(),
3840            lang_registry.clone(),
3841            fs.clone(),
3842            &mut cx_b.to_async(),
3843        )
3844        .await
3845        .unwrap();
3846        let mut params = cx_b.update(WorkspaceParams::test);
3847        params.languages = lang_registry.clone();
3848        params.client = client_b.client.clone();
3849        params.user_store = client_b.user_store.clone();
3850        params.project = project_b;
3851
3852        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3853        let editor_b = workspace_b
3854            .update(cx_b, |workspace, cx| {
3855                workspace.open_path((worktree_id, "one.rs"), cx)
3856            })
3857            .await
3858            .unwrap()
3859            .downcast::<Editor>()
3860            .unwrap();
3861        let fake_language_server = fake_language_servers.next().await.unwrap();
3862
3863        // Move cursor to a location that can be renamed.
3864        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3865            editor.select_ranges([7..7], None, cx);
3866            editor.rename(&Rename, cx).unwrap()
3867        });
3868
3869        fake_language_server
3870            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
3871                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3872                assert_eq!(params.position, lsp::Position::new(0, 7));
3873                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3874                    lsp::Position::new(0, 6),
3875                    lsp::Position::new(0, 9),
3876                ))))
3877            })
3878            .next()
3879            .await
3880            .unwrap();
3881        prepare_rename.await.unwrap();
3882        editor_b.update(cx_b, |editor, cx| {
3883            let rename = editor.pending_rename().unwrap();
3884            let buffer = editor.buffer().read(cx).snapshot(cx);
3885            assert_eq!(
3886                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3887                6..9
3888            );
3889            rename.editor.update(cx, |rename_editor, cx| {
3890                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3891                    rename_buffer.edit([0..3], "THREE", cx);
3892                });
3893            });
3894        });
3895
3896        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3897            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3898        });
3899        fake_language_server
3900            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
3901                assert_eq!(
3902                    params.text_document_position.text_document.uri.as_str(),
3903                    "file:///dir/one.rs"
3904                );
3905                assert_eq!(
3906                    params.text_document_position.position,
3907                    lsp::Position::new(0, 6)
3908                );
3909                assert_eq!(params.new_name, "THREE");
3910                Ok(Some(lsp::WorkspaceEdit {
3911                    changes: Some(
3912                        [
3913                            (
3914                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3915                                vec![lsp::TextEdit::new(
3916                                    lsp::Range::new(
3917                                        lsp::Position::new(0, 6),
3918                                        lsp::Position::new(0, 9),
3919                                    ),
3920                                    "THREE".to_string(),
3921                                )],
3922                            ),
3923                            (
3924                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3925                                vec![
3926                                    lsp::TextEdit::new(
3927                                        lsp::Range::new(
3928                                            lsp::Position::new(0, 24),
3929                                            lsp::Position::new(0, 27),
3930                                        ),
3931                                        "THREE".to_string(),
3932                                    ),
3933                                    lsp::TextEdit::new(
3934                                        lsp::Range::new(
3935                                            lsp::Position::new(0, 35),
3936                                            lsp::Position::new(0, 38),
3937                                        ),
3938                                        "THREE".to_string(),
3939                                    ),
3940                                ],
3941                            ),
3942                        ]
3943                        .into_iter()
3944                        .collect(),
3945                    ),
3946                    ..Default::default()
3947                }))
3948            })
3949            .next()
3950            .await
3951            .unwrap();
3952        confirm_rename.await.unwrap();
3953
3954        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3955            workspace
3956                .active_item(cx)
3957                .unwrap()
3958                .downcast::<Editor>()
3959                .unwrap()
3960        });
3961        rename_editor.update(cx_b, |editor, cx| {
3962            assert_eq!(
3963                editor.text(cx),
3964                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3965            );
3966            editor.undo(&Undo, cx);
3967            assert_eq!(
3968                editor.text(cx),
3969                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3970            );
3971            editor.redo(&Redo, cx);
3972            assert_eq!(
3973                editor.text(cx),
3974                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3975            );
3976        });
3977
3978        // Ensure temporary rename edits cannot be undone/redone.
3979        editor_b.update(cx_b, |editor, cx| {
3980            editor.undo(&Undo, cx);
3981            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3982            editor.undo(&Undo, cx);
3983            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3984            editor.redo(&Redo, cx);
3985            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3986        })
3987    }
3988
3989    #[gpui::test(iterations = 10)]
3990    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3991        cx_a.foreground().forbid_parking();
3992
3993        // Connect to a server as 2 clients.
3994        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3995        let client_a = server.create_client(cx_a, "user_a").await;
3996        let client_b = server.create_client(cx_b, "user_b").await;
3997
3998        // Create an org that includes these 2 users.
3999        let db = &server.app_state.db;
4000        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4001        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4002            .await
4003            .unwrap();
4004        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4005            .await
4006            .unwrap();
4007
4008        // Create a channel that includes all the users.
4009        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4010        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4011            .await
4012            .unwrap();
4013        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4014            .await
4015            .unwrap();
4016        db.create_channel_message(
4017            channel_id,
4018            client_b.current_user_id(&cx_b),
4019            "hello A, it's B.",
4020            OffsetDateTime::now_utc(),
4021            1,
4022        )
4023        .await
4024        .unwrap();
4025
4026        let channels_a = cx_a
4027            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4028        channels_a
4029            .condition(cx_a, |list, _| list.available_channels().is_some())
4030            .await;
4031        channels_a.read_with(cx_a, |list, _| {
4032            assert_eq!(
4033                list.available_channels().unwrap(),
4034                &[ChannelDetails {
4035                    id: channel_id.to_proto(),
4036                    name: "test-channel".to_string()
4037                }]
4038            )
4039        });
4040        let channel_a = channels_a.update(cx_a, |this, cx| {
4041            this.get_channel(channel_id.to_proto(), cx).unwrap()
4042        });
4043        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4044        channel_a
4045            .condition(&cx_a, |channel, _| {
4046                channel_messages(channel)
4047                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4048            })
4049            .await;
4050
4051        let channels_b = cx_b
4052            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4053        channels_b
4054            .condition(cx_b, |list, _| list.available_channels().is_some())
4055            .await;
4056        channels_b.read_with(cx_b, |list, _| {
4057            assert_eq!(
4058                list.available_channels().unwrap(),
4059                &[ChannelDetails {
4060                    id: channel_id.to_proto(),
4061                    name: "test-channel".to_string()
4062                }]
4063            )
4064        });
4065
4066        let channel_b = channels_b.update(cx_b, |this, cx| {
4067            this.get_channel(channel_id.to_proto(), cx).unwrap()
4068        });
4069        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4070        channel_b
4071            .condition(&cx_b, |channel, _| {
4072                channel_messages(channel)
4073                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4074            })
4075            .await;
4076
4077        channel_a
4078            .update(cx_a, |channel, cx| {
4079                channel
4080                    .send_message("oh, hi B.".to_string(), cx)
4081                    .unwrap()
4082                    .detach();
4083                let task = channel.send_message("sup".to_string(), cx).unwrap();
4084                assert_eq!(
4085                    channel_messages(channel),
4086                    &[
4087                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4088                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4089                        ("user_a".to_string(), "sup".to_string(), true)
4090                    ]
4091                );
4092                task
4093            })
4094            .await
4095            .unwrap();
4096
4097        channel_b
4098            .condition(&cx_b, |channel, _| {
4099                channel_messages(channel)
4100                    == [
4101                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4102                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4103                        ("user_a".to_string(), "sup".to_string(), false),
4104                    ]
4105            })
4106            .await;
4107
4108        assert_eq!(
4109            server
4110                .state()
4111                .await
4112                .channel(channel_id)
4113                .unwrap()
4114                .connection_ids
4115                .len(),
4116            2
4117        );
4118        cx_b.update(|_| drop(channel_b));
4119        server
4120            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4121            .await;
4122
4123        cx_a.update(|_| drop(channel_a));
4124        server
4125            .condition(|state| state.channel(channel_id).is_none())
4126            .await;
4127    }
4128
4129    #[gpui::test(iterations = 10)]
4130    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4131        cx_a.foreground().forbid_parking();
4132
4133        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4134        let client_a = server.create_client(cx_a, "user_a").await;
4135
4136        let db = &server.app_state.db;
4137        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4138        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4139        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4140            .await
4141            .unwrap();
4142        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4143            .await
4144            .unwrap();
4145
4146        let channels_a = cx_a
4147            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4148        channels_a
4149            .condition(cx_a, |list, _| list.available_channels().is_some())
4150            .await;
4151        let channel_a = channels_a.update(cx_a, |this, cx| {
4152            this.get_channel(channel_id.to_proto(), cx).unwrap()
4153        });
4154
4155        // Messages aren't allowed to be too long.
4156        channel_a
4157            .update(cx_a, |channel, cx| {
4158                let long_body = "this is long.\n".repeat(1024);
4159                channel.send_message(long_body, cx).unwrap()
4160            })
4161            .await
4162            .unwrap_err();
4163
4164        // Messages aren't allowed to be blank.
4165        channel_a.update(cx_a, |channel, cx| {
4166            channel.send_message(String::new(), cx).unwrap_err()
4167        });
4168
4169        // Leading and trailing whitespace are trimmed.
4170        channel_a
4171            .update(cx_a, |channel, cx| {
4172                channel
4173                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4174                    .unwrap()
4175            })
4176            .await
4177            .unwrap();
4178        assert_eq!(
4179            db.get_channel_messages(channel_id, 10, None)
4180                .await
4181                .unwrap()
4182                .iter()
4183                .map(|m| &m.body)
4184                .collect::<Vec<_>>(),
4185            &["surrounded by whitespace"]
4186        );
4187    }
4188
4189    #[gpui::test(iterations = 10)]
4190    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4191        cx_a.foreground().forbid_parking();
4192
4193        // Connect to a server as 2 clients.
4194        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4195        let client_a = server.create_client(cx_a, "user_a").await;
4196        let client_b = server.create_client(cx_b, "user_b").await;
4197        let mut status_b = client_b.status();
4198
4199        // Create an org that includes these 2 users.
4200        let db = &server.app_state.db;
4201        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4202        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4203            .await
4204            .unwrap();
4205        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4206            .await
4207            .unwrap();
4208
4209        // Create a channel that includes all the users.
4210        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4211        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4212            .await
4213            .unwrap();
4214        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4215            .await
4216            .unwrap();
4217        db.create_channel_message(
4218            channel_id,
4219            client_b.current_user_id(&cx_b),
4220            "hello A, it's B.",
4221            OffsetDateTime::now_utc(),
4222            2,
4223        )
4224        .await
4225        .unwrap();
4226
4227        let channels_a = cx_a
4228            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4229        channels_a
4230            .condition(cx_a, |list, _| list.available_channels().is_some())
4231            .await;
4232
4233        channels_a.read_with(cx_a, |list, _| {
4234            assert_eq!(
4235                list.available_channels().unwrap(),
4236                &[ChannelDetails {
4237                    id: channel_id.to_proto(),
4238                    name: "test-channel".to_string()
4239                }]
4240            )
4241        });
4242        let channel_a = channels_a.update(cx_a, |this, cx| {
4243            this.get_channel(channel_id.to_proto(), cx).unwrap()
4244        });
4245        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4246        channel_a
4247            .condition(&cx_a, |channel, _| {
4248                channel_messages(channel)
4249                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4250            })
4251            .await;
4252
4253        let channels_b = cx_b
4254            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4255        channels_b
4256            .condition(cx_b, |list, _| list.available_channels().is_some())
4257            .await;
4258        channels_b.read_with(cx_b, |list, _| {
4259            assert_eq!(
4260                list.available_channels().unwrap(),
4261                &[ChannelDetails {
4262                    id: channel_id.to_proto(),
4263                    name: "test-channel".to_string()
4264                }]
4265            )
4266        });
4267
4268        let channel_b = channels_b.update(cx_b, |this, cx| {
4269            this.get_channel(channel_id.to_proto(), cx).unwrap()
4270        });
4271        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4272        channel_b
4273            .condition(&cx_b, |channel, _| {
4274                channel_messages(channel)
4275                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4276            })
4277            .await;
4278
4279        // Disconnect client B, ensuring we can still access its cached channel data.
4280        server.forbid_connections();
4281        server.disconnect_client(client_b.current_user_id(&cx_b));
4282        cx_b.foreground().advance_clock(Duration::from_secs(3));
4283        while !matches!(
4284            status_b.next().await,
4285            Some(client::Status::ReconnectionError { .. })
4286        ) {}
4287
4288        channels_b.read_with(cx_b, |channels, _| {
4289            assert_eq!(
4290                channels.available_channels().unwrap(),
4291                [ChannelDetails {
4292                    id: channel_id.to_proto(),
4293                    name: "test-channel".to_string()
4294                }]
4295            )
4296        });
4297        channel_b.read_with(cx_b, |channel, _| {
4298            assert_eq!(
4299                channel_messages(channel),
4300                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4301            )
4302        });
4303
4304        // Send a message from client B while it is disconnected.
4305        channel_b
4306            .update(cx_b, |channel, cx| {
4307                let task = channel
4308                    .send_message("can you see this?".to_string(), cx)
4309                    .unwrap();
4310                assert_eq!(
4311                    channel_messages(channel),
4312                    &[
4313                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4314                        ("user_b".to_string(), "can you see this?".to_string(), true)
4315                    ]
4316                );
4317                task
4318            })
4319            .await
4320            .unwrap_err();
4321
4322        // Send a message from client A while B is disconnected.
4323        channel_a
4324            .update(cx_a, |channel, cx| {
4325                channel
4326                    .send_message("oh, hi B.".to_string(), cx)
4327                    .unwrap()
4328                    .detach();
4329                let task = channel.send_message("sup".to_string(), cx).unwrap();
4330                assert_eq!(
4331                    channel_messages(channel),
4332                    &[
4333                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4334                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4335                        ("user_a".to_string(), "sup".to_string(), true)
4336                    ]
4337                );
4338                task
4339            })
4340            .await
4341            .unwrap();
4342
4343        // Give client B a chance to reconnect.
4344        server.allow_connections();
4345        cx_b.foreground().advance_clock(Duration::from_secs(10));
4346
4347        // Verify that B sees the new messages upon reconnection, as well as the message client B
4348        // sent while offline.
4349        channel_b
4350            .condition(&cx_b, |channel, _| {
4351                channel_messages(channel)
4352                    == [
4353                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4354                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4355                        ("user_a".to_string(), "sup".to_string(), false),
4356                        ("user_b".to_string(), "can you see this?".to_string(), false),
4357                    ]
4358            })
4359            .await;
4360
4361        // Ensure client A and B can communicate normally after reconnection.
4362        channel_a
4363            .update(cx_a, |channel, cx| {
4364                channel.send_message("you online?".to_string(), cx).unwrap()
4365            })
4366            .await
4367            .unwrap();
4368        channel_b
4369            .condition(&cx_b, |channel, _| {
4370                channel_messages(channel)
4371                    == [
4372                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4373                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4374                        ("user_a".to_string(), "sup".to_string(), false),
4375                        ("user_b".to_string(), "can you see this?".to_string(), false),
4376                        ("user_a".to_string(), "you online?".to_string(), false),
4377                    ]
4378            })
4379            .await;
4380
4381        channel_b
4382            .update(cx_b, |channel, cx| {
4383                channel.send_message("yep".to_string(), cx).unwrap()
4384            })
4385            .await
4386            .unwrap();
4387        channel_a
4388            .condition(&cx_a, |channel, _| {
4389                channel_messages(channel)
4390                    == [
4391                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4392                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4393                        ("user_a".to_string(), "sup".to_string(), false),
4394                        ("user_b".to_string(), "can you see this?".to_string(), false),
4395                        ("user_a".to_string(), "you online?".to_string(), false),
4396                        ("user_b".to_string(), "yep".to_string(), false),
4397                    ]
4398            })
4399            .await;
4400    }
4401
4402    #[gpui::test(iterations = 10)]
4403    async fn test_contacts(
4404        cx_a: &mut TestAppContext,
4405        cx_b: &mut TestAppContext,
4406        cx_c: &mut TestAppContext,
4407    ) {
4408        cx_a.foreground().forbid_parking();
4409        let lang_registry = Arc::new(LanguageRegistry::test());
4410        let fs = FakeFs::new(cx_a.background());
4411
4412        // Connect to a server as 3 clients.
4413        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4414        let client_a = server.create_client(cx_a, "user_a").await;
4415        let client_b = server.create_client(cx_b, "user_b").await;
4416        let client_c = server.create_client(cx_c, "user_c").await;
4417
4418        // Share a worktree as client A.
4419        fs.insert_tree(
4420            "/a",
4421            json!({
4422                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4423            }),
4424        )
4425        .await;
4426
4427        let project_a = cx_a.update(|cx| {
4428            Project::local(
4429                client_a.clone(),
4430                client_a.user_store.clone(),
4431                lang_registry.clone(),
4432                fs.clone(),
4433                cx,
4434            )
4435        });
4436        let (worktree_a, _) = project_a
4437            .update(cx_a, |p, cx| {
4438                p.find_or_create_local_worktree("/a", true, cx)
4439            })
4440            .await
4441            .unwrap();
4442        worktree_a
4443            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4444            .await;
4445
4446        client_a
4447            .user_store
4448            .condition(&cx_a, |user_store, _| {
4449                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4450            })
4451            .await;
4452        client_b
4453            .user_store
4454            .condition(&cx_b, |user_store, _| {
4455                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4456            })
4457            .await;
4458        client_c
4459            .user_store
4460            .condition(&cx_c, |user_store, _| {
4461                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4462            })
4463            .await;
4464
4465        let project_id = project_a
4466            .update(cx_a, |project, _| project.next_remote_id())
4467            .await;
4468        project_a
4469            .update(cx_a, |project, cx| project.share(cx))
4470            .await
4471            .unwrap();
4472
4473        let _project_b = Project::remote(
4474            project_id,
4475            client_b.clone(),
4476            client_b.user_store.clone(),
4477            lang_registry.clone(),
4478            fs.clone(),
4479            &mut cx_b.to_async(),
4480        )
4481        .await
4482        .unwrap();
4483
4484        client_a
4485            .user_store
4486            .condition(&cx_a, |user_store, _| {
4487                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4488            })
4489            .await;
4490        client_b
4491            .user_store
4492            .condition(&cx_b, |user_store, _| {
4493                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4494            })
4495            .await;
4496        client_c
4497            .user_store
4498            .condition(&cx_c, |user_store, _| {
4499                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4500            })
4501            .await;
4502
4503        project_a
4504            .condition(&cx_a, |project, _| {
4505                project.collaborators().contains_key(&client_b.peer_id)
4506            })
4507            .await;
4508
4509        cx_a.update(move |_| drop(project_a));
4510        client_a
4511            .user_store
4512            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4513            .await;
4514        client_b
4515            .user_store
4516            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4517            .await;
4518        client_c
4519            .user_store
4520            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4521            .await;
4522
4523        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4524            user_store
4525                .contacts()
4526                .iter()
4527                .map(|contact| {
4528                    let worktrees = contact
4529                        .projects
4530                        .iter()
4531                        .map(|p| {
4532                            (
4533                                p.worktree_root_names[0].as_str(),
4534                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4535                            )
4536                        })
4537                        .collect();
4538                    (contact.user.github_login.as_str(), worktrees)
4539                })
4540                .collect()
4541        }
4542    }
4543
4544    #[gpui::test(iterations = 10)]
4545    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4546        cx_a.foreground().forbid_parking();
4547        let fs = FakeFs::new(cx_a.background());
4548
4549        // 2 clients connect to a server.
4550        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4551        let mut client_a = server.create_client(cx_a, "user_a").await;
4552        let mut client_b = server.create_client(cx_b, "user_b").await;
4553        cx_a.update(editor::init);
4554        cx_b.update(editor::init);
4555
4556        // Client A shares a project.
4557        fs.insert_tree(
4558            "/a",
4559            json!({
4560                ".zed.toml": r#"collaborators = ["user_b"]"#,
4561                "1.txt": "one",
4562                "2.txt": "two",
4563                "3.txt": "three",
4564            }),
4565        )
4566        .await;
4567        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4568        project_a
4569            .update(cx_a, |project, cx| project.share(cx))
4570            .await
4571            .unwrap();
4572
4573        // Client B joins the project.
4574        let project_b = client_b
4575            .build_remote_project(
4576                project_a
4577                    .read_with(cx_a, |project, _| project.remote_id())
4578                    .unwrap(),
4579                cx_b,
4580            )
4581            .await;
4582
4583        // Client A opens some editors.
4584        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4585        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4586        let editor_a1 = workspace_a
4587            .update(cx_a, |workspace, cx| {
4588                workspace.open_path((worktree_id, "1.txt"), cx)
4589            })
4590            .await
4591            .unwrap()
4592            .downcast::<Editor>()
4593            .unwrap();
4594        let editor_a2 = workspace_a
4595            .update(cx_a, |workspace, cx| {
4596                workspace.open_path((worktree_id, "2.txt"), cx)
4597            })
4598            .await
4599            .unwrap()
4600            .downcast::<Editor>()
4601            .unwrap();
4602
4603        // Client B opens an editor.
4604        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4605        let editor_b1 = workspace_b
4606            .update(cx_b, |workspace, cx| {
4607                workspace.open_path((worktree_id, "1.txt"), cx)
4608            })
4609            .await
4610            .unwrap()
4611            .downcast::<Editor>()
4612            .unwrap();
4613
4614        let client_a_id = project_b.read_with(cx_b, |project, _| {
4615            project.collaborators().values().next().unwrap().peer_id
4616        });
4617        let client_b_id = project_a.read_with(cx_a, |project, _| {
4618            project.collaborators().values().next().unwrap().peer_id
4619        });
4620
4621        // When client B starts following client A, all visible view states are replicated to client B.
4622        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4623        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4624        workspace_b
4625            .update(cx_b, |workspace, cx| {
4626                workspace
4627                    .toggle_follow(&ToggleFollow(client_a_id), cx)
4628                    .unwrap()
4629            })
4630            .await
4631            .unwrap();
4632        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4633            workspace
4634                .active_item(cx)
4635                .unwrap()
4636                .downcast::<Editor>()
4637                .unwrap()
4638        });
4639        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4640        assert_eq!(
4641            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4642            Some((worktree_id, "2.txt").into())
4643        );
4644        assert_eq!(
4645            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4646            vec![2..3]
4647        );
4648        assert_eq!(
4649            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4650            vec![0..1]
4651        );
4652
4653        // When client A activates a different editor, client B does so as well.
4654        workspace_a.update(cx_a, |workspace, cx| {
4655            workspace.activate_item(&editor_a1, cx)
4656        });
4657        workspace_b
4658            .condition(cx_b, |workspace, cx| {
4659                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4660            })
4661            .await;
4662
4663        // When client A navigates back and forth, client B does so as well.
4664        workspace_a
4665            .update(cx_a, |workspace, cx| {
4666                workspace::Pane::go_back(workspace, None, cx)
4667            })
4668            .await;
4669        workspace_b
4670            .condition(cx_b, |workspace, cx| {
4671                workspace.active_item(cx).unwrap().id() == editor_b2.id()
4672            })
4673            .await;
4674
4675        workspace_a
4676            .update(cx_a, |workspace, cx| {
4677                workspace::Pane::go_forward(workspace, None, cx)
4678            })
4679            .await;
4680        workspace_b
4681            .condition(cx_b, |workspace, cx| {
4682                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4683            })
4684            .await;
4685
4686        // Changes to client A's editor are reflected on client B.
4687        editor_a1.update(cx_a, |editor, cx| {
4688            editor.select_ranges([1..1, 2..2], None, cx);
4689        });
4690        editor_b1
4691            .condition(cx_b, |editor, cx| {
4692                editor.selected_ranges(cx) == vec![1..1, 2..2]
4693            })
4694            .await;
4695
4696        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4697        editor_b1
4698            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4699            .await;
4700
4701        editor_a1.update(cx_a, |editor, cx| {
4702            editor.select_ranges([3..3], None, cx);
4703            editor.set_scroll_position(vec2f(0., 100.), cx);
4704        });
4705        editor_b1
4706            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4707            .await;
4708
4709        // After unfollowing, client B stops receiving updates from client A.
4710        workspace_b.update(cx_b, |workspace, cx| {
4711            workspace.unfollow(&workspace.active_pane().clone(), cx)
4712        });
4713        workspace_a.update(cx_a, |workspace, cx| {
4714            workspace.activate_item(&editor_a2, cx)
4715        });
4716        cx_a.foreground().run_until_parked();
4717        assert_eq!(
4718            workspace_b.read_with(cx_b, |workspace, cx| workspace
4719                .active_item(cx)
4720                .unwrap()
4721                .id()),
4722            editor_b1.id()
4723        );
4724
4725        // Client A starts following client B.
4726        workspace_a
4727            .update(cx_a, |workspace, cx| {
4728                workspace
4729                    .toggle_follow(&ToggleFollow(client_b_id), cx)
4730                    .unwrap()
4731            })
4732            .await
4733            .unwrap();
4734        assert_eq!(
4735            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4736            Some(client_b_id)
4737        );
4738        assert_eq!(
4739            workspace_a.read_with(cx_a, |workspace, cx| workspace
4740                .active_item(cx)
4741                .unwrap()
4742                .id()),
4743            editor_a1.id()
4744        );
4745
4746        // Following interrupts when client B disconnects.
4747        client_b.disconnect(&cx_b.to_async()).unwrap();
4748        cx_a.foreground().run_until_parked();
4749        assert_eq!(
4750            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4751            None
4752        );
4753    }
4754
4755    #[gpui::test(iterations = 10)]
4756    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4757        cx_a.foreground().forbid_parking();
4758        let fs = FakeFs::new(cx_a.background());
4759
4760        // 2 clients connect to a server.
4761        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4762        let mut client_a = server.create_client(cx_a, "user_a").await;
4763        let mut client_b = server.create_client(cx_b, "user_b").await;
4764        cx_a.update(editor::init);
4765        cx_b.update(editor::init);
4766
4767        // Client A shares a project.
4768        fs.insert_tree(
4769            "/a",
4770            json!({
4771                ".zed.toml": r#"collaborators = ["user_b"]"#,
4772                "1.txt": "one",
4773                "2.txt": "two",
4774                "3.txt": "three",
4775                "4.txt": "four",
4776            }),
4777        )
4778        .await;
4779        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4780        project_a
4781            .update(cx_a, |project, cx| project.share(cx))
4782            .await
4783            .unwrap();
4784
4785        // Client B joins the project.
4786        let project_b = client_b
4787            .build_remote_project(
4788                project_a
4789                    .read_with(cx_a, |project, _| project.remote_id())
4790                    .unwrap(),
4791                cx_b,
4792            )
4793            .await;
4794
4795        // Client A opens some editors.
4796        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4797        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4798        let _editor_a1 = workspace_a
4799            .update(cx_a, |workspace, cx| {
4800                workspace.open_path((worktree_id, "1.txt"), cx)
4801            })
4802            .await
4803            .unwrap()
4804            .downcast::<Editor>()
4805            .unwrap();
4806
4807        // Client B opens an editor.
4808        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4809        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4810        let _editor_b1 = workspace_b
4811            .update(cx_b, |workspace, cx| {
4812                workspace.open_path((worktree_id, "2.txt"), cx)
4813            })
4814            .await
4815            .unwrap()
4816            .downcast::<Editor>()
4817            .unwrap();
4818
4819        // Clients A and B follow each other in split panes
4820        workspace_a
4821            .update(cx_a, |workspace, cx| {
4822                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4823                assert_ne!(*workspace.active_pane(), pane_a1);
4824                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4825                workspace
4826                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4827                    .unwrap()
4828            })
4829            .await
4830            .unwrap();
4831        workspace_b
4832            .update(cx_b, |workspace, cx| {
4833                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4834                assert_ne!(*workspace.active_pane(), pane_b1);
4835                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4836                workspace
4837                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4838                    .unwrap()
4839            })
4840            .await
4841            .unwrap();
4842
4843        workspace_a
4844            .update(cx_a, |workspace, cx| {
4845                workspace.activate_next_pane(cx);
4846                assert_eq!(*workspace.active_pane(), pane_a1);
4847                workspace.open_path((worktree_id, "3.txt"), cx)
4848            })
4849            .await
4850            .unwrap();
4851        workspace_b
4852            .update(cx_b, |workspace, cx| {
4853                workspace.activate_next_pane(cx);
4854                assert_eq!(*workspace.active_pane(), pane_b1);
4855                workspace.open_path((worktree_id, "4.txt"), cx)
4856            })
4857            .await
4858            .unwrap();
4859        cx_a.foreground().run_until_parked();
4860
4861        // Ensure leader updates don't change the active pane of followers
4862        workspace_a.read_with(cx_a, |workspace, _| {
4863            assert_eq!(*workspace.active_pane(), pane_a1);
4864        });
4865        workspace_b.read_with(cx_b, |workspace, _| {
4866            assert_eq!(*workspace.active_pane(), pane_b1);
4867        });
4868
4869        // Ensure peers following each other doesn't cause an infinite loop.
4870        assert_eq!(
4871            workspace_a.read_with(cx_a, |workspace, cx| workspace
4872                .active_item(cx)
4873                .unwrap()
4874                .project_path(cx)),
4875            Some((worktree_id, "3.txt").into())
4876        );
4877        workspace_a.update(cx_a, |workspace, cx| {
4878            assert_eq!(
4879                workspace.active_item(cx).unwrap().project_path(cx),
4880                Some((worktree_id, "3.txt").into())
4881            );
4882            workspace.activate_next_pane(cx);
4883            assert_eq!(
4884                workspace.active_item(cx).unwrap().project_path(cx),
4885                Some((worktree_id, "4.txt").into())
4886            );
4887        });
4888        workspace_b.update(cx_b, |workspace, cx| {
4889            assert_eq!(
4890                workspace.active_item(cx).unwrap().project_path(cx),
4891                Some((worktree_id, "4.txt").into())
4892            );
4893            workspace.activate_next_pane(cx);
4894            assert_eq!(
4895                workspace.active_item(cx).unwrap().project_path(cx),
4896                Some((worktree_id, "3.txt").into())
4897            );
4898        });
4899    }
4900
4901    #[gpui::test(iterations = 10)]
4902    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4903        cx_a.foreground().forbid_parking();
4904        let fs = FakeFs::new(cx_a.background());
4905
4906        // 2 clients connect to a server.
4907        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4908        let mut client_a = server.create_client(cx_a, "user_a").await;
4909        let mut client_b = server.create_client(cx_b, "user_b").await;
4910        cx_a.update(editor::init);
4911        cx_b.update(editor::init);
4912
4913        // Client A shares a project.
4914        fs.insert_tree(
4915            "/a",
4916            json!({
4917                ".zed.toml": r#"collaborators = ["user_b"]"#,
4918                "1.txt": "one",
4919                "2.txt": "two",
4920                "3.txt": "three",
4921            }),
4922        )
4923        .await;
4924        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4925        project_a
4926            .update(cx_a, |project, cx| project.share(cx))
4927            .await
4928            .unwrap();
4929
4930        // Client B joins the project.
4931        let project_b = client_b
4932            .build_remote_project(
4933                project_a
4934                    .read_with(cx_a, |project, _| project.remote_id())
4935                    .unwrap(),
4936                cx_b,
4937            )
4938            .await;
4939
4940        // Client A opens some editors.
4941        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4942        let _editor_a1 = workspace_a
4943            .update(cx_a, |workspace, cx| {
4944                workspace.open_path((worktree_id, "1.txt"), cx)
4945            })
4946            .await
4947            .unwrap()
4948            .downcast::<Editor>()
4949            .unwrap();
4950
4951        // Client B starts following client A.
4952        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4953        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4954        let leader_id = project_b.read_with(cx_b, |project, _| {
4955            project.collaborators().values().next().unwrap().peer_id
4956        });
4957        workspace_b
4958            .update(cx_b, |workspace, cx| {
4959                workspace
4960                    .toggle_follow(&ToggleFollow(leader_id), cx)
4961                    .unwrap()
4962            })
4963            .await
4964            .unwrap();
4965        assert_eq!(
4966            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4967            Some(leader_id)
4968        );
4969        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4970            workspace
4971                .active_item(cx)
4972                .unwrap()
4973                .downcast::<Editor>()
4974                .unwrap()
4975        });
4976
4977        // When client B moves, it automatically stops following client A.
4978        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
4979        assert_eq!(
4980            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4981            None
4982        );
4983
4984        workspace_b
4985            .update(cx_b, |workspace, cx| {
4986                workspace
4987                    .toggle_follow(&ToggleFollow(leader_id), cx)
4988                    .unwrap()
4989            })
4990            .await
4991            .unwrap();
4992        assert_eq!(
4993            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4994            Some(leader_id)
4995        );
4996
4997        // When client B edits, it automatically stops following client A.
4998        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
4999        assert_eq!(
5000            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5001            None
5002        );
5003
5004        workspace_b
5005            .update(cx_b, |workspace, cx| {
5006                workspace
5007                    .toggle_follow(&ToggleFollow(leader_id), cx)
5008                    .unwrap()
5009            })
5010            .await
5011            .unwrap();
5012        assert_eq!(
5013            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5014            Some(leader_id)
5015        );
5016
5017        // When client B scrolls, it automatically stops following client A.
5018        editor_b2.update(cx_b, |editor, cx| {
5019            editor.set_scroll_position(vec2f(0., 3.), cx)
5020        });
5021        assert_eq!(
5022            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5023            None
5024        );
5025
5026        workspace_b
5027            .update(cx_b, |workspace, cx| {
5028                workspace
5029                    .toggle_follow(&ToggleFollow(leader_id), cx)
5030                    .unwrap()
5031            })
5032            .await
5033            .unwrap();
5034        assert_eq!(
5035            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5036            Some(leader_id)
5037        );
5038
5039        // When client B activates a different pane, it continues following client A in the original pane.
5040        workspace_b.update(cx_b, |workspace, cx| {
5041            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5042        });
5043        assert_eq!(
5044            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5045            Some(leader_id)
5046        );
5047
5048        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5049        assert_eq!(
5050            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5051            Some(leader_id)
5052        );
5053
5054        // When client B activates a different item in the original pane, it automatically stops following client A.
5055        workspace_b
5056            .update(cx_b, |workspace, cx| {
5057                workspace.open_path((worktree_id, "2.txt"), cx)
5058            })
5059            .await
5060            .unwrap();
5061        assert_eq!(
5062            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5063            None
5064        );
5065    }
5066
5067    #[gpui::test(iterations = 100)]
5068    async fn test_random_collaboration(
5069        cx: &mut TestAppContext,
5070        deterministic: Arc<Deterministic>,
5071        rng: StdRng,
5072    ) {
5073        cx.foreground().forbid_parking();
5074        let max_peers = env::var("MAX_PEERS")
5075            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5076            .unwrap_or(5);
5077        assert!(max_peers <= 5);
5078
5079        let max_operations = env::var("OPERATIONS")
5080            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5081            .unwrap_or(10);
5082
5083        let rng = Arc::new(Mutex::new(rng));
5084
5085        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5086        let host_language_registry = Arc::new(LanguageRegistry::test());
5087
5088        let fs = FakeFs::new(cx.background());
5089        fs.insert_tree(
5090            "/_collab",
5091            json!({
5092                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5093            }),
5094        )
5095        .await;
5096
5097        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5098        let mut clients = Vec::new();
5099        let mut user_ids = Vec::new();
5100        let mut op_start_signals = Vec::new();
5101        let files = Arc::new(Mutex::new(Vec::new()));
5102
5103        let mut next_entity_id = 100000;
5104        let mut host_cx = TestAppContext::new(
5105            cx.foreground_platform(),
5106            cx.platform(),
5107            deterministic.build_foreground(next_entity_id),
5108            deterministic.build_background(),
5109            cx.font_cache(),
5110            cx.leak_detector(),
5111            next_entity_id,
5112        );
5113        let host = server.create_client(&mut host_cx, "host").await;
5114        let host_project = host_cx.update(|cx| {
5115            Project::local(
5116                host.client.clone(),
5117                host.user_store.clone(),
5118                host_language_registry.clone(),
5119                fs.clone(),
5120                cx,
5121            )
5122        });
5123        let host_project_id = host_project
5124            .update(&mut host_cx, |p, _| p.next_remote_id())
5125            .await;
5126
5127        let (collab_worktree, _) = host_project
5128            .update(&mut host_cx, |project, cx| {
5129                project.find_or_create_local_worktree("/_collab", true, cx)
5130            })
5131            .await
5132            .unwrap();
5133        collab_worktree
5134            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5135            .await;
5136        host_project
5137            .update(&mut host_cx, |project, cx| project.share(cx))
5138            .await
5139            .unwrap();
5140
5141        // Set up fake language servers.
5142        let mut language = Language::new(
5143            LanguageConfig {
5144                name: "Rust".into(),
5145                path_suffixes: vec!["rs".to_string()],
5146                ..Default::default()
5147            },
5148            None,
5149        );
5150        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5151            name: "the-fake-language-server",
5152            capabilities: lsp::LanguageServer::full_capabilities(),
5153            initializer: Some(Box::new({
5154                let rng = rng.clone();
5155                let files = files.clone();
5156                let project = host_project.downgrade();
5157                move |fake_server: &mut FakeLanguageServer| {
5158                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5159                        |_, _| async move {
5160                            Ok(Some(lsp::CompletionResponse::Array(vec![
5161                                lsp::CompletionItem {
5162                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5163                                        range: lsp::Range::new(
5164                                            lsp::Position::new(0, 0),
5165                                            lsp::Position::new(0, 0),
5166                                        ),
5167                                        new_text: "the-new-text".to_string(),
5168                                    })),
5169                                    ..Default::default()
5170                                },
5171                            ])))
5172                        },
5173                    );
5174
5175                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5176                        |_, _| async move {
5177                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5178                                lsp::CodeAction {
5179                                    title: "the-code-action".to_string(),
5180                                    ..Default::default()
5181                                },
5182                            )]))
5183                        },
5184                    );
5185
5186                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5187                        |params, _| async move {
5188                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5189                                params.position,
5190                                params.position,
5191                            ))))
5192                        },
5193                    );
5194
5195                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5196                        let files = files.clone();
5197                        let rng = rng.clone();
5198                        move |_, _| {
5199                            let files = files.clone();
5200                            let rng = rng.clone();
5201                            async move {
5202                                let files = files.lock();
5203                                let mut rng = rng.lock();
5204                                let count = rng.gen_range::<usize, _>(1..3);
5205                                let files = (0..count)
5206                                    .map(|_| files.choose(&mut *rng).unwrap())
5207                                    .collect::<Vec<_>>();
5208                                log::info!("LSP: Returning definitions in files {:?}", &files);
5209                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5210                                    files
5211                                        .into_iter()
5212                                        .map(|file| lsp::Location {
5213                                            uri: lsp::Url::from_file_path(file).unwrap(),
5214                                            range: Default::default(),
5215                                        })
5216                                        .collect(),
5217                                )))
5218                            }
5219                        }
5220                    });
5221
5222                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5223                        let rng = rng.clone();
5224                        let project = project.clone();
5225                        move |params, mut cx| {
5226                            let highlights = if let Some(project) = project.upgrade(&cx) {
5227                                project.update(&mut cx, |project, cx| {
5228                                    let path = params
5229                                        .text_document_position_params
5230                                        .text_document
5231                                        .uri
5232                                        .to_file_path()
5233                                        .unwrap();
5234                                    let (worktree, relative_path) =
5235                                        project.find_local_worktree(&path, cx)?;
5236                                    let project_path =
5237                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5238                                    let buffer =
5239                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5240
5241                                    let mut highlights = Vec::new();
5242                                    let highlight_count = rng.lock().gen_range(1..=5);
5243                                    let mut prev_end = 0;
5244                                    for _ in 0..highlight_count {
5245                                        let range =
5246                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5247
5248                                        highlights.push(lsp::DocumentHighlight {
5249                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5250                                            kind: Some(lsp::DocumentHighlightKind::READ),
5251                                        });
5252                                        prev_end = range.end;
5253                                    }
5254                                    Some(highlights)
5255                                })
5256                            } else {
5257                                None
5258                            };
5259                            async move { Ok(highlights) }
5260                        }
5261                    });
5262                }
5263            })),
5264            ..Default::default()
5265        });
5266        host_language_registry.add(Arc::new(language));
5267
5268        let op_start_signal = futures::channel::mpsc::unbounded();
5269        user_ids.push(host.current_user_id(&host_cx));
5270        op_start_signals.push(op_start_signal.0);
5271        clients.push(host_cx.foreground().spawn(host.simulate_host(
5272            host_project,
5273            files,
5274            op_start_signal.1,
5275            rng.clone(),
5276            host_cx,
5277        )));
5278
5279        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5280            rng.lock().gen_range(0..max_operations)
5281        } else {
5282            max_operations
5283        };
5284        let mut available_guests = vec![
5285            "guest-1".to_string(),
5286            "guest-2".to_string(),
5287            "guest-3".to_string(),
5288            "guest-4".to_string(),
5289        ];
5290        let mut operations = 0;
5291        while operations < max_operations {
5292            if operations == disconnect_host_at {
5293                server.disconnect_client(user_ids[0]);
5294                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5295                drop(op_start_signals);
5296                let mut clients = futures::future::join_all(clients).await;
5297                cx.foreground().run_until_parked();
5298
5299                let (host, mut host_cx, host_err) = clients.remove(0);
5300                if let Some(host_err) = host_err {
5301                    log::error!("host error - {}", host_err);
5302                }
5303                host.project
5304                    .as_ref()
5305                    .unwrap()
5306                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5307                for (guest, mut guest_cx, guest_err) in clients {
5308                    if let Some(guest_err) = guest_err {
5309                        log::error!("{} error - {}", guest.username, guest_err);
5310                    }
5311                    let contacts = server
5312                        .store
5313                        .read()
5314                        .await
5315                        .contacts_for_user(guest.current_user_id(&guest_cx));
5316                    assert!(!contacts
5317                        .iter()
5318                        .flat_map(|contact| &contact.projects)
5319                        .any(|project| project.id == host_project_id));
5320                    guest
5321                        .project
5322                        .as_ref()
5323                        .unwrap()
5324                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5325                    guest_cx.update(|_| drop(guest));
5326                }
5327                host_cx.update(|_| drop(host));
5328
5329                return;
5330            }
5331
5332            let distribution = rng.lock().gen_range(0..100);
5333            match distribution {
5334                0..=19 if !available_guests.is_empty() => {
5335                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5336                    let guest_username = available_guests.remove(guest_ix);
5337                    log::info!("Adding new connection for {}", guest_username);
5338                    next_entity_id += 100000;
5339                    let mut guest_cx = TestAppContext::new(
5340                        cx.foreground_platform(),
5341                        cx.platform(),
5342                        deterministic.build_foreground(next_entity_id),
5343                        deterministic.build_background(),
5344                        cx.font_cache(),
5345                        cx.leak_detector(),
5346                        next_entity_id,
5347                    );
5348                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5349                    let guest_project = Project::remote(
5350                        host_project_id,
5351                        guest.client.clone(),
5352                        guest.user_store.clone(),
5353                        guest_lang_registry.clone(),
5354                        FakeFs::new(cx.background()),
5355                        &mut guest_cx.to_async(),
5356                    )
5357                    .await
5358                    .unwrap();
5359                    let op_start_signal = futures::channel::mpsc::unbounded();
5360                    user_ids.push(guest.current_user_id(&guest_cx));
5361                    op_start_signals.push(op_start_signal.0);
5362                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5363                        guest_username.clone(),
5364                        guest_project,
5365                        op_start_signal.1,
5366                        rng.clone(),
5367                        guest_cx,
5368                    )));
5369
5370                    log::info!("Added connection for {}", guest_username);
5371                    operations += 1;
5372                }
5373                20..=30 if clients.len() > 1 => {
5374                    log::info!("Removing guest");
5375                    let guest_ix = rng.lock().gen_range(1..clients.len());
5376                    let removed_guest_id = user_ids.remove(guest_ix);
5377                    let guest = clients.remove(guest_ix);
5378                    op_start_signals.remove(guest_ix);
5379                    server.disconnect_client(removed_guest_id);
5380                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5381                    let (guest, mut guest_cx, guest_err) = guest.await;
5382                    if let Some(guest_err) = guest_err {
5383                        log::error!("{} error - {}", guest.username, guest_err);
5384                    }
5385                    guest
5386                        .project
5387                        .as_ref()
5388                        .unwrap()
5389                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5390                    for user_id in &user_ids {
5391                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5392                            assert_ne!(
5393                                contact.user_id, removed_guest_id.0 as u64,
5394                                "removed guest is still a contact of another peer"
5395                            );
5396                            for project in contact.projects {
5397                                for project_guest_id in project.guests {
5398                                    assert_ne!(
5399                                        project_guest_id, removed_guest_id.0 as u64,
5400                                        "removed guest appears as still participating on a project"
5401                                    );
5402                                }
5403                            }
5404                        }
5405                    }
5406
5407                    log::info!("{} removed", guest.username);
5408                    available_guests.push(guest.username.clone());
5409                    guest_cx.update(|_| drop(guest));
5410
5411                    operations += 1;
5412                }
5413                _ => {
5414                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5415                        op_start_signals
5416                            .choose(&mut *rng.lock())
5417                            .unwrap()
5418                            .unbounded_send(())
5419                            .unwrap();
5420                        operations += 1;
5421                    }
5422
5423                    if rng.lock().gen_bool(0.8) {
5424                        cx.foreground().run_until_parked();
5425                    }
5426                }
5427            }
5428        }
5429
5430        drop(op_start_signals);
5431        let mut clients = futures::future::join_all(clients).await;
5432        cx.foreground().run_until_parked();
5433
5434        let (host_client, mut host_cx, host_err) = clients.remove(0);
5435        if let Some(host_err) = host_err {
5436            panic!("host error - {}", host_err);
5437        }
5438        let host_project = host_client.project.as_ref().unwrap();
5439        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5440            project
5441                .worktrees(cx)
5442                .map(|worktree| {
5443                    let snapshot = worktree.read(cx).snapshot();
5444                    (snapshot.id(), snapshot)
5445                })
5446                .collect::<BTreeMap<_, _>>()
5447        });
5448
5449        host_client
5450            .project
5451            .as_ref()
5452            .unwrap()
5453            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5454
5455        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5456            if let Some(guest_err) = guest_err {
5457                panic!("{} error - {}", guest_client.username, guest_err);
5458            }
5459            let worktree_snapshots =
5460                guest_client
5461                    .project
5462                    .as_ref()
5463                    .unwrap()
5464                    .read_with(&guest_cx, |project, cx| {
5465                        project
5466                            .worktrees(cx)
5467                            .map(|worktree| {
5468                                let worktree = worktree.read(cx);
5469                                (worktree.id(), worktree.snapshot())
5470                            })
5471                            .collect::<BTreeMap<_, _>>()
5472                    });
5473
5474            assert_eq!(
5475                worktree_snapshots.keys().collect::<Vec<_>>(),
5476                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5477                "{} has different worktrees than the host",
5478                guest_client.username
5479            );
5480            for (id, host_snapshot) in &host_worktree_snapshots {
5481                let guest_snapshot = &worktree_snapshots[id];
5482                assert_eq!(
5483                    guest_snapshot.root_name(),
5484                    host_snapshot.root_name(),
5485                    "{} has different root name than the host for worktree {}",
5486                    guest_client.username,
5487                    id
5488                );
5489                assert_eq!(
5490                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5491                    host_snapshot.entries(false).collect::<Vec<_>>(),
5492                    "{} has different snapshot than the host for worktree {}",
5493                    guest_client.username,
5494                    id
5495                );
5496            }
5497
5498            guest_client
5499                .project
5500                .as_ref()
5501                .unwrap()
5502                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5503
5504            for guest_buffer in &guest_client.buffers {
5505                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5506                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5507                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5508                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5509                        guest_client.username, guest_client.peer_id, buffer_id
5510                    ))
5511                });
5512                let path = host_buffer
5513                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5514
5515                assert_eq!(
5516                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5517                    0,
5518                    "{}, buffer {}, path {:?} has deferred operations",
5519                    guest_client.username,
5520                    buffer_id,
5521                    path,
5522                );
5523                assert_eq!(
5524                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5525                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5526                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5527                    guest_client.username,
5528                    buffer_id,
5529                    path
5530                );
5531            }
5532
5533            guest_cx.update(|_| drop(guest_client));
5534        }
5535
5536        host_cx.update(|_| drop(host_client));
5537    }
5538
5539    struct TestServer {
5540        peer: Arc<Peer>,
5541        app_state: Arc<AppState>,
5542        server: Arc<Server>,
5543        foreground: Rc<executor::Foreground>,
5544        notifications: mpsc::UnboundedReceiver<()>,
5545        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5546        forbid_connections: Arc<AtomicBool>,
5547        _test_db: TestDb,
5548    }
5549
5550    impl TestServer {
5551        async fn start(
5552            foreground: Rc<executor::Foreground>,
5553            background: Arc<executor::Background>,
5554        ) -> Self {
5555            let test_db = TestDb::fake(background);
5556            let app_state = Self::build_app_state(&test_db).await;
5557            let peer = Peer::new();
5558            let notifications = mpsc::unbounded();
5559            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
5560            Self {
5561                peer,
5562                app_state,
5563                server,
5564                foreground,
5565                notifications: notifications.1,
5566                connection_killers: Default::default(),
5567                forbid_connections: Default::default(),
5568                _test_db: test_db,
5569            }
5570        }
5571
5572        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
5573            cx.update(|cx| {
5574                let settings = Settings::test(cx);
5575                cx.set_global(settings);
5576            });
5577
5578            let http = FakeHttpClient::with_404_response();
5579            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
5580            let client_name = name.to_string();
5581            let mut client = Client::new(http.clone());
5582            let server = self.server.clone();
5583            let connection_killers = self.connection_killers.clone();
5584            let forbid_connections = self.forbid_connections.clone();
5585            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
5586
5587            Arc::get_mut(&mut client)
5588                .unwrap()
5589                .override_authenticate(move |cx| {
5590                    cx.spawn(|_| async move {
5591                        let access_token = "the-token".to_string();
5592                        Ok(Credentials {
5593                            user_id: user_id.0 as u64,
5594                            access_token,
5595                        })
5596                    })
5597                })
5598                .override_establish_connection(move |credentials, cx| {
5599                    assert_eq!(credentials.user_id, user_id.0 as u64);
5600                    assert_eq!(credentials.access_token, "the-token");
5601
5602                    let server = server.clone();
5603                    let connection_killers = connection_killers.clone();
5604                    let forbid_connections = forbid_connections.clone();
5605                    let client_name = client_name.clone();
5606                    let connection_id_tx = connection_id_tx.clone();
5607                    cx.spawn(move |cx| async move {
5608                        if forbid_connections.load(SeqCst) {
5609                            Err(EstablishConnectionError::other(anyhow!(
5610                                "server is forbidding connections"
5611                            )))
5612                        } else {
5613                            let (client_conn, server_conn, killed) =
5614                                Connection::in_memory(cx.background());
5615                            connection_killers.lock().insert(user_id, killed);
5616                            cx.background()
5617                                .spawn(server.handle_connection(
5618                                    server_conn,
5619                                    client_name,
5620                                    user_id,
5621                                    Some(connection_id_tx),
5622                                    cx.background(),
5623                                ))
5624                                .detach();
5625                            Ok(client_conn)
5626                        }
5627                    })
5628                });
5629
5630            client
5631                .authenticate_and_connect(false, &cx.to_async())
5632                .await
5633                .unwrap();
5634
5635            Channel::init(&client);
5636            Project::init(&client);
5637            cx.update(|cx| {
5638                workspace::init(&client, cx);
5639            });
5640
5641            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5642            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5643
5644            let client = TestClient {
5645                client,
5646                peer_id,
5647                username: name.to_string(),
5648                user_store,
5649                language_registry: Arc::new(LanguageRegistry::test()),
5650                project: Default::default(),
5651                buffers: Default::default(),
5652            };
5653            client.wait_for_current_user(cx).await;
5654            client
5655        }
5656
5657        fn disconnect_client(&self, user_id: UserId) {
5658            self.connection_killers
5659                .lock()
5660                .remove(&user_id)
5661                .unwrap()
5662                .store(true, SeqCst);
5663        }
5664
5665        fn forbid_connections(&self) {
5666            self.forbid_connections.store(true, SeqCst);
5667        }
5668
5669        fn allow_connections(&self) {
5670            self.forbid_connections.store(false, SeqCst);
5671        }
5672
5673        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5674            let mut config = Config::default();
5675            config.session_secret = "a".repeat(32);
5676            config.database_url = test_db.url.clone();
5677            let github_client = github::AppClient::test();
5678            Arc::new(AppState {
5679                db: test_db.db().clone(),
5680                handlebars: Default::default(),
5681                auth_client: auth::build_client("", ""),
5682                repo_client: github::RepoClient::test(&github_client),
5683                github_client,
5684                config,
5685            })
5686        }
5687
5688        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5689            self.server.store.read().await
5690        }
5691
5692        async fn condition<F>(&mut self, mut predicate: F)
5693        where
5694            F: FnMut(&Store) -> bool,
5695        {
5696            async_std::future::timeout(Duration::from_millis(500), async {
5697                while !(predicate)(&*self.server.store.read().await) {
5698                    self.foreground.start_waiting();
5699                    self.notifications.next().await;
5700                    self.foreground.finish_waiting();
5701                }
5702            })
5703            .await
5704            .expect("condition timed out");
5705        }
5706    }
5707
5708    impl Deref for TestServer {
5709        type Target = Server;
5710
5711        fn deref(&self) -> &Self::Target {
5712            &self.server
5713        }
5714    }
5715
5716    impl Drop for TestServer {
5717        fn drop(&mut self) {
5718            self.peer.reset();
5719        }
5720    }
5721
5722    struct TestClient {
5723        client: Arc<Client>,
5724        username: String,
5725        pub peer_id: PeerId,
5726        pub user_store: ModelHandle<UserStore>,
5727        language_registry: Arc<LanguageRegistry>,
5728        project: Option<ModelHandle<Project>>,
5729        buffers: HashSet<ModelHandle<language::Buffer>>,
5730    }
5731
5732    impl Deref for TestClient {
5733        type Target = Arc<Client>;
5734
5735        fn deref(&self) -> &Self::Target {
5736            &self.client
5737        }
5738    }
5739
5740    impl TestClient {
5741        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5742            UserId::from_proto(
5743                self.user_store
5744                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5745            )
5746        }
5747
5748        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5749            let mut authed_user = self
5750                .user_store
5751                .read_with(cx, |user_store, _| user_store.watch_current_user());
5752            while authed_user.next().await.unwrap().is_none() {}
5753        }
5754
5755        async fn build_local_project(
5756            &mut self,
5757            fs: Arc<FakeFs>,
5758            root_path: impl AsRef<Path>,
5759            cx: &mut TestAppContext,
5760        ) -> (ModelHandle<Project>, WorktreeId) {
5761            let project = cx.update(|cx| {
5762                Project::local(
5763                    self.client.clone(),
5764                    self.user_store.clone(),
5765                    self.language_registry.clone(),
5766                    fs,
5767                    cx,
5768                )
5769            });
5770            self.project = Some(project.clone());
5771            let (worktree, _) = project
5772                .update(cx, |p, cx| {
5773                    p.find_or_create_local_worktree(root_path, true, cx)
5774                })
5775                .await
5776                .unwrap();
5777            worktree
5778                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5779                .await;
5780            project
5781                .update(cx, |project, _| project.next_remote_id())
5782                .await;
5783            (project, worktree.read_with(cx, |tree, _| tree.id()))
5784        }
5785
5786        async fn build_remote_project(
5787            &mut self,
5788            project_id: u64,
5789            cx: &mut TestAppContext,
5790        ) -> ModelHandle<Project> {
5791            let project = Project::remote(
5792                project_id,
5793                self.client.clone(),
5794                self.user_store.clone(),
5795                self.language_registry.clone(),
5796                FakeFs::new(cx.background()),
5797                &mut cx.to_async(),
5798            )
5799            .await
5800            .unwrap();
5801            self.project = Some(project.clone());
5802            project
5803        }
5804
5805        fn build_workspace(
5806            &self,
5807            project: &ModelHandle<Project>,
5808            cx: &mut TestAppContext,
5809        ) -> ViewHandle<Workspace> {
5810            let (window_id, _) = cx.add_window(|_| EmptyView);
5811            cx.add_view(window_id, |cx| {
5812                let fs = project.read(cx).fs().clone();
5813                Workspace::new(
5814                    &WorkspaceParams {
5815                        fs,
5816                        project: project.clone(),
5817                        user_store: self.user_store.clone(),
5818                        languages: self.language_registry.clone(),
5819                        channel_list: cx.add_model(|cx| {
5820                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5821                        }),
5822                        client: self.client.clone(),
5823                    },
5824                    cx,
5825                )
5826            })
5827        }
5828
5829        async fn simulate_host(
5830            mut self,
5831            project: ModelHandle<Project>,
5832            files: Arc<Mutex<Vec<PathBuf>>>,
5833            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5834            rng: Arc<Mutex<StdRng>>,
5835            mut cx: TestAppContext,
5836        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5837            async fn simulate_host_internal(
5838                client: &mut TestClient,
5839                project: ModelHandle<Project>,
5840                files: Arc<Mutex<Vec<PathBuf>>>,
5841                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5842                rng: Arc<Mutex<StdRng>>,
5843                cx: &mut TestAppContext,
5844            ) -> anyhow::Result<()> {
5845                let fs = project.read_with(cx, |project, _| project.fs().clone());
5846
5847                while op_start_signal.next().await.is_some() {
5848                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5849                    match distribution {
5850                        0..=20 if !files.lock().is_empty() => {
5851                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5852                            let mut path = path.as_path();
5853                            while let Some(parent_path) = path.parent() {
5854                                path = parent_path;
5855                                if rng.lock().gen() {
5856                                    break;
5857                                }
5858                            }
5859
5860                            log::info!("Host: find/create local worktree {:?}", path);
5861                            let find_or_create_worktree = project.update(cx, |project, cx| {
5862                                project.find_or_create_local_worktree(path, true, cx)
5863                            });
5864                            if rng.lock().gen() {
5865                                cx.background().spawn(find_or_create_worktree).detach();
5866                            } else {
5867                                find_or_create_worktree.await?;
5868                            }
5869                        }
5870                        10..=80 if !files.lock().is_empty() => {
5871                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5872                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5873                                let (worktree, path) = project
5874                                    .update(cx, |project, cx| {
5875                                        project.find_or_create_local_worktree(
5876                                            file.clone(),
5877                                            true,
5878                                            cx,
5879                                        )
5880                                    })
5881                                    .await?;
5882                                let project_path =
5883                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
5884                                log::info!(
5885                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5886                                    file,
5887                                    project_path.0,
5888                                    project_path.1
5889                                );
5890                                let buffer = project
5891                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
5892                                    .await
5893                                    .unwrap();
5894                                client.buffers.insert(buffer.clone());
5895                                buffer
5896                            } else {
5897                                client
5898                                    .buffers
5899                                    .iter()
5900                                    .choose(&mut *rng.lock())
5901                                    .unwrap()
5902                                    .clone()
5903                            };
5904
5905                            if rng.lock().gen_bool(0.1) {
5906                                cx.update(|cx| {
5907                                    log::info!(
5908                                        "Host: dropping buffer {:?}",
5909                                        buffer.read(cx).file().unwrap().full_path(cx)
5910                                    );
5911                                    client.buffers.remove(&buffer);
5912                                    drop(buffer);
5913                                });
5914                            } else {
5915                                buffer.update(cx, |buffer, cx| {
5916                                    log::info!(
5917                                        "Host: updating buffer {:?} ({})",
5918                                        buffer.file().unwrap().full_path(cx),
5919                                        buffer.remote_id()
5920                                    );
5921
5922                                    if rng.lock().gen_bool(0.7) {
5923                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
5924                                    } else {
5925                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
5926                                    }
5927                                });
5928                            }
5929                        }
5930                        _ => loop {
5931                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
5932                            let mut path = PathBuf::new();
5933                            path.push("/");
5934                            for _ in 0..path_component_count {
5935                                let letter = rng.lock().gen_range(b'a'..=b'z');
5936                                path.push(std::str::from_utf8(&[letter]).unwrap());
5937                            }
5938                            path.set_extension("rs");
5939                            let parent_path = path.parent().unwrap();
5940
5941                            log::info!("Host: creating file {:?}", path,);
5942
5943                            if fs.create_dir(&parent_path).await.is_ok()
5944                                && fs.create_file(&path, Default::default()).await.is_ok()
5945                            {
5946                                files.lock().push(path);
5947                                break;
5948                            } else {
5949                                log::info!("Host: cannot create file");
5950                            }
5951                        },
5952                    }
5953
5954                    cx.background().simulate_random_delay().await;
5955                }
5956
5957                Ok(())
5958            }
5959
5960            let result = simulate_host_internal(
5961                &mut self,
5962                project.clone(),
5963                files,
5964                op_start_signal,
5965                rng,
5966                &mut cx,
5967            )
5968            .await;
5969            log::info!("Host done");
5970            self.project = Some(project);
5971            (self, cx, result.err())
5972        }
5973
5974        pub async fn simulate_guest(
5975            mut self,
5976            guest_username: String,
5977            project: ModelHandle<Project>,
5978            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5979            rng: Arc<Mutex<StdRng>>,
5980            mut cx: TestAppContext,
5981        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
5982            async fn simulate_guest_internal(
5983                client: &mut TestClient,
5984                guest_username: &str,
5985                project: ModelHandle<Project>,
5986                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
5987                rng: Arc<Mutex<StdRng>>,
5988                cx: &mut TestAppContext,
5989            ) -> anyhow::Result<()> {
5990                while op_start_signal.next().await.is_some() {
5991                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
5992                        let worktree = if let Some(worktree) =
5993                            project.read_with(cx, |project, cx| {
5994                                project
5995                                    .worktrees(&cx)
5996                                    .filter(|worktree| {
5997                                        let worktree = worktree.read(cx);
5998                                        worktree.is_visible()
5999                                            && worktree.entries(false).any(|e| e.is_file())
6000                                    })
6001                                    .choose(&mut *rng.lock())
6002                            }) {
6003                            worktree
6004                        } else {
6005                            cx.background().simulate_random_delay().await;
6006                            continue;
6007                        };
6008
6009                        let (worktree_root_name, project_path) =
6010                            worktree.read_with(cx, |worktree, _| {
6011                                let entry = worktree
6012                                    .entries(false)
6013                                    .filter(|e| e.is_file())
6014                                    .choose(&mut *rng.lock())
6015                                    .unwrap();
6016                                (
6017                                    worktree.root_name().to_string(),
6018                                    (worktree.id(), entry.path.clone()),
6019                                )
6020                            });
6021                        log::info!(
6022                            "{}: opening path {:?} in worktree {} ({})",
6023                            guest_username,
6024                            project_path.1,
6025                            project_path.0,
6026                            worktree_root_name,
6027                        );
6028                        let buffer = project
6029                            .update(cx, |project, cx| {
6030                                project.open_buffer(project_path.clone(), cx)
6031                            })
6032                            .await?;
6033                        log::info!(
6034                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6035                            guest_username,
6036                            project_path.1,
6037                            project_path.0,
6038                            worktree_root_name,
6039                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6040                        );
6041                        client.buffers.insert(buffer.clone());
6042                        buffer
6043                    } else {
6044                        client
6045                            .buffers
6046                            .iter()
6047                            .choose(&mut *rng.lock())
6048                            .unwrap()
6049                            .clone()
6050                    };
6051
6052                    let choice = rng.lock().gen_range(0..100);
6053                    match choice {
6054                        0..=9 => {
6055                            cx.update(|cx| {
6056                                log::info!(
6057                                    "{}: dropping buffer {:?}",
6058                                    guest_username,
6059                                    buffer.read(cx).file().unwrap().full_path(cx)
6060                                );
6061                                client.buffers.remove(&buffer);
6062                                drop(buffer);
6063                            });
6064                        }
6065                        10..=19 => {
6066                            let completions = project.update(cx, |project, cx| {
6067                                log::info!(
6068                                    "{}: requesting completions for buffer {} ({:?})",
6069                                    guest_username,
6070                                    buffer.read(cx).remote_id(),
6071                                    buffer.read(cx).file().unwrap().full_path(cx)
6072                                );
6073                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6074                                project.completions(&buffer, offset, cx)
6075                            });
6076                            let completions = cx.background().spawn(async move {
6077                                completions
6078                                    .await
6079                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6080                            });
6081                            if rng.lock().gen_bool(0.3) {
6082                                log::info!("{}: detaching completions request", guest_username);
6083                                cx.update(|cx| completions.detach_and_log_err(cx));
6084                            } else {
6085                                completions.await?;
6086                            }
6087                        }
6088                        20..=29 => {
6089                            let code_actions = project.update(cx, |project, cx| {
6090                                log::info!(
6091                                    "{}: requesting code actions for buffer {} ({:?})",
6092                                    guest_username,
6093                                    buffer.read(cx).remote_id(),
6094                                    buffer.read(cx).file().unwrap().full_path(cx)
6095                                );
6096                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6097                                project.code_actions(&buffer, range, cx)
6098                            });
6099                            let code_actions = cx.background().spawn(async move {
6100                                code_actions.await.map_err(|err| {
6101                                    anyhow!("code actions request failed: {:?}", err)
6102                                })
6103                            });
6104                            if rng.lock().gen_bool(0.3) {
6105                                log::info!("{}: detaching code actions request", guest_username);
6106                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6107                            } else {
6108                                code_actions.await?;
6109                            }
6110                        }
6111                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6112                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6113                                log::info!(
6114                                    "{}: saving buffer {} ({:?})",
6115                                    guest_username,
6116                                    buffer.remote_id(),
6117                                    buffer.file().unwrap().full_path(cx)
6118                                );
6119                                (buffer.version(), buffer.save(cx))
6120                            });
6121                            let save = cx.background().spawn(async move {
6122                                let (saved_version, _) = save
6123                                    .await
6124                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6125                                assert!(saved_version.observed_all(&requested_version));
6126                                Ok::<_, anyhow::Error>(())
6127                            });
6128                            if rng.lock().gen_bool(0.3) {
6129                                log::info!("{}: detaching save request", guest_username);
6130                                cx.update(|cx| save.detach_and_log_err(cx));
6131                            } else {
6132                                save.await?;
6133                            }
6134                        }
6135                        40..=44 => {
6136                            let prepare_rename = project.update(cx, |project, cx| {
6137                                log::info!(
6138                                    "{}: preparing rename for buffer {} ({:?})",
6139                                    guest_username,
6140                                    buffer.read(cx).remote_id(),
6141                                    buffer.read(cx).file().unwrap().full_path(cx)
6142                                );
6143                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6144                                project.prepare_rename(buffer, offset, cx)
6145                            });
6146                            let prepare_rename = cx.background().spawn(async move {
6147                                prepare_rename.await.map_err(|err| {
6148                                    anyhow!("prepare rename request failed: {:?}", err)
6149                                })
6150                            });
6151                            if rng.lock().gen_bool(0.3) {
6152                                log::info!("{}: detaching prepare rename request", guest_username);
6153                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6154                            } else {
6155                                prepare_rename.await?;
6156                            }
6157                        }
6158                        45..=49 => {
6159                            let definitions = project.update(cx, |project, cx| {
6160                                log::info!(
6161                                    "{}: requesting definitions for buffer {} ({:?})",
6162                                    guest_username,
6163                                    buffer.read(cx).remote_id(),
6164                                    buffer.read(cx).file().unwrap().full_path(cx)
6165                                );
6166                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6167                                project.definition(&buffer, offset, cx)
6168                            });
6169                            let definitions = cx.background().spawn(async move {
6170                                definitions
6171                                    .await
6172                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6173                            });
6174                            if rng.lock().gen_bool(0.3) {
6175                                log::info!("{}: detaching definitions request", guest_username);
6176                                cx.update(|cx| definitions.detach_and_log_err(cx));
6177                            } else {
6178                                client
6179                                    .buffers
6180                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6181                            }
6182                        }
6183                        50..=54 => {
6184                            let highlights = project.update(cx, |project, cx| {
6185                                log::info!(
6186                                    "{}: requesting highlights for buffer {} ({:?})",
6187                                    guest_username,
6188                                    buffer.read(cx).remote_id(),
6189                                    buffer.read(cx).file().unwrap().full_path(cx)
6190                                );
6191                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6192                                project.document_highlights(&buffer, offset, cx)
6193                            });
6194                            let highlights = cx.background().spawn(async move {
6195                                highlights
6196                                    .await
6197                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6198                            });
6199                            if rng.lock().gen_bool(0.3) {
6200                                log::info!("{}: detaching highlights request", guest_username);
6201                                cx.update(|cx| highlights.detach_and_log_err(cx));
6202                            } else {
6203                                highlights.await?;
6204                            }
6205                        }
6206                        55..=59 => {
6207                            let search = project.update(cx, |project, cx| {
6208                                let query = rng.lock().gen_range('a'..='z');
6209                                log::info!("{}: project-wide search {:?}", guest_username, query);
6210                                project.search(SearchQuery::text(query, false, false), cx)
6211                            });
6212                            let search = cx.background().spawn(async move {
6213                                search
6214                                    .await
6215                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6216                            });
6217                            if rng.lock().gen_bool(0.3) {
6218                                log::info!("{}: detaching search request", guest_username);
6219                                cx.update(|cx| search.detach_and_log_err(cx));
6220                            } else {
6221                                client.buffers.extend(search.await?.into_keys());
6222                            }
6223                        }
6224                        _ => {
6225                            buffer.update(cx, |buffer, cx| {
6226                                log::info!(
6227                                    "{}: updating buffer {} ({:?})",
6228                                    guest_username,
6229                                    buffer.remote_id(),
6230                                    buffer.file().unwrap().full_path(cx)
6231                                );
6232                                if rng.lock().gen_bool(0.7) {
6233                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6234                                } else {
6235                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6236                                }
6237                            });
6238                        }
6239                    }
6240                    cx.background().simulate_random_delay().await;
6241                }
6242                Ok(())
6243            }
6244
6245            let result = simulate_guest_internal(
6246                &mut self,
6247                &guest_username,
6248                project.clone(),
6249                op_start_signal,
6250                rng,
6251                &mut cx,
6252            )
6253            .await;
6254            log::info!("{}: done", guest_username);
6255
6256            self.project = Some(project);
6257            (self, cx, result.err())
6258        }
6259    }
6260
6261    impl Drop for TestClient {
6262        fn drop(&mut self) {
6263            self.client.tear_down();
6264        }
6265    }
6266
6267    impl Executor for Arc<gpui::executor::Background> {
6268        type Timer = gpui::executor::Timer;
6269
6270        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6271            self.spawn(future).detach();
6272        }
6273
6274        fn timer(&self, duration: Duration) -> Self::Timer {
6275            self.as_ref().timer(duration)
6276        }
6277    }
6278
6279    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6280        channel
6281            .messages()
6282            .cursor::<()>()
6283            .map(|m| {
6284                (
6285                    m.sender.github_login.clone(),
6286                    m.body.clone(),
6287                    m.is_pending(),
6288                )
6289            })
6290            .collect()
6291    }
6292
6293    struct EmptyView;
6294
6295    impl gpui::Entity for EmptyView {
6296        type Event = ();
6297    }
6298
6299    impl gpui::View for EmptyView {
6300        fn ui_name() -> &'static str {
6301            "empty view"
6302        }
6303
6304        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6305            gpui::Element::boxed(gpui::elements::Empty)
6306        }
6307    }
6308}