rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::{HashMap, HashSet};
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, instrument, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::respond_to_contact_request)
 159            .add_request_handler(Server::join_channel)
 160            .add_message_handler(Server::leave_channel)
 161            .add_request_handler(Server::send_channel_message)
 162            .add_request_handler(Server::follow)
 163            .add_message_handler(Server::unfollow)
 164            .add_message_handler(Server::update_followers)
 165            .add_request_handler(Server::get_channel_messages);
 166
 167        Arc::new(server)
 168    }
 169
 170    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 171    where
 172        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 173        Fut: 'static + Send + Future<Output = Result<()>>,
 174        M: EnvelopedMessage,
 175    {
 176        let prev_handler = self.handlers.insert(
 177            TypeId::of::<M>(),
 178            Box::new(move |server, envelope| {
 179                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 180                let span = info_span!(
 181                    "handle message",
 182                    payload_type = envelope.payload_type_name(),
 183                    payload = format!("{:?}", envelope.payload).as_str(),
 184                );
 185                let future = (handler)(server, *envelope);
 186                async move {
 187                    if let Err(error) = future.await {
 188                        tracing::error!(%error, "error handling message");
 189                    }
 190                }
 191                .instrument(span)
 192                .boxed()
 193            }),
 194        );
 195        if prev_handler.is_some() {
 196            panic!("registered a handler for the same message twice");
 197        }
 198        self
 199    }
 200
 201    /// Handle a request while holding a lock to the store. This is useful when we're registering
 202    /// a connection but we want to respond on the connection before anybody else can send on it.
 203    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 204    where
 205        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 206        Fut: Send + Future<Output = Result<()>>,
 207        M: RequestMessage,
 208    {
 209        let handler = Arc::new(handler);
 210        self.add_message_handler(move |server, envelope| {
 211            let receipt = envelope.receipt();
 212            let handler = handler.clone();
 213            async move {
 214                let responded = Arc::new(AtomicBool::default());
 215                let response = Response {
 216                    server: server.clone(),
 217                    responded: responded.clone(),
 218                    receipt: envelope.receipt(),
 219                };
 220                match (handler)(server.clone(), envelope, response).await {
 221                    Ok(()) => {
 222                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 223                            Ok(())
 224                        } else {
 225                            Err(anyhow!("handler did not send a response"))?
 226                        }
 227                    }
 228                    Err(error) => {
 229                        server.peer.respond_with_error(
 230                            receipt,
 231                            proto::Error {
 232                                message: error.to_string(),
 233                            },
 234                        )?;
 235                        Err(error)
 236                    }
 237                }
 238            }
 239        })
 240    }
 241
 242    pub fn handle_connection<E: Executor>(
 243        self: &Arc<Self>,
 244        connection: Connection,
 245        address: String,
 246        user_id: UserId,
 247        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 248        executor: E,
 249    ) -> impl Future<Output = ()> {
 250        let mut this = self.clone();
 251        let span = info_span!("handle connection", %user_id, %address);
 252        async move {
 253            let (connection_id, handle_io, mut incoming_rx) = this
 254                .peer
 255                .add_connection(connection, {
 256                    let executor = executor.clone();
 257                    move |duration| {
 258                        let timer = executor.sleep(duration);
 259                        async move {
 260                            timer.await;
 261                        }
 262                    }
 263                })
 264                .await;
 265
 266            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 267
 268            if let Some(send_connection_id) = send_connection_id.as_mut() {
 269                let _ = send_connection_id.send(connection_id).await;
 270            }
 271
 272            {
 273                let mut state = this.state_mut().await;
 274                state.add_connection(connection_id, user_id);
 275                this.update_contacts_for_users(&*state, &[user_id]);
 276            }
 277
 278            let handle_io = handle_io.fuse();
 279            futures::pin_mut!(handle_io);
 280            loop {
 281                let next_message = incoming_rx.next().fuse();
 282                futures::pin_mut!(next_message);
 283                futures::select_biased! {
 284                    result = handle_io => {
 285                        if let Err(error) = result {
 286                            tracing::error!(%error, "error handling I/O");
 287                        }
 288                        break;
 289                    }
 290                    message = next_message => {
 291                        if let Some(message) = message {
 292                            let type_name = message.payload_type_name();
 293                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 294                            async {
 295                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 296                                    let notifications = this.notifications.clone();
 297                                    let is_background = message.is_background();
 298                                    let handle_message = (handler)(this.clone(), message);
 299                                    let handle_message = async move {
 300                                        handle_message.await;
 301                                        if let Some(mut notifications) = notifications {
 302                                            let _ = notifications.send(()).await;
 303                                        }
 304                                    };
 305                                    if is_background {
 306                                        executor.spawn_detached(handle_message);
 307                                    } else {
 308                                        handle_message.await;
 309                                    }
 310                                } else {
 311                                    tracing::error!("no message handler");
 312                                }
 313                            }.instrument(span).await;
 314                        } else {
 315                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 316                            break;
 317                        }
 318                    }
 319                }
 320            }
 321
 322            if let Err(error) = this.sign_out(connection_id).await {
 323                tracing::error!(%error, "error signing out");
 324            }
 325        }.instrument(span)
 326    }
 327
 328    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 329        self.peer.disconnect(connection_id);
 330        let mut state = self.state_mut().await;
 331        let removed_connection = state.remove_connection(connection_id)?;
 332
 333        for (project_id, project) in removed_connection.hosted_projects {
 334            if let Some(share) = project.share {
 335                broadcast(
 336                    connection_id,
 337                    share.guests.keys().copied().collect(),
 338                    |conn_id| {
 339                        self.peer
 340                            .send(conn_id, proto::UnshareProject { project_id })
 341                    },
 342                );
 343            }
 344        }
 345
 346        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 347            broadcast(connection_id, peer_ids, |conn_id| {
 348                self.peer.send(
 349                    conn_id,
 350                    proto::RemoveProjectCollaborator {
 351                        project_id,
 352                        peer_id: connection_id.0,
 353                    },
 354                )
 355            });
 356        }
 357
 358        self.update_contacts_for_users(&*state, removed_connection.contact_ids.iter());
 359        Ok(())
 360    }
 361
 362    async fn ping(
 363        self: Arc<Server>,
 364        _: TypedEnvelope<proto::Ping>,
 365        response: Response<proto::Ping>,
 366    ) -> Result<()> {
 367        response.send(proto::Ack {})?;
 368        Ok(())
 369    }
 370
 371    async fn register_project(
 372        self: Arc<Server>,
 373        request: TypedEnvelope<proto::RegisterProject>,
 374        response: Response<proto::RegisterProject>,
 375    ) -> Result<()> {
 376        let project_id = {
 377            let mut state = self.state_mut().await;
 378            let user_id = state.user_id_for_connection(request.sender_id)?;
 379            state.register_project(request.sender_id, user_id)
 380        };
 381        response.send(proto::RegisterProjectResponse { project_id })?;
 382        Ok(())
 383    }
 384
 385    async fn unregister_project(
 386        self: Arc<Server>,
 387        request: TypedEnvelope<proto::UnregisterProject>,
 388    ) -> Result<()> {
 389        let mut state = self.state_mut().await;
 390        let project = state.unregister_project(request.payload.project_id, request.sender_id)?;
 391        self.update_contacts_for_users(&*state, &project.authorized_user_ids());
 392        Ok(())
 393    }
 394
 395    async fn share_project(
 396        self: Arc<Server>,
 397        request: TypedEnvelope<proto::ShareProject>,
 398        response: Response<proto::ShareProject>,
 399    ) -> Result<()> {
 400        let mut state = self.state_mut().await;
 401        let project = state.share_project(request.payload.project_id, request.sender_id)?;
 402        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 403        response.send(proto::Ack {})?;
 404        Ok(())
 405    }
 406
 407    async fn unshare_project(
 408        self: Arc<Server>,
 409        request: TypedEnvelope<proto::UnshareProject>,
 410    ) -> Result<()> {
 411        let project_id = request.payload.project_id;
 412        let mut state = self.state_mut().await;
 413        let project = state.unshare_project(project_id, request.sender_id)?;
 414        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 415            self.peer
 416                .send(conn_id, proto::UnshareProject { project_id })
 417        });
 418        self.update_contacts_for_users(&mut *state, &project.authorized_user_ids);
 419        Ok(())
 420    }
 421
 422    async fn join_project(
 423        self: Arc<Server>,
 424        request: TypedEnvelope<proto::JoinProject>,
 425        response: Response<proto::JoinProject>,
 426    ) -> Result<()> {
 427        let project_id = request.payload.project_id;
 428
 429        let state = &mut *self.state_mut().await;
 430        let user_id = state.user_id_for_connection(request.sender_id)?;
 431        let (response_payload, connection_ids, contact_user_ids) = state
 432            .join_project(request.sender_id, user_id, project_id)
 433            .and_then(|joined| {
 434                let share = joined.project.share()?;
 435                let peer_count = share.guests.len();
 436                let mut collaborators = Vec::with_capacity(peer_count);
 437                collaborators.push(proto::Collaborator {
 438                    peer_id: joined.project.host_connection_id.0,
 439                    replica_id: 0,
 440                    user_id: joined.project.host_user_id.to_proto(),
 441                });
 442                let worktrees = share
 443                    .worktrees
 444                    .iter()
 445                    .filter_map(|(id, shared_worktree)| {
 446                        let worktree = joined.project.worktrees.get(&id)?;
 447                        Some(proto::Worktree {
 448                            id: *id,
 449                            root_name: worktree.root_name.clone(),
 450                            entries: shared_worktree.entries.values().cloned().collect(),
 451                            diagnostic_summaries: shared_worktree
 452                                .diagnostic_summaries
 453                                .values()
 454                                .cloned()
 455                                .collect(),
 456                            visible: worktree.visible,
 457                            scan_id: shared_worktree.scan_id,
 458                        })
 459                    })
 460                    .collect();
 461                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 462                    if *peer_conn_id != request.sender_id {
 463                        collaborators.push(proto::Collaborator {
 464                            peer_id: peer_conn_id.0,
 465                            replica_id: *peer_replica_id as u32,
 466                            user_id: peer_user_id.to_proto(),
 467                        });
 468                    }
 469                }
 470                let response = proto::JoinProjectResponse {
 471                    worktrees,
 472                    replica_id: joined.replica_id as u32,
 473                    collaborators,
 474                    language_servers: joined.project.language_servers.clone(),
 475                };
 476                let connection_ids = joined.project.connection_ids();
 477                let contact_user_ids = joined.project.authorized_user_ids();
 478                Ok((response, connection_ids, contact_user_ids))
 479            })?;
 480
 481        broadcast(request.sender_id, connection_ids, |conn_id| {
 482            self.peer.send(
 483                conn_id,
 484                proto::AddProjectCollaborator {
 485                    project_id,
 486                    collaborator: Some(proto::Collaborator {
 487                        peer_id: request.sender_id.0,
 488                        replica_id: response_payload.replica_id,
 489                        user_id: user_id.to_proto(),
 490                    }),
 491                },
 492            )
 493        });
 494        self.update_contacts_for_users(state, &contact_user_ids);
 495        response.send(response_payload)?;
 496        Ok(())
 497    }
 498
 499    async fn leave_project(
 500        self: Arc<Server>,
 501        request: TypedEnvelope<proto::LeaveProject>,
 502    ) -> Result<()> {
 503        let sender_id = request.sender_id;
 504        let project_id = request.payload.project_id;
 505        let mut state = self.state_mut().await;
 506        let worktree = state.leave_project(sender_id, project_id)?;
 507        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 508            self.peer.send(
 509                conn_id,
 510                proto::RemoveProjectCollaborator {
 511                    project_id,
 512                    peer_id: sender_id.0,
 513                },
 514            )
 515        });
 516        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 517        Ok(())
 518    }
 519
 520    async fn register_worktree(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::RegisterWorktree>,
 523        response: Response<proto::RegisterWorktree>,
 524    ) -> Result<()> {
 525        let mut contact_user_ids = HashSet::default();
 526        for github_login in &request.payload.authorized_logins {
 527            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 528            contact_user_ids.insert(contact_user_id);
 529        }
 530
 531        let mut state = self.state_mut().await;
 532        let host_user_id = state.user_id_for_connection(request.sender_id)?;
 533        contact_user_ids.insert(host_user_id);
 534
 535        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 536        let guest_connection_ids = state
 537            .read_project(request.payload.project_id, request.sender_id)?
 538            .guest_connection_ids();
 539        state.register_worktree(
 540            request.payload.project_id,
 541            request.payload.worktree_id,
 542            request.sender_id,
 543            Worktree {
 544                authorized_user_ids: contact_user_ids.clone(),
 545                root_name: request.payload.root_name.clone(),
 546                visible: request.payload.visible,
 547            },
 548        )?;
 549
 550        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 551            self.peer
 552                .forward_send(request.sender_id, connection_id, request.payload.clone())
 553        });
 554        self.update_contacts_for_users(&*state, &contact_user_ids);
 555        response.send(proto::Ack {})?;
 556        Ok(())
 557    }
 558
 559    async fn unregister_worktree(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::UnregisterWorktree>,
 562    ) -> Result<()> {
 563        let project_id = request.payload.project_id;
 564        let worktree_id = request.payload.worktree_id;
 565        let mut state = self.state_mut().await;
 566        let (worktree, guest_connection_ids) =
 567            state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 568        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 569            self.peer.send(
 570                conn_id,
 571                proto::UnregisterWorktree {
 572                    project_id,
 573                    worktree_id,
 574                },
 575            )
 576        });
 577        self.update_contacts_for_users(&*state, &worktree.authorized_user_ids);
 578        Ok(())
 579    }
 580
 581    async fn update_worktree(
 582        self: Arc<Server>,
 583        request: TypedEnvelope<proto::UpdateWorktree>,
 584        response: Response<proto::UpdateWorktree>,
 585    ) -> Result<()> {
 586        let connection_ids = self.state_mut().await.update_worktree(
 587            request.sender_id,
 588            request.payload.project_id,
 589            request.payload.worktree_id,
 590            &request.payload.removed_entries,
 591            &request.payload.updated_entries,
 592            request.payload.scan_id,
 593        )?;
 594
 595        broadcast(request.sender_id, connection_ids, |connection_id| {
 596            self.peer
 597                .forward_send(request.sender_id, connection_id, request.payload.clone())
 598        });
 599        response.send(proto::Ack {})?;
 600        Ok(())
 601    }
 602
 603    async fn update_diagnostic_summary(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 606    ) -> Result<()> {
 607        let summary = request
 608            .payload
 609            .summary
 610            .clone()
 611            .ok_or_else(|| anyhow!("invalid summary"))?;
 612        let receiver_ids = self.state_mut().await.update_diagnostic_summary(
 613            request.payload.project_id,
 614            request.payload.worktree_id,
 615            request.sender_id,
 616            summary,
 617        )?;
 618
 619        broadcast(request.sender_id, receiver_ids, |connection_id| {
 620            self.peer
 621                .forward_send(request.sender_id, connection_id, request.payload.clone())
 622        });
 623        Ok(())
 624    }
 625
 626    async fn start_language_server(
 627        self: Arc<Server>,
 628        request: TypedEnvelope<proto::StartLanguageServer>,
 629    ) -> Result<()> {
 630        let receiver_ids = self.state_mut().await.start_language_server(
 631            request.payload.project_id,
 632            request.sender_id,
 633            request
 634                .payload
 635                .server
 636                .clone()
 637                .ok_or_else(|| anyhow!("invalid language server"))?,
 638        )?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        });
 643        Ok(())
 644    }
 645
 646    async fn update_language_server(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::UpdateLanguageServer>,
 649    ) -> Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .await
 653            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 654        broadcast(request.sender_id, receiver_ids, |connection_id| {
 655            self.peer
 656                .forward_send(request.sender_id, connection_id, request.payload.clone())
 657        });
 658        Ok(())
 659    }
 660
 661    async fn forward_project_request<T>(
 662        self: Arc<Server>,
 663        request: TypedEnvelope<T>,
 664        response: Response<T>,
 665    ) -> Result<()>
 666    where
 667        T: EntityMessage + RequestMessage,
 668    {
 669        let host_connection_id = self
 670            .state()
 671            .await
 672            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 673            .host_connection_id;
 674
 675        response.send(
 676            self.peer
 677                .forward_request(request.sender_id, host_connection_id, request.payload)
 678                .await?,
 679        )?;
 680        Ok(())
 681    }
 682
 683    async fn save_buffer(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::SaveBuffer>,
 686        response: Response<proto::SaveBuffer>,
 687    ) -> Result<()> {
 688        let host = self
 689            .state()
 690            .await
 691            .read_project(request.payload.project_id, request.sender_id)?
 692            .host_connection_id;
 693        let response_payload = self
 694            .peer
 695            .forward_request(request.sender_id, host, request.payload.clone())
 696            .await?;
 697
 698        let mut guests = self
 699            .state()
 700            .await
 701            .read_project(request.payload.project_id, request.sender_id)?
 702            .connection_ids();
 703        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 704        broadcast(host, guests, |conn_id| {
 705            self.peer
 706                .forward_send(host, conn_id, response_payload.clone())
 707        });
 708        response.send(response_payload)?;
 709        Ok(())
 710    }
 711
 712    async fn update_buffer(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBuffer>,
 715        response: Response<proto::UpdateBuffer>,
 716    ) -> Result<()> {
 717        let receiver_ids = self
 718            .state()
 719            .await
 720            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 721        broadcast(request.sender_id, receiver_ids, |connection_id| {
 722            self.peer
 723                .forward_send(request.sender_id, connection_id, request.payload.clone())
 724        });
 725        response.send(proto::Ack {})?;
 726        Ok(())
 727    }
 728
 729    async fn update_buffer_file(
 730        self: Arc<Server>,
 731        request: TypedEnvelope<proto::UpdateBufferFile>,
 732    ) -> Result<()> {
 733        let receiver_ids = self
 734            .state()
 735            .await
 736            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 737        broadcast(request.sender_id, receiver_ids, |connection_id| {
 738            self.peer
 739                .forward_send(request.sender_id, connection_id, request.payload.clone())
 740        });
 741        Ok(())
 742    }
 743
 744    async fn buffer_reloaded(
 745        self: Arc<Server>,
 746        request: TypedEnvelope<proto::BufferReloaded>,
 747    ) -> Result<()> {
 748        let receiver_ids = self
 749            .state()
 750            .await
 751            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 752        broadcast(request.sender_id, receiver_ids, |connection_id| {
 753            self.peer
 754                .forward_send(request.sender_id, connection_id, request.payload.clone())
 755        });
 756        Ok(())
 757    }
 758
 759    async fn buffer_saved(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::BufferSaved>,
 762    ) -> Result<()> {
 763        let receiver_ids = self
 764            .state()
 765            .await
 766            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 767        broadcast(request.sender_id, receiver_ids, |connection_id| {
 768            self.peer
 769                .forward_send(request.sender_id, connection_id, request.payload.clone())
 770        });
 771        Ok(())
 772    }
 773
 774    async fn follow(
 775        self: Arc<Self>,
 776        request: TypedEnvelope<proto::Follow>,
 777        response: Response<proto::Follow>,
 778    ) -> Result<()> {
 779        let leader_id = ConnectionId(request.payload.leader_id);
 780        let follower_id = request.sender_id;
 781        if !self
 782            .state()
 783            .await
 784            .project_connection_ids(request.payload.project_id, follower_id)?
 785            .contains(&leader_id)
 786        {
 787            Err(anyhow!("no such peer"))?;
 788        }
 789        let mut response_payload = self
 790            .peer
 791            .forward_request(request.sender_id, leader_id, request.payload)
 792            .await?;
 793        response_payload
 794            .views
 795            .retain(|view| view.leader_id != Some(follower_id.0));
 796        response.send(response_payload)?;
 797        Ok(())
 798    }
 799
 800    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 801        let leader_id = ConnectionId(request.payload.leader_id);
 802        if !self
 803            .state()
 804            .await
 805            .project_connection_ids(request.payload.project_id, request.sender_id)?
 806            .contains(&leader_id)
 807        {
 808            Err(anyhow!("no such peer"))?;
 809        }
 810        self.peer
 811            .forward_send(request.sender_id, leader_id, request.payload)?;
 812        Ok(())
 813    }
 814
 815    async fn update_followers(
 816        self: Arc<Self>,
 817        request: TypedEnvelope<proto::UpdateFollowers>,
 818    ) -> Result<()> {
 819        let connection_ids = self
 820            .state()
 821            .await
 822            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 823        let leader_id = request
 824            .payload
 825            .variant
 826            .as_ref()
 827            .and_then(|variant| match variant {
 828                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 829                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 830                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 831            });
 832        for follower_id in &request.payload.follower_ids {
 833            let follower_id = ConnectionId(*follower_id);
 834            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 835                self.peer
 836                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 837            }
 838        }
 839        Ok(())
 840    }
 841
 842    async fn get_channels(
 843        self: Arc<Server>,
 844        request: TypedEnvelope<proto::GetChannels>,
 845        response: Response<proto::GetChannels>,
 846    ) -> Result<()> {
 847        let user_id = self
 848            .state()
 849            .await
 850            .user_id_for_connection(request.sender_id)?;
 851        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 852        response.send(proto::GetChannelsResponse {
 853            channels: channels
 854                .into_iter()
 855                .map(|chan| proto::Channel {
 856                    id: chan.id.to_proto(),
 857                    name: chan.name,
 858                })
 859                .collect(),
 860        })?;
 861        Ok(())
 862    }
 863
 864    async fn get_users(
 865        self: Arc<Server>,
 866        request: TypedEnvelope<proto::GetUsers>,
 867        response: Response<proto::GetUsers>,
 868    ) -> Result<()> {
 869        let user_ids = request
 870            .payload
 871            .user_ids
 872            .into_iter()
 873            .map(UserId::from_proto)
 874            .collect();
 875        let users = self
 876            .app_state
 877            .db
 878            .get_users_by_ids(user_ids)
 879            .await?
 880            .into_iter()
 881            .map(|user| proto::User {
 882                id: user.id.to_proto(),
 883                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 884                github_login: user.github_login,
 885            })
 886            .collect();
 887        response.send(proto::UsersResponse { users })?;
 888        Ok(())
 889    }
 890
 891    async fn fuzzy_search_users(
 892        self: Arc<Server>,
 893        request: TypedEnvelope<proto::FuzzySearchUsers>,
 894        response: Response<proto::FuzzySearchUsers>,
 895    ) -> Result<()> {
 896        let query = request.payload.query;
 897        let db = &self.app_state.db;
 898        let users = match query.len() {
 899            0 => vec![],
 900            1 | 2 => db
 901                .get_user_by_github_login(&query)
 902                .await?
 903                .into_iter()
 904                .collect(),
 905            _ => db.fuzzy_search_users(&query, 10).await?,
 906        };
 907        let users = users
 908            .into_iter()
 909            .map(|user| proto::User {
 910                id: user.id.to_proto(),
 911                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 912                github_login: user.github_login,
 913            })
 914            .collect();
 915        response.send(proto::UsersResponse { users })?;
 916        Ok(())
 917    }
 918
 919    async fn request_contact(
 920        self: Arc<Server>,
 921        request: TypedEnvelope<proto::RequestContact>,
 922        response: Response<proto::RequestContact>,
 923    ) -> Result<()> {
 924        let requester_id = self
 925            .store
 926            .read()
 927            .await
 928            .user_id_for_connection(request.sender_id)?;
 929        let responder_id = UserId::from_proto(request.payload.to_user_id);
 930        self.app_state
 931            .db
 932            .send_contact_request(requester_id, responder_id)
 933            .await?;
 934        response.send(proto::Ack {})?;
 935        Ok(())
 936    }
 937
 938    async fn respond_to_contact_request(
 939        self: Arc<Server>,
 940        request: TypedEnvelope<proto::RespondToContactRequest>,
 941        response: Response<proto::RespondToContactRequest>,
 942    ) -> Result<()> {
 943        let responder_id = self
 944            .store
 945            .read()
 946            .await
 947            .user_id_for_connection(request.sender_id)?;
 948        let requester_id = UserId::from_proto(request.payload.requesting_user_id);
 949        self.app_state
 950            .db
 951            .respond_to_contact_request(
 952                responder_id,
 953                requester_id,
 954                request.payload.response == proto::ContactRequestResponse::Accept as i32,
 955            )
 956            .await?;
 957        response.send(proto::Ack {})?;
 958        Ok(())
 959    }
 960
 961    #[instrument(skip(self, state, user_ids))]
 962    fn update_contacts_for_users<'a>(
 963        self: &Arc<Self>,
 964        state: &Store,
 965        user_ids: impl IntoIterator<Item = &'a UserId>,
 966    ) {
 967        for user_id in user_ids {
 968            let contacts = state.contacts_for_user(*user_id);
 969            for connection_id in state.connection_ids_for_user(*user_id) {
 970                self.peer
 971                    .send(
 972                        connection_id,
 973                        proto::UpdateContacts {
 974                            contacts: contacts.clone(),
 975                            pending_requests_from_user_ids: Default::default(),
 976                            pending_requests_to_user_ids: Default::default(),
 977                        },
 978                    )
 979                    .trace_err();
 980            }
 981        }
 982    }
 983
 984    async fn join_channel(
 985        self: Arc<Self>,
 986        request: TypedEnvelope<proto::JoinChannel>,
 987        response: Response<proto::JoinChannel>,
 988    ) -> Result<()> {
 989        let user_id = self
 990            .state()
 991            .await
 992            .user_id_for_connection(request.sender_id)?;
 993        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 994        if !self
 995            .app_state
 996            .db
 997            .can_user_access_channel(user_id, channel_id)
 998            .await?
 999        {
1000            Err(anyhow!("access denied"))?;
1001        }
1002
1003        self.state_mut()
1004            .await
1005            .join_channel(request.sender_id, channel_id);
1006        let messages = self
1007            .app_state
1008            .db
1009            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1010            .await?
1011            .into_iter()
1012            .map(|msg| proto::ChannelMessage {
1013                id: msg.id.to_proto(),
1014                body: msg.body,
1015                timestamp: msg.sent_at.unix_timestamp() as u64,
1016                sender_id: msg.sender_id.to_proto(),
1017                nonce: Some(msg.nonce.as_u128().into()),
1018            })
1019            .collect::<Vec<_>>();
1020        response.send(proto::JoinChannelResponse {
1021            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1022            messages,
1023        })?;
1024        Ok(())
1025    }
1026
1027    async fn leave_channel(
1028        self: Arc<Self>,
1029        request: TypedEnvelope<proto::LeaveChannel>,
1030    ) -> Result<()> {
1031        let user_id = self
1032            .state()
1033            .await
1034            .user_id_for_connection(request.sender_id)?;
1035        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1036        if !self
1037            .app_state
1038            .db
1039            .can_user_access_channel(user_id, channel_id)
1040            .await?
1041        {
1042            Err(anyhow!("access denied"))?;
1043        }
1044
1045        self.state_mut()
1046            .await
1047            .leave_channel(request.sender_id, channel_id);
1048
1049        Ok(())
1050    }
1051
1052    async fn send_channel_message(
1053        self: Arc<Self>,
1054        request: TypedEnvelope<proto::SendChannelMessage>,
1055        response: Response<proto::SendChannelMessage>,
1056    ) -> Result<()> {
1057        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1058        let user_id;
1059        let connection_ids;
1060        {
1061            let state = self.state().await;
1062            user_id = state.user_id_for_connection(request.sender_id)?;
1063            connection_ids = state.channel_connection_ids(channel_id)?;
1064        }
1065
1066        // Validate the message body.
1067        let body = request.payload.body.trim().to_string();
1068        if body.len() > MAX_MESSAGE_LEN {
1069            return Err(anyhow!("message is too long"))?;
1070        }
1071        if body.is_empty() {
1072            return Err(anyhow!("message can't be blank"))?;
1073        }
1074
1075        let timestamp = OffsetDateTime::now_utc();
1076        let nonce = request
1077            .payload
1078            .nonce
1079            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1080
1081        let message_id = self
1082            .app_state
1083            .db
1084            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1085            .await?
1086            .to_proto();
1087        let message = proto::ChannelMessage {
1088            sender_id: user_id.to_proto(),
1089            id: message_id,
1090            body,
1091            timestamp: timestamp.unix_timestamp() as u64,
1092            nonce: Some(nonce),
1093        };
1094        broadcast(request.sender_id, connection_ids, |conn_id| {
1095            self.peer.send(
1096                conn_id,
1097                proto::ChannelMessageSent {
1098                    channel_id: channel_id.to_proto(),
1099                    message: Some(message.clone()),
1100                },
1101            )
1102        });
1103        response.send(proto::SendChannelMessageResponse {
1104            message: Some(message),
1105        })?;
1106        Ok(())
1107    }
1108
1109    async fn get_channel_messages(
1110        self: Arc<Self>,
1111        request: TypedEnvelope<proto::GetChannelMessages>,
1112        response: Response<proto::GetChannelMessages>,
1113    ) -> Result<()> {
1114        let user_id = self
1115            .state()
1116            .await
1117            .user_id_for_connection(request.sender_id)?;
1118        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1119        if !self
1120            .app_state
1121            .db
1122            .can_user_access_channel(user_id, channel_id)
1123            .await?
1124        {
1125            Err(anyhow!("access denied"))?;
1126        }
1127
1128        let messages = self
1129            .app_state
1130            .db
1131            .get_channel_messages(
1132                channel_id,
1133                MESSAGE_COUNT_PER_PAGE,
1134                Some(MessageId::from_proto(request.payload.before_message_id)),
1135            )
1136            .await?
1137            .into_iter()
1138            .map(|msg| proto::ChannelMessage {
1139                id: msg.id.to_proto(),
1140                body: msg.body,
1141                timestamp: msg.sent_at.unix_timestamp() as u64,
1142                sender_id: msg.sender_id.to_proto(),
1143                nonce: Some(msg.nonce.as_u128().into()),
1144            })
1145            .collect::<Vec<_>>();
1146        response.send(proto::GetChannelMessagesResponse {
1147            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1148            messages,
1149        })?;
1150        Ok(())
1151    }
1152
1153    async fn state<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1154        #[cfg(test)]
1155        tokio::task::yield_now().await;
1156        let guard = self.store.read().await;
1157        #[cfg(test)]
1158        tokio::task::yield_now().await;
1159        StoreReadGuard {
1160            guard,
1161            _not_send: PhantomData,
1162        }
1163    }
1164
1165    async fn state_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1166        #[cfg(test)]
1167        tokio::task::yield_now().await;
1168        let guard = self.store.write().await;
1169        #[cfg(test)]
1170        tokio::task::yield_now().await;
1171        StoreWriteGuard {
1172            guard,
1173            _not_send: PhantomData,
1174        }
1175    }
1176}
1177
1178impl<'a> Deref for StoreReadGuard<'a> {
1179    type Target = Store;
1180
1181    fn deref(&self) -> &Self::Target {
1182        &*self.guard
1183    }
1184}
1185
1186impl<'a> Deref for StoreWriteGuard<'a> {
1187    type Target = Store;
1188
1189    fn deref(&self) -> &Self::Target {
1190        &*self.guard
1191    }
1192}
1193
1194impl<'a> DerefMut for StoreWriteGuard<'a> {
1195    fn deref_mut(&mut self) -> &mut Self::Target {
1196        &mut *self.guard
1197    }
1198}
1199
1200impl<'a> Drop for StoreWriteGuard<'a> {
1201    fn drop(&mut self) {
1202        #[cfg(test)]
1203        self.check_invariants();
1204
1205        let metrics = self.metrics();
1206        tracing::info!(
1207            connections = metrics.connections,
1208            registered_projects = metrics.registered_projects,
1209            shared_projects = metrics.shared_projects,
1210            "metrics"
1211        );
1212    }
1213}
1214
1215impl Executor for RealExecutor {
1216    type Sleep = Sleep;
1217
1218    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1219        tokio::task::spawn(future);
1220    }
1221
1222    fn sleep(&self, duration: Duration) -> Self::Sleep {
1223        tokio::time::sleep(duration)
1224    }
1225}
1226
1227#[instrument(skip(f))]
1228fn broadcast<F>(sender_id: ConnectionId, receiver_ids: Vec<ConnectionId>, mut f: F)
1229where
1230    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1231{
1232    for receiver_id in receiver_ids {
1233        if receiver_id != sender_id {
1234            f(receiver_id).trace_err();
1235        }
1236    }
1237}
1238
1239lazy_static! {
1240    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1241}
1242
1243pub struct ProtocolVersion(u32);
1244
1245impl Header for ProtocolVersion {
1246    fn name() -> &'static HeaderName {
1247        &ZED_PROTOCOL_VERSION
1248    }
1249
1250    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1251    where
1252        Self: Sized,
1253        I: Iterator<Item = &'i axum::http::HeaderValue>,
1254    {
1255        let version = values
1256            .next()
1257            .ok_or_else(|| axum::headers::Error::invalid())?
1258            .to_str()
1259            .map_err(|_| axum::headers::Error::invalid())?
1260            .parse()
1261            .map_err(|_| axum::headers::Error::invalid())?;
1262        Ok(Self(version))
1263    }
1264
1265    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1266        values.extend([self.0.to_string().parse().unwrap()]);
1267    }
1268}
1269
1270pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1271    let server = Server::new(app_state.clone(), None);
1272    Router::new()
1273        .route("/rpc", get(handle_websocket_request))
1274        .layer(
1275            ServiceBuilder::new()
1276                .layer(Extension(app_state))
1277                .layer(middleware::from_fn(auth::validate_header))
1278                .layer(Extension(server)),
1279        )
1280}
1281
1282pub async fn handle_websocket_request(
1283    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1284    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1285    Extension(server): Extension<Arc<Server>>,
1286    Extension(user_id): Extension<UserId>,
1287    ws: WebSocketUpgrade,
1288) -> axum::response::Response {
1289    if protocol_version != rpc::PROTOCOL_VERSION {
1290        return (
1291            StatusCode::UPGRADE_REQUIRED,
1292            "client must be upgraded".to_string(),
1293        )
1294            .into_response();
1295    }
1296    let socket_address = socket_address.to_string();
1297    ws.on_upgrade(move |socket| {
1298        let socket = socket
1299            .map_ok(to_tungstenite_message)
1300            .err_into()
1301            .with(|message| async move { Ok(to_axum_message(message)) });
1302        let connection = Connection::new(Box::pin(socket));
1303        server.handle_connection(connection, socket_address, user_id, None, RealExecutor)
1304    })
1305}
1306
1307fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1308    match message {
1309        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1310        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1311        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1312        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1313        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1314            code: frame.code.into(),
1315            reason: frame.reason,
1316        })),
1317    }
1318}
1319
1320fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1321    match message {
1322        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1323        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1324        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1325        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1326        AxumMessage::Close(frame) => {
1327            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1328                code: frame.code.into(),
1329                reason: frame.reason,
1330            }))
1331        }
1332    }
1333}
1334
1335pub trait ResultExt {
1336    type Ok;
1337
1338    fn trace_err(self) -> Option<Self::Ok>;
1339}
1340
1341impl<T, E> ResultExt for Result<T, E>
1342where
1343    E: std::fmt::Debug,
1344{
1345    type Ok = T;
1346
1347    fn trace_err(self) -> Option<T> {
1348        match self {
1349            Ok(value) => Some(value),
1350            Err(error) => {
1351                tracing::error!("{:?}", error);
1352                None
1353            }
1354        }
1355    }
1356}
1357
1358#[cfg(test)]
1359mod tests {
1360    use super::*;
1361    use crate::{
1362        db::{tests::TestDb, UserId},
1363        AppState,
1364    };
1365    use ::rpc::Peer;
1366    use client::{
1367        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1368        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1369    };
1370    use collections::BTreeMap;
1371    use editor::{
1372        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1373        ToOffset, ToggleCodeActions, Undo,
1374    };
1375    use gpui::{
1376        executor::{self, Deterministic},
1377        geometry::vector::vec2f,
1378        ModelHandle, TestAppContext, ViewHandle,
1379    };
1380    use language::{
1381        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1382        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1383    };
1384    use lsp::{self, FakeLanguageServer};
1385    use parking_lot::Mutex;
1386    use project::{
1387        fs::{FakeFs, Fs as _},
1388        search::SearchQuery,
1389        worktree::WorktreeHandle,
1390        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1391    };
1392    use rand::prelude::*;
1393    use rpc::PeerId;
1394    use serde_json::json;
1395    use settings::Settings;
1396    use sqlx::types::time::OffsetDateTime;
1397    use std::{
1398        env,
1399        ops::Deref,
1400        path::{Path, PathBuf},
1401        rc::Rc,
1402        sync::{
1403            atomic::{AtomicBool, Ordering::SeqCst},
1404            Arc,
1405        },
1406        time::Duration,
1407    };
1408    use theme::ThemeRegistry;
1409    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1410
1411    #[cfg(test)]
1412    #[ctor::ctor]
1413    fn init_logger() {
1414        if std::env::var("RUST_LOG").is_ok() {
1415            env_logger::init();
1416        }
1417    }
1418
1419    #[gpui::test(iterations = 10)]
1420    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1421        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1422        let lang_registry = Arc::new(LanguageRegistry::test());
1423        let fs = FakeFs::new(cx_a.background());
1424        cx_a.foreground().forbid_parking();
1425
1426        // Connect to a server as 2 clients.
1427        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1428        let client_a = server.create_client(cx_a, "user_a").await;
1429        let client_b = server.create_client(cx_b, "user_b").await;
1430
1431        // Share a project as client A
1432        fs.insert_tree(
1433            "/a",
1434            json!({
1435                ".zed.toml": r#"collaborators = ["user_b"]"#,
1436                "a.txt": "a-contents",
1437                "b.txt": "b-contents",
1438            }),
1439        )
1440        .await;
1441        let project_a = cx_a.update(|cx| {
1442            Project::local(
1443                client_a.clone(),
1444                client_a.user_store.clone(),
1445                lang_registry.clone(),
1446                fs.clone(),
1447                cx,
1448            )
1449        });
1450        let (worktree_a, _) = project_a
1451            .update(cx_a, |p, cx| {
1452                p.find_or_create_local_worktree("/a", true, cx)
1453            })
1454            .await
1455            .unwrap();
1456        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1457        worktree_a
1458            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1459            .await;
1460        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1461        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1462
1463        // Join that project as client B
1464        let project_b = Project::remote(
1465            project_id,
1466            client_b.clone(),
1467            client_b.user_store.clone(),
1468            lang_registry.clone(),
1469            fs.clone(),
1470            &mut cx_b.to_async(),
1471        )
1472        .await
1473        .unwrap();
1474
1475        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1476            assert_eq!(
1477                project
1478                    .collaborators()
1479                    .get(&client_a.peer_id)
1480                    .unwrap()
1481                    .user
1482                    .github_login,
1483                "user_a"
1484            );
1485            project.replica_id()
1486        });
1487        project_a
1488            .condition(&cx_a, |tree, _| {
1489                tree.collaborators()
1490                    .get(&client_b.peer_id)
1491                    .map_or(false, |collaborator| {
1492                        collaborator.replica_id == replica_id_b
1493                            && collaborator.user.github_login == "user_b"
1494                    })
1495            })
1496            .await;
1497
1498        // Open the same file as client B and client A.
1499        let buffer_b = project_b
1500            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1501            .await
1502            .unwrap();
1503        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1504        project_a.read_with(cx_a, |project, cx| {
1505            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1506        });
1507        let buffer_a = project_a
1508            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1509            .await
1510            .unwrap();
1511
1512        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1513
1514        // TODO
1515        // // Create a selection set as client B and see that selection set as client A.
1516        // buffer_a
1517        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1518        //     .await;
1519
1520        // Edit the buffer as client B and see that edit as client A.
1521        editor_b.update(cx_b, |editor, cx| {
1522            editor.handle_input(&Input("ok, ".into()), cx)
1523        });
1524        buffer_a
1525            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1526            .await;
1527
1528        // TODO
1529        // // Remove the selection set as client B, see those selections disappear as client A.
1530        cx_b.update(move |_| drop(editor_b));
1531        // buffer_a
1532        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1533        //     .await;
1534
1535        // Dropping the client B's project removes client B from client A's collaborators.
1536        cx_b.update(move |_| drop(project_b));
1537        project_a
1538            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1539            .await;
1540    }
1541
1542    #[gpui::test(iterations = 10)]
1543    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544        let lang_registry = Arc::new(LanguageRegistry::test());
1545        let fs = FakeFs::new(cx_a.background());
1546        cx_a.foreground().forbid_parking();
1547
1548        // Connect to a server as 2 clients.
1549        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550        let client_a = server.create_client(cx_a, "user_a").await;
1551        let client_b = server.create_client(cx_b, "user_b").await;
1552
1553        // Share a project as client A
1554        fs.insert_tree(
1555            "/a",
1556            json!({
1557                ".zed.toml": r#"collaborators = ["user_b"]"#,
1558                "a.txt": "a-contents",
1559                "b.txt": "b-contents",
1560            }),
1561        )
1562        .await;
1563        let project_a = cx_a.update(|cx| {
1564            Project::local(
1565                client_a.clone(),
1566                client_a.user_store.clone(),
1567                lang_registry.clone(),
1568                fs.clone(),
1569                cx,
1570            )
1571        });
1572        let (worktree_a, _) = project_a
1573            .update(cx_a, |p, cx| {
1574                p.find_or_create_local_worktree("/a", true, cx)
1575            })
1576            .await
1577            .unwrap();
1578        worktree_a
1579            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580            .await;
1581        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1585
1586        // Join that project as client B
1587        let project_b = Project::remote(
1588            project_id,
1589            client_b.clone(),
1590            client_b.user_store.clone(),
1591            lang_registry.clone(),
1592            fs.clone(),
1593            &mut cx_b.to_async(),
1594        )
1595        .await
1596        .unwrap();
1597        project_b
1598            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1599            .await
1600            .unwrap();
1601
1602        // Unshare the project as client A
1603        project_a.update(cx_a, |project, cx| project.unshare(cx));
1604        project_b
1605            .condition(cx_b, |project, _| project.is_read_only())
1606            .await;
1607        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1608        cx_b.update(|_| {
1609            drop(project_b);
1610        });
1611
1612        // Share the project again and ensure guests can still join.
1613        project_a
1614            .update(cx_a, |project, cx| project.share(cx))
1615            .await
1616            .unwrap();
1617        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1618
1619        let project_b2 = Project::remote(
1620            project_id,
1621            client_b.clone(),
1622            client_b.user_store.clone(),
1623            lang_registry.clone(),
1624            fs.clone(),
1625            &mut cx_b.to_async(),
1626        )
1627        .await
1628        .unwrap();
1629        project_b2
1630            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1631            .await
1632            .unwrap();
1633    }
1634
1635    #[gpui::test(iterations = 10)]
1636    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1637        let lang_registry = Arc::new(LanguageRegistry::test());
1638        let fs = FakeFs::new(cx_a.background());
1639        cx_a.foreground().forbid_parking();
1640
1641        // Connect to a server as 2 clients.
1642        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1643        let client_a = server.create_client(cx_a, "user_a").await;
1644        let client_b = server.create_client(cx_b, "user_b").await;
1645
1646        // Share a project as client A
1647        fs.insert_tree(
1648            "/a",
1649            json!({
1650                ".zed.toml": r#"collaborators = ["user_b"]"#,
1651                "a.txt": "a-contents",
1652                "b.txt": "b-contents",
1653            }),
1654        )
1655        .await;
1656        let project_a = cx_a.update(|cx| {
1657            Project::local(
1658                client_a.clone(),
1659                client_a.user_store.clone(),
1660                lang_registry.clone(),
1661                fs.clone(),
1662                cx,
1663            )
1664        });
1665        let (worktree_a, _) = project_a
1666            .update(cx_a, |p, cx| {
1667                p.find_or_create_local_worktree("/a", true, cx)
1668            })
1669            .await
1670            .unwrap();
1671        worktree_a
1672            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1673            .await;
1674        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1675        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1676        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1677        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1678
1679        // Join that project as client B
1680        let project_b = Project::remote(
1681            project_id,
1682            client_b.clone(),
1683            client_b.user_store.clone(),
1684            lang_registry.clone(),
1685            fs.clone(),
1686            &mut cx_b.to_async(),
1687        )
1688        .await
1689        .unwrap();
1690        project_b
1691            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1692            .await
1693            .unwrap();
1694
1695        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1696        server.disconnect_client(client_a.current_user_id(cx_a));
1697        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1698        project_a
1699            .condition(cx_a, |project, _| project.collaborators().is_empty())
1700            .await;
1701        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1702        project_b
1703            .condition(cx_b, |project, _| project.is_read_only())
1704            .await;
1705        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1706        cx_b.update(|_| {
1707            drop(project_b);
1708        });
1709
1710        // Await reconnection
1711        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1712
1713        // Share the project again and ensure guests can still join.
1714        project_a
1715            .update(cx_a, |project, cx| project.share(cx))
1716            .await
1717            .unwrap();
1718        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1719
1720        let project_b2 = Project::remote(
1721            project_id,
1722            client_b.clone(),
1723            client_b.user_store.clone(),
1724            lang_registry.clone(),
1725            fs.clone(),
1726            &mut cx_b.to_async(),
1727        )
1728        .await
1729        .unwrap();
1730        project_b2
1731            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1732            .await
1733            .unwrap();
1734    }
1735
1736    #[gpui::test(iterations = 10)]
1737    async fn test_propagate_saves_and_fs_changes(
1738        cx_a: &mut TestAppContext,
1739        cx_b: &mut TestAppContext,
1740        cx_c: &mut TestAppContext,
1741    ) {
1742        let lang_registry = Arc::new(LanguageRegistry::test());
1743        let fs = FakeFs::new(cx_a.background());
1744        cx_a.foreground().forbid_parking();
1745
1746        // Connect to a server as 3 clients.
1747        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1748        let client_a = server.create_client(cx_a, "user_a").await;
1749        let client_b = server.create_client(cx_b, "user_b").await;
1750        let client_c = server.create_client(cx_c, "user_c").await;
1751
1752        // Share a worktree as client A.
1753        fs.insert_tree(
1754            "/a",
1755            json!({
1756                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1757                "file1": "",
1758                "file2": ""
1759            }),
1760        )
1761        .await;
1762        let project_a = cx_a.update(|cx| {
1763            Project::local(
1764                client_a.clone(),
1765                client_a.user_store.clone(),
1766                lang_registry.clone(),
1767                fs.clone(),
1768                cx,
1769            )
1770        });
1771        let (worktree_a, _) = project_a
1772            .update(cx_a, |p, cx| {
1773                p.find_or_create_local_worktree("/a", true, cx)
1774            })
1775            .await
1776            .unwrap();
1777        worktree_a
1778            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1779            .await;
1780        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1781        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1782        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1783
1784        // Join that worktree as clients B and C.
1785        let project_b = Project::remote(
1786            project_id,
1787            client_b.clone(),
1788            client_b.user_store.clone(),
1789            lang_registry.clone(),
1790            fs.clone(),
1791            &mut cx_b.to_async(),
1792        )
1793        .await
1794        .unwrap();
1795        let project_c = Project::remote(
1796            project_id,
1797            client_c.clone(),
1798            client_c.user_store.clone(),
1799            lang_registry.clone(),
1800            fs.clone(),
1801            &mut cx_c.to_async(),
1802        )
1803        .await
1804        .unwrap();
1805        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1806        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1807
1808        // Open and edit a buffer as both guests B and C.
1809        let buffer_b = project_b
1810            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1811            .await
1812            .unwrap();
1813        let buffer_c = project_c
1814            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1815            .await
1816            .unwrap();
1817        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1818        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1819
1820        // Open and edit that buffer as the host.
1821        let buffer_a = project_a
1822            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1823            .await
1824            .unwrap();
1825
1826        buffer_a
1827            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1828            .await;
1829        buffer_a.update(cx_a, |buf, cx| {
1830            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1831        });
1832
1833        // Wait for edits to propagate
1834        buffer_a
1835            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1836            .await;
1837        buffer_b
1838            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1839            .await;
1840        buffer_c
1841            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1842            .await;
1843
1844        // Edit the buffer as the host and concurrently save as guest B.
1845        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1846        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1847        save_b.await.unwrap();
1848        assert_eq!(
1849            fs.load("/a/file1".as_ref()).await.unwrap(),
1850            "hi-a, i-am-c, i-am-b, i-am-a"
1851        );
1852        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1853        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1854        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1855
1856        worktree_a.flush_fs_events(cx_a).await;
1857
1858        // Make changes on host's file system, see those changes on guest worktrees.
1859        fs.rename(
1860            "/a/file1".as_ref(),
1861            "/a/file1-renamed".as_ref(),
1862            Default::default(),
1863        )
1864        .await
1865        .unwrap();
1866
1867        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1868            .await
1869            .unwrap();
1870        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1871
1872        worktree_a
1873            .condition(&cx_a, |tree, _| {
1874                tree.paths()
1875                    .map(|p| p.to_string_lossy())
1876                    .collect::<Vec<_>>()
1877                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1878            })
1879            .await;
1880        worktree_b
1881            .condition(&cx_b, |tree, _| {
1882                tree.paths()
1883                    .map(|p| p.to_string_lossy())
1884                    .collect::<Vec<_>>()
1885                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1886            })
1887            .await;
1888        worktree_c
1889            .condition(&cx_c, |tree, _| {
1890                tree.paths()
1891                    .map(|p| p.to_string_lossy())
1892                    .collect::<Vec<_>>()
1893                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1894            })
1895            .await;
1896
1897        // Ensure buffer files are updated as well.
1898        buffer_a
1899            .condition(&cx_a, |buf, _| {
1900                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1901            })
1902            .await;
1903        buffer_b
1904            .condition(&cx_b, |buf, _| {
1905                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1906            })
1907            .await;
1908        buffer_c
1909            .condition(&cx_c, |buf, _| {
1910                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1911            })
1912            .await;
1913    }
1914
1915    #[gpui::test(iterations = 10)]
1916    async fn test_fs_operations(
1917        executor: Arc<Deterministic>,
1918        cx_a: &mut TestAppContext,
1919        cx_b: &mut TestAppContext,
1920    ) {
1921        executor.forbid_parking();
1922        let fs = FakeFs::new(cx_a.background());
1923
1924        // Connect to a server as 2 clients.
1925        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1926        let mut client_a = server.create_client(cx_a, "user_a").await;
1927        let mut client_b = server.create_client(cx_b, "user_b").await;
1928
1929        // Share a project as client A
1930        fs.insert_tree(
1931            "/dir",
1932            json!({
1933                ".zed.toml": r#"collaborators = ["user_b"]"#,
1934                "a.txt": "a-contents",
1935                "b.txt": "b-contents",
1936            }),
1937        )
1938        .await;
1939
1940        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
1941        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
1942        project_a
1943            .update(cx_a, |project, cx| project.share(cx))
1944            .await
1945            .unwrap();
1946
1947        let project_b = client_b.build_remote_project(project_id, cx_b).await;
1948
1949        let worktree_a =
1950            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
1951        let worktree_b =
1952            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
1953
1954        let entry = project_b
1955            .update(cx_b, |project, cx| {
1956                project
1957                    .create_entry((worktree_id, "c.txt"), false, cx)
1958                    .unwrap()
1959            })
1960            .await
1961            .unwrap();
1962        worktree_a.read_with(cx_a, |worktree, _| {
1963            assert_eq!(
1964                worktree
1965                    .paths()
1966                    .map(|p| p.to_string_lossy())
1967                    .collect::<Vec<_>>(),
1968                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1969            );
1970        });
1971        worktree_b.read_with(cx_b, |worktree, _| {
1972            assert_eq!(
1973                worktree
1974                    .paths()
1975                    .map(|p| p.to_string_lossy())
1976                    .collect::<Vec<_>>(),
1977                [".zed.toml", "a.txt", "b.txt", "c.txt"]
1978            );
1979        });
1980
1981        project_b
1982            .update(cx_b, |project, cx| {
1983                project.rename_entry(entry.id, Path::new("d.txt"), cx)
1984            })
1985            .unwrap()
1986            .await
1987            .unwrap();
1988        worktree_a.read_with(cx_a, |worktree, _| {
1989            assert_eq!(
1990                worktree
1991                    .paths()
1992                    .map(|p| p.to_string_lossy())
1993                    .collect::<Vec<_>>(),
1994                [".zed.toml", "a.txt", "b.txt", "d.txt"]
1995            );
1996        });
1997        worktree_b.read_with(cx_b, |worktree, _| {
1998            assert_eq!(
1999                worktree
2000                    .paths()
2001                    .map(|p| p.to_string_lossy())
2002                    .collect::<Vec<_>>(),
2003                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2004            );
2005        });
2006
2007        let dir_entry = project_b
2008            .update(cx_b, |project, cx| {
2009                project
2010                    .create_entry((worktree_id, "DIR"), true, cx)
2011                    .unwrap()
2012            })
2013            .await
2014            .unwrap();
2015        worktree_a.read_with(cx_a, |worktree, _| {
2016            assert_eq!(
2017                worktree
2018                    .paths()
2019                    .map(|p| p.to_string_lossy())
2020                    .collect::<Vec<_>>(),
2021                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
2022            );
2023        });
2024        worktree_b.read_with(cx_b, |worktree, _| {
2025            assert_eq!(
2026                worktree
2027                    .paths()
2028                    .map(|p| p.to_string_lossy())
2029                    .collect::<Vec<_>>(),
2030                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
2031            );
2032        });
2033
2034        project_b
2035            .update(cx_b, |project, cx| {
2036                project.delete_entry(dir_entry.id, cx).unwrap()
2037            })
2038            .await
2039            .unwrap();
2040        worktree_a.read_with(cx_a, |worktree, _| {
2041            assert_eq!(
2042                worktree
2043                    .paths()
2044                    .map(|p| p.to_string_lossy())
2045                    .collect::<Vec<_>>(),
2046                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2047            );
2048        });
2049        worktree_b.read_with(cx_b, |worktree, _| {
2050            assert_eq!(
2051                worktree
2052                    .paths()
2053                    .map(|p| p.to_string_lossy())
2054                    .collect::<Vec<_>>(),
2055                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2056            );
2057        });
2058
2059        project_b
2060            .update(cx_b, |project, cx| {
2061                project.delete_entry(entry.id, cx).unwrap()
2062            })
2063            .await
2064            .unwrap();
2065        worktree_a.read_with(cx_a, |worktree, _| {
2066            assert_eq!(
2067                worktree
2068                    .paths()
2069                    .map(|p| p.to_string_lossy())
2070                    .collect::<Vec<_>>(),
2071                [".zed.toml", "a.txt", "b.txt"]
2072            );
2073        });
2074        worktree_b.read_with(cx_b, |worktree, _| {
2075            assert_eq!(
2076                worktree
2077                    .paths()
2078                    .map(|p| p.to_string_lossy())
2079                    .collect::<Vec<_>>(),
2080                [".zed.toml", "a.txt", "b.txt"]
2081            );
2082        });
2083    }
2084
2085    #[gpui::test(iterations = 10)]
2086    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2087        cx_a.foreground().forbid_parking();
2088        let lang_registry = Arc::new(LanguageRegistry::test());
2089        let fs = FakeFs::new(cx_a.background());
2090
2091        // Connect to a server as 2 clients.
2092        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2093        let client_a = server.create_client(cx_a, "user_a").await;
2094        let client_b = server.create_client(cx_b, "user_b").await;
2095
2096        // Share a project as client A
2097        fs.insert_tree(
2098            "/dir",
2099            json!({
2100                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2101                "a.txt": "a-contents",
2102            }),
2103        )
2104        .await;
2105
2106        let project_a = cx_a.update(|cx| {
2107            Project::local(
2108                client_a.clone(),
2109                client_a.user_store.clone(),
2110                lang_registry.clone(),
2111                fs.clone(),
2112                cx,
2113            )
2114        });
2115        let (worktree_a, _) = project_a
2116            .update(cx_a, |p, cx| {
2117                p.find_or_create_local_worktree("/dir", true, cx)
2118            })
2119            .await
2120            .unwrap();
2121        worktree_a
2122            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2123            .await;
2124        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2125        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2126        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2127
2128        // Join that project as client B
2129        let project_b = Project::remote(
2130            project_id,
2131            client_b.clone(),
2132            client_b.user_store.clone(),
2133            lang_registry.clone(),
2134            fs.clone(),
2135            &mut cx_b.to_async(),
2136        )
2137        .await
2138        .unwrap();
2139
2140        // Open a buffer as client B
2141        let buffer_b = project_b
2142            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2143            .await
2144            .unwrap();
2145
2146        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2147        buffer_b.read_with(cx_b, |buf, _| {
2148            assert!(buf.is_dirty());
2149            assert!(!buf.has_conflict());
2150        });
2151
2152        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2153        buffer_b
2154            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2155            .await;
2156        buffer_b.read_with(cx_b, |buf, _| {
2157            assert!(!buf.has_conflict());
2158        });
2159
2160        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2161        buffer_b.read_with(cx_b, |buf, _| {
2162            assert!(buf.is_dirty());
2163            assert!(!buf.has_conflict());
2164        });
2165    }
2166
2167    #[gpui::test(iterations = 10)]
2168    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2169        cx_a.foreground().forbid_parking();
2170        let lang_registry = Arc::new(LanguageRegistry::test());
2171        let fs = FakeFs::new(cx_a.background());
2172
2173        // Connect to a server as 2 clients.
2174        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2175        let client_a = server.create_client(cx_a, "user_a").await;
2176        let client_b = server.create_client(cx_b, "user_b").await;
2177
2178        // Share a project as client A
2179        fs.insert_tree(
2180            "/dir",
2181            json!({
2182                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2183                "a.txt": "a-contents",
2184            }),
2185        )
2186        .await;
2187
2188        let project_a = cx_a.update(|cx| {
2189            Project::local(
2190                client_a.clone(),
2191                client_a.user_store.clone(),
2192                lang_registry.clone(),
2193                fs.clone(),
2194                cx,
2195            )
2196        });
2197        let (worktree_a, _) = project_a
2198            .update(cx_a, |p, cx| {
2199                p.find_or_create_local_worktree("/dir", true, cx)
2200            })
2201            .await
2202            .unwrap();
2203        worktree_a
2204            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2205            .await;
2206        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2207        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2208        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2209
2210        // Join that project as client B
2211        let project_b = Project::remote(
2212            project_id,
2213            client_b.clone(),
2214            client_b.user_store.clone(),
2215            lang_registry.clone(),
2216            fs.clone(),
2217            &mut cx_b.to_async(),
2218        )
2219        .await
2220        .unwrap();
2221        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2222
2223        // Open a buffer as client B
2224        let buffer_b = project_b
2225            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2226            .await
2227            .unwrap();
2228        buffer_b.read_with(cx_b, |buf, _| {
2229            assert!(!buf.is_dirty());
2230            assert!(!buf.has_conflict());
2231        });
2232
2233        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2234            .await
2235            .unwrap();
2236        buffer_b
2237            .condition(&cx_b, |buf, _| {
2238                buf.text() == "new contents" && !buf.is_dirty()
2239            })
2240            .await;
2241        buffer_b.read_with(cx_b, |buf, _| {
2242            assert!(!buf.has_conflict());
2243        });
2244    }
2245
2246    #[gpui::test(iterations = 10)]
2247    async fn test_editing_while_guest_opens_buffer(
2248        cx_a: &mut TestAppContext,
2249        cx_b: &mut TestAppContext,
2250    ) {
2251        cx_a.foreground().forbid_parking();
2252        let lang_registry = Arc::new(LanguageRegistry::test());
2253        let fs = FakeFs::new(cx_a.background());
2254
2255        // Connect to a server as 2 clients.
2256        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2257        let client_a = server.create_client(cx_a, "user_a").await;
2258        let client_b = server.create_client(cx_b, "user_b").await;
2259
2260        // Share a project as client A
2261        fs.insert_tree(
2262            "/dir",
2263            json!({
2264                ".zed.toml": r#"collaborators = ["user_b"]"#,
2265                "a.txt": "a-contents",
2266            }),
2267        )
2268        .await;
2269        let project_a = cx_a.update(|cx| {
2270            Project::local(
2271                client_a.clone(),
2272                client_a.user_store.clone(),
2273                lang_registry.clone(),
2274                fs.clone(),
2275                cx,
2276            )
2277        });
2278        let (worktree_a, _) = project_a
2279            .update(cx_a, |p, cx| {
2280                p.find_or_create_local_worktree("/dir", true, cx)
2281            })
2282            .await
2283            .unwrap();
2284        worktree_a
2285            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2286            .await;
2287        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2288        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2289        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2290
2291        // Join that project as client B
2292        let project_b = Project::remote(
2293            project_id,
2294            client_b.clone(),
2295            client_b.user_store.clone(),
2296            lang_registry.clone(),
2297            fs.clone(),
2298            &mut cx_b.to_async(),
2299        )
2300        .await
2301        .unwrap();
2302
2303        // Open a buffer as client A
2304        let buffer_a = project_a
2305            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2306            .await
2307            .unwrap();
2308
2309        // Start opening the same buffer as client B
2310        let buffer_b = cx_b
2311            .background()
2312            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2313
2314        // Edit the buffer as client A while client B is still opening it.
2315        cx_b.background().simulate_random_delay().await;
2316        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2317        cx_b.background().simulate_random_delay().await;
2318        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2319
2320        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2321        let buffer_b = buffer_b.await.unwrap();
2322        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2323    }
2324
2325    #[gpui::test(iterations = 10)]
2326    async fn test_leaving_worktree_while_opening_buffer(
2327        cx_a: &mut TestAppContext,
2328        cx_b: &mut TestAppContext,
2329    ) {
2330        cx_a.foreground().forbid_parking();
2331        let lang_registry = Arc::new(LanguageRegistry::test());
2332        let fs = FakeFs::new(cx_a.background());
2333
2334        // Connect to a server as 2 clients.
2335        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2336        let client_a = server.create_client(cx_a, "user_a").await;
2337        let client_b = server.create_client(cx_b, "user_b").await;
2338
2339        // Share a project as client A
2340        fs.insert_tree(
2341            "/dir",
2342            json!({
2343                ".zed.toml": r#"collaborators = ["user_b"]"#,
2344                "a.txt": "a-contents",
2345            }),
2346        )
2347        .await;
2348        let project_a = cx_a.update(|cx| {
2349            Project::local(
2350                client_a.clone(),
2351                client_a.user_store.clone(),
2352                lang_registry.clone(),
2353                fs.clone(),
2354                cx,
2355            )
2356        });
2357        let (worktree_a, _) = project_a
2358            .update(cx_a, |p, cx| {
2359                p.find_or_create_local_worktree("/dir", true, cx)
2360            })
2361            .await
2362            .unwrap();
2363        worktree_a
2364            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2365            .await;
2366        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2367        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2368        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2369
2370        // Join that project as client B
2371        let project_b = Project::remote(
2372            project_id,
2373            client_b.clone(),
2374            client_b.user_store.clone(),
2375            lang_registry.clone(),
2376            fs.clone(),
2377            &mut cx_b.to_async(),
2378        )
2379        .await
2380        .unwrap();
2381
2382        // See that a guest has joined as client A.
2383        project_a
2384            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2385            .await;
2386
2387        // Begin opening a buffer as client B, but leave the project before the open completes.
2388        let buffer_b = cx_b
2389            .background()
2390            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2391        cx_b.update(|_| drop(project_b));
2392        drop(buffer_b);
2393
2394        // See that the guest has left.
2395        project_a
2396            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2397            .await;
2398    }
2399
2400    #[gpui::test(iterations = 10)]
2401    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2402        cx_a.foreground().forbid_parking();
2403        let lang_registry = Arc::new(LanguageRegistry::test());
2404        let fs = FakeFs::new(cx_a.background());
2405
2406        // Connect to a server as 2 clients.
2407        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2408        let client_a = server.create_client(cx_a, "user_a").await;
2409        let client_b = server.create_client(cx_b, "user_b").await;
2410
2411        // Share a project as client A
2412        fs.insert_tree(
2413            "/a",
2414            json!({
2415                ".zed.toml": r#"collaborators = ["user_b"]"#,
2416                "a.txt": "a-contents",
2417                "b.txt": "b-contents",
2418            }),
2419        )
2420        .await;
2421        let project_a = cx_a.update(|cx| {
2422            Project::local(
2423                client_a.clone(),
2424                client_a.user_store.clone(),
2425                lang_registry.clone(),
2426                fs.clone(),
2427                cx,
2428            )
2429        });
2430        let (worktree_a, _) = project_a
2431            .update(cx_a, |p, cx| {
2432                p.find_or_create_local_worktree("/a", true, cx)
2433            })
2434            .await
2435            .unwrap();
2436        worktree_a
2437            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2438            .await;
2439        let project_id = project_a
2440            .update(cx_a, |project, _| project.next_remote_id())
2441            .await;
2442        project_a
2443            .update(cx_a, |project, cx| project.share(cx))
2444            .await
2445            .unwrap();
2446
2447        // Join that project as client B
2448        let _project_b = Project::remote(
2449            project_id,
2450            client_b.clone(),
2451            client_b.user_store.clone(),
2452            lang_registry.clone(),
2453            fs.clone(),
2454            &mut cx_b.to_async(),
2455        )
2456        .await
2457        .unwrap();
2458
2459        // Client A sees that a guest has joined.
2460        project_a
2461            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2462            .await;
2463
2464        // Drop client B's connection and ensure client A observes client B leaving the project.
2465        client_b.disconnect(&cx_b.to_async()).unwrap();
2466        project_a
2467            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2468            .await;
2469
2470        // Rejoin the project as client B
2471        let _project_b = Project::remote(
2472            project_id,
2473            client_b.clone(),
2474            client_b.user_store.clone(),
2475            lang_registry.clone(),
2476            fs.clone(),
2477            &mut cx_b.to_async(),
2478        )
2479        .await
2480        .unwrap();
2481
2482        // Client A sees that a guest has re-joined.
2483        project_a
2484            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2485            .await;
2486
2487        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2488        client_b.wait_for_current_user(cx_b).await;
2489        server.disconnect_client(client_b.current_user_id(cx_b));
2490        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2491        project_a
2492            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2493            .await;
2494    }
2495
2496    #[gpui::test(iterations = 10)]
2497    async fn test_collaborating_with_diagnostics(
2498        cx_a: &mut TestAppContext,
2499        cx_b: &mut TestAppContext,
2500    ) {
2501        cx_a.foreground().forbid_parking();
2502        let lang_registry = Arc::new(LanguageRegistry::test());
2503        let fs = FakeFs::new(cx_a.background());
2504
2505        // Set up a fake language server.
2506        let mut language = Language::new(
2507            LanguageConfig {
2508                name: "Rust".into(),
2509                path_suffixes: vec!["rs".to_string()],
2510                ..Default::default()
2511            },
2512            Some(tree_sitter_rust::language()),
2513        );
2514        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2515        lang_registry.add(Arc::new(language));
2516
2517        // Connect to a server as 2 clients.
2518        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2519        let client_a = server.create_client(cx_a, "user_a").await;
2520        let client_b = server.create_client(cx_b, "user_b").await;
2521
2522        // Share a project as client A
2523        fs.insert_tree(
2524            "/a",
2525            json!({
2526                ".zed.toml": r#"collaborators = ["user_b"]"#,
2527                "a.rs": "let one = two",
2528                "other.rs": "",
2529            }),
2530        )
2531        .await;
2532        let project_a = cx_a.update(|cx| {
2533            Project::local(
2534                client_a.clone(),
2535                client_a.user_store.clone(),
2536                lang_registry.clone(),
2537                fs.clone(),
2538                cx,
2539            )
2540        });
2541        let (worktree_a, _) = project_a
2542            .update(cx_a, |p, cx| {
2543                p.find_or_create_local_worktree("/a", true, cx)
2544            })
2545            .await
2546            .unwrap();
2547        worktree_a
2548            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2549            .await;
2550        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2551        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2552        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2553
2554        // Cause the language server to start.
2555        let _ = cx_a
2556            .background()
2557            .spawn(project_a.update(cx_a, |project, cx| {
2558                project.open_buffer(
2559                    ProjectPath {
2560                        worktree_id,
2561                        path: Path::new("other.rs").into(),
2562                    },
2563                    cx,
2564                )
2565            }))
2566            .await
2567            .unwrap();
2568
2569        // Simulate a language server reporting errors for a file.
2570        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2571        fake_language_server
2572            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2573            .await;
2574        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2575            lsp::PublishDiagnosticsParams {
2576                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2577                version: None,
2578                diagnostics: vec![lsp::Diagnostic {
2579                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2580                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2581                    message: "message 1".to_string(),
2582                    ..Default::default()
2583                }],
2584            },
2585        );
2586
2587        // Wait for server to see the diagnostics update.
2588        server
2589            .condition(|store| {
2590                let worktree = store
2591                    .project(project_id)
2592                    .unwrap()
2593                    .share
2594                    .as_ref()
2595                    .unwrap()
2596                    .worktrees
2597                    .get(&worktree_id.to_proto())
2598                    .unwrap();
2599
2600                !worktree.diagnostic_summaries.is_empty()
2601            })
2602            .await;
2603
2604        // Join the worktree as client B.
2605        let project_b = Project::remote(
2606            project_id,
2607            client_b.clone(),
2608            client_b.user_store.clone(),
2609            lang_registry.clone(),
2610            fs.clone(),
2611            &mut cx_b.to_async(),
2612        )
2613        .await
2614        .unwrap();
2615
2616        project_b.read_with(cx_b, |project, cx| {
2617            assert_eq!(
2618                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2619                &[(
2620                    ProjectPath {
2621                        worktree_id,
2622                        path: Arc::from(Path::new("a.rs")),
2623                    },
2624                    DiagnosticSummary {
2625                        error_count: 1,
2626                        warning_count: 0,
2627                        ..Default::default()
2628                    },
2629                )]
2630            )
2631        });
2632
2633        // Simulate a language server reporting more errors for a file.
2634        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2635            lsp::PublishDiagnosticsParams {
2636                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2637                version: None,
2638                diagnostics: vec![
2639                    lsp::Diagnostic {
2640                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2641                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2642                        message: "message 1".to_string(),
2643                        ..Default::default()
2644                    },
2645                    lsp::Diagnostic {
2646                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2647                        range: lsp::Range::new(
2648                            lsp::Position::new(0, 10),
2649                            lsp::Position::new(0, 13),
2650                        ),
2651                        message: "message 2".to_string(),
2652                        ..Default::default()
2653                    },
2654                ],
2655            },
2656        );
2657
2658        // Client b gets the updated summaries
2659        project_b
2660            .condition(&cx_b, |project, cx| {
2661                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2662                    == &[(
2663                        ProjectPath {
2664                            worktree_id,
2665                            path: Arc::from(Path::new("a.rs")),
2666                        },
2667                        DiagnosticSummary {
2668                            error_count: 1,
2669                            warning_count: 1,
2670                            ..Default::default()
2671                        },
2672                    )]
2673            })
2674            .await;
2675
2676        // Open the file with the errors on client B. They should be present.
2677        let buffer_b = cx_b
2678            .background()
2679            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2680            .await
2681            .unwrap();
2682
2683        buffer_b.read_with(cx_b, |buffer, _| {
2684            assert_eq!(
2685                buffer
2686                    .snapshot()
2687                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2688                    .map(|entry| entry)
2689                    .collect::<Vec<_>>(),
2690                &[
2691                    DiagnosticEntry {
2692                        range: Point::new(0, 4)..Point::new(0, 7),
2693                        diagnostic: Diagnostic {
2694                            group_id: 0,
2695                            message: "message 1".to_string(),
2696                            severity: lsp::DiagnosticSeverity::ERROR,
2697                            is_primary: true,
2698                            ..Default::default()
2699                        }
2700                    },
2701                    DiagnosticEntry {
2702                        range: Point::new(0, 10)..Point::new(0, 13),
2703                        diagnostic: Diagnostic {
2704                            group_id: 1,
2705                            severity: lsp::DiagnosticSeverity::WARNING,
2706                            message: "message 2".to_string(),
2707                            is_primary: true,
2708                            ..Default::default()
2709                        }
2710                    }
2711                ]
2712            );
2713        });
2714
2715        // Simulate a language server reporting no errors for a file.
2716        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2717            lsp::PublishDiagnosticsParams {
2718                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2719                version: None,
2720                diagnostics: vec![],
2721            },
2722        );
2723        project_a
2724            .condition(cx_a, |project, cx| {
2725                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2726            })
2727            .await;
2728        project_b
2729            .condition(cx_b, |project, cx| {
2730                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2731            })
2732            .await;
2733    }
2734
2735    #[gpui::test(iterations = 10)]
2736    async fn test_collaborating_with_completion(
2737        cx_a: &mut TestAppContext,
2738        cx_b: &mut TestAppContext,
2739    ) {
2740        cx_a.foreground().forbid_parking();
2741        let lang_registry = Arc::new(LanguageRegistry::test());
2742        let fs = FakeFs::new(cx_a.background());
2743
2744        // Set up a fake language server.
2745        let mut language = Language::new(
2746            LanguageConfig {
2747                name: "Rust".into(),
2748                path_suffixes: vec!["rs".to_string()],
2749                ..Default::default()
2750            },
2751            Some(tree_sitter_rust::language()),
2752        );
2753        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2754            capabilities: lsp::ServerCapabilities {
2755                completion_provider: Some(lsp::CompletionOptions {
2756                    trigger_characters: Some(vec![".".to_string()]),
2757                    ..Default::default()
2758                }),
2759                ..Default::default()
2760            },
2761            ..Default::default()
2762        });
2763        lang_registry.add(Arc::new(language));
2764
2765        // Connect to a server as 2 clients.
2766        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2767        let client_a = server.create_client(cx_a, "user_a").await;
2768        let client_b = server.create_client(cx_b, "user_b").await;
2769
2770        // Share a project as client A
2771        fs.insert_tree(
2772            "/a",
2773            json!({
2774                ".zed.toml": r#"collaborators = ["user_b"]"#,
2775                "main.rs": "fn main() { a }",
2776                "other.rs": "",
2777            }),
2778        )
2779        .await;
2780        let project_a = cx_a.update(|cx| {
2781            Project::local(
2782                client_a.clone(),
2783                client_a.user_store.clone(),
2784                lang_registry.clone(),
2785                fs.clone(),
2786                cx,
2787            )
2788        });
2789        let (worktree_a, _) = project_a
2790            .update(cx_a, |p, cx| {
2791                p.find_or_create_local_worktree("/a", true, cx)
2792            })
2793            .await
2794            .unwrap();
2795        worktree_a
2796            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2797            .await;
2798        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2799        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2800        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2801
2802        // Join the worktree as client B.
2803        let project_b = Project::remote(
2804            project_id,
2805            client_b.clone(),
2806            client_b.user_store.clone(),
2807            lang_registry.clone(),
2808            fs.clone(),
2809            &mut cx_b.to_async(),
2810        )
2811        .await
2812        .unwrap();
2813
2814        // Open a file in an editor as the guest.
2815        let buffer_b = project_b
2816            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2817            .await
2818            .unwrap();
2819        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2820        let editor_b = cx_b.add_view(window_b, |cx| {
2821            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2822        });
2823
2824        let fake_language_server = fake_language_servers.next().await.unwrap();
2825        buffer_b
2826            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2827            .await;
2828
2829        // Type a completion trigger character as the guest.
2830        editor_b.update(cx_b, |editor, cx| {
2831            editor.select_ranges([13..13], None, cx);
2832            editor.handle_input(&Input(".".into()), cx);
2833            cx.focus(&editor_b);
2834        });
2835
2836        // Receive a completion request as the host's language server.
2837        // Return some completions from the host's language server.
2838        cx_a.foreground().start_waiting();
2839        fake_language_server
2840            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2841                assert_eq!(
2842                    params.text_document_position.text_document.uri,
2843                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2844                );
2845                assert_eq!(
2846                    params.text_document_position.position,
2847                    lsp::Position::new(0, 14),
2848                );
2849
2850                Ok(Some(lsp::CompletionResponse::Array(vec![
2851                    lsp::CompletionItem {
2852                        label: "first_method(…)".into(),
2853                        detail: Some("fn(&mut self, B) -> C".into()),
2854                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2855                            new_text: "first_method($1)".to_string(),
2856                            range: lsp::Range::new(
2857                                lsp::Position::new(0, 14),
2858                                lsp::Position::new(0, 14),
2859                            ),
2860                        })),
2861                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2862                        ..Default::default()
2863                    },
2864                    lsp::CompletionItem {
2865                        label: "second_method(…)".into(),
2866                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2867                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2868                            new_text: "second_method()".to_string(),
2869                            range: lsp::Range::new(
2870                                lsp::Position::new(0, 14),
2871                                lsp::Position::new(0, 14),
2872                            ),
2873                        })),
2874                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2875                        ..Default::default()
2876                    },
2877                ])))
2878            })
2879            .next()
2880            .await
2881            .unwrap();
2882        cx_a.foreground().finish_waiting();
2883
2884        // Open the buffer on the host.
2885        let buffer_a = project_a
2886            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2887            .await
2888            .unwrap();
2889        buffer_a
2890            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2891            .await;
2892
2893        // Confirm a completion on the guest.
2894        editor_b
2895            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2896            .await;
2897        editor_b.update(cx_b, |editor, cx| {
2898            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2899            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2900        });
2901
2902        // Return a resolved completion from the host's language server.
2903        // The resolved completion has an additional text edit.
2904        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
2905            |params, _| async move {
2906                assert_eq!(params.label, "first_method(…)");
2907                Ok(lsp::CompletionItem {
2908                    label: "first_method(…)".into(),
2909                    detail: Some("fn(&mut self, B) -> C".into()),
2910                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2911                        new_text: "first_method($1)".to_string(),
2912                        range: lsp::Range::new(
2913                            lsp::Position::new(0, 14),
2914                            lsp::Position::new(0, 14),
2915                        ),
2916                    })),
2917                    additional_text_edits: Some(vec![lsp::TextEdit {
2918                        new_text: "use d::SomeTrait;\n".to_string(),
2919                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2920                    }]),
2921                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2922                    ..Default::default()
2923                })
2924            },
2925        );
2926
2927        // The additional edit is applied.
2928        buffer_a
2929            .condition(&cx_a, |buffer, _| {
2930                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2931            })
2932            .await;
2933        buffer_b
2934            .condition(&cx_b, |buffer, _| {
2935                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2936            })
2937            .await;
2938    }
2939
2940    #[gpui::test(iterations = 10)]
2941    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2942        cx_a.foreground().forbid_parking();
2943        let lang_registry = Arc::new(LanguageRegistry::test());
2944        let fs = FakeFs::new(cx_a.background());
2945
2946        // Connect to a server as 2 clients.
2947        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2948        let client_a = server.create_client(cx_a, "user_a").await;
2949        let client_b = server.create_client(cx_b, "user_b").await;
2950
2951        // Share a project as client A
2952        fs.insert_tree(
2953            "/a",
2954            json!({
2955                ".zed.toml": r#"collaborators = ["user_b"]"#,
2956                "a.rs": "let one = 1;",
2957            }),
2958        )
2959        .await;
2960        let project_a = cx_a.update(|cx| {
2961            Project::local(
2962                client_a.clone(),
2963                client_a.user_store.clone(),
2964                lang_registry.clone(),
2965                fs.clone(),
2966                cx,
2967            )
2968        });
2969        let (worktree_a, _) = project_a
2970            .update(cx_a, |p, cx| {
2971                p.find_or_create_local_worktree("/a", true, cx)
2972            })
2973            .await
2974            .unwrap();
2975        worktree_a
2976            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2977            .await;
2978        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2979        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2980        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2981        let buffer_a = project_a
2982            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
2983            .await
2984            .unwrap();
2985
2986        // Join the worktree as client B.
2987        let project_b = Project::remote(
2988            project_id,
2989            client_b.clone(),
2990            client_b.user_store.clone(),
2991            lang_registry.clone(),
2992            fs.clone(),
2993            &mut cx_b.to_async(),
2994        )
2995        .await
2996        .unwrap();
2997
2998        let buffer_b = cx_b
2999            .background()
3000            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3001            .await
3002            .unwrap();
3003        buffer_b.update(cx_b, |buffer, cx| {
3004            buffer.edit([(4..7, "six")], cx);
3005            buffer.edit([(10..11, "6")], cx);
3006            assert_eq!(buffer.text(), "let six = 6;");
3007            assert!(buffer.is_dirty());
3008            assert!(!buffer.has_conflict());
3009        });
3010        buffer_a
3011            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3012            .await;
3013
3014        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3015            .await
3016            .unwrap();
3017        buffer_a
3018            .condition(cx_a, |buffer, _| buffer.has_conflict())
3019            .await;
3020        buffer_b
3021            .condition(cx_b, |buffer, _| buffer.has_conflict())
3022            .await;
3023
3024        project_b
3025            .update(cx_b, |project, cx| {
3026                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3027            })
3028            .await
3029            .unwrap();
3030        buffer_a.read_with(cx_a, |buffer, _| {
3031            assert_eq!(buffer.text(), "let seven = 7;");
3032            assert!(!buffer.is_dirty());
3033            assert!(!buffer.has_conflict());
3034        });
3035        buffer_b.read_with(cx_b, |buffer, _| {
3036            assert_eq!(buffer.text(), "let seven = 7;");
3037            assert!(!buffer.is_dirty());
3038            assert!(!buffer.has_conflict());
3039        });
3040
3041        buffer_a.update(cx_a, |buffer, cx| {
3042            // Undoing on the host is a no-op when the reload was initiated by the guest.
3043            buffer.undo(cx);
3044            assert_eq!(buffer.text(), "let seven = 7;");
3045            assert!(!buffer.is_dirty());
3046            assert!(!buffer.has_conflict());
3047        });
3048        buffer_b.update(cx_b, |buffer, cx| {
3049            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3050            buffer.undo(cx);
3051            assert_eq!(buffer.text(), "let six = 6;");
3052            assert!(buffer.is_dirty());
3053            assert!(!buffer.has_conflict());
3054        });
3055    }
3056
3057    #[gpui::test(iterations = 10)]
3058    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3059        cx_a.foreground().forbid_parking();
3060        let lang_registry = Arc::new(LanguageRegistry::test());
3061        let fs = FakeFs::new(cx_a.background());
3062
3063        // Set up a fake language server.
3064        let mut language = Language::new(
3065            LanguageConfig {
3066                name: "Rust".into(),
3067                path_suffixes: vec!["rs".to_string()],
3068                ..Default::default()
3069            },
3070            Some(tree_sitter_rust::language()),
3071        );
3072        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3073        lang_registry.add(Arc::new(language));
3074
3075        // Connect to a server as 2 clients.
3076        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3077        let client_a = server.create_client(cx_a, "user_a").await;
3078        let client_b = server.create_client(cx_b, "user_b").await;
3079
3080        // Share a project as client A
3081        fs.insert_tree(
3082            "/a",
3083            json!({
3084                ".zed.toml": r#"collaborators = ["user_b"]"#,
3085                "a.rs": "let one = two",
3086            }),
3087        )
3088        .await;
3089        let project_a = cx_a.update(|cx| {
3090            Project::local(
3091                client_a.clone(),
3092                client_a.user_store.clone(),
3093                lang_registry.clone(),
3094                fs.clone(),
3095                cx,
3096            )
3097        });
3098        let (worktree_a, _) = project_a
3099            .update(cx_a, |p, cx| {
3100                p.find_or_create_local_worktree("/a", true, cx)
3101            })
3102            .await
3103            .unwrap();
3104        worktree_a
3105            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3106            .await;
3107        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3108        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3109        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3110
3111        // Join the worktree as client B.
3112        let project_b = Project::remote(
3113            project_id,
3114            client_b.clone(),
3115            client_b.user_store.clone(),
3116            lang_registry.clone(),
3117            fs.clone(),
3118            &mut cx_b.to_async(),
3119        )
3120        .await
3121        .unwrap();
3122
3123        let buffer_b = cx_b
3124            .background()
3125            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3126            .await
3127            .unwrap();
3128
3129        let fake_language_server = fake_language_servers.next().await.unwrap();
3130        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3131            Ok(Some(vec![
3132                lsp::TextEdit {
3133                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3134                    new_text: "h".to_string(),
3135                },
3136                lsp::TextEdit {
3137                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3138                    new_text: "y".to_string(),
3139                },
3140            ]))
3141        });
3142
3143        project_b
3144            .update(cx_b, |project, cx| {
3145                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3146            })
3147            .await
3148            .unwrap();
3149        assert_eq!(
3150            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3151            "let honey = two"
3152        );
3153    }
3154
3155    #[gpui::test(iterations = 10)]
3156    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3157        cx_a.foreground().forbid_parking();
3158        let lang_registry = Arc::new(LanguageRegistry::test());
3159        let fs = FakeFs::new(cx_a.background());
3160        fs.insert_tree(
3161            "/root-1",
3162            json!({
3163                ".zed.toml": r#"collaborators = ["user_b"]"#,
3164                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3165            }),
3166        )
3167        .await;
3168        fs.insert_tree(
3169            "/root-2",
3170            json!({
3171                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3172            }),
3173        )
3174        .await;
3175
3176        // Set up a fake language server.
3177        let mut language = Language::new(
3178            LanguageConfig {
3179                name: "Rust".into(),
3180                path_suffixes: vec!["rs".to_string()],
3181                ..Default::default()
3182            },
3183            Some(tree_sitter_rust::language()),
3184        );
3185        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3186        lang_registry.add(Arc::new(language));
3187
3188        // Connect to a server as 2 clients.
3189        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3190        let client_a = server.create_client(cx_a, "user_a").await;
3191        let client_b = server.create_client(cx_b, "user_b").await;
3192
3193        // Share a project as client A
3194        let project_a = cx_a.update(|cx| {
3195            Project::local(
3196                client_a.clone(),
3197                client_a.user_store.clone(),
3198                lang_registry.clone(),
3199                fs.clone(),
3200                cx,
3201            )
3202        });
3203        let (worktree_a, _) = project_a
3204            .update(cx_a, |p, cx| {
3205                p.find_or_create_local_worktree("/root-1", true, cx)
3206            })
3207            .await
3208            .unwrap();
3209        worktree_a
3210            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211            .await;
3212        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3213        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3214        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3215
3216        // Join the worktree as client B.
3217        let project_b = Project::remote(
3218            project_id,
3219            client_b.clone(),
3220            client_b.user_store.clone(),
3221            lang_registry.clone(),
3222            fs.clone(),
3223            &mut cx_b.to_async(),
3224        )
3225        .await
3226        .unwrap();
3227
3228        // Open the file on client B.
3229        let buffer_b = cx_b
3230            .background()
3231            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3232            .await
3233            .unwrap();
3234
3235        // Request the definition of a symbol as the guest.
3236        let fake_language_server = fake_language_servers.next().await.unwrap();
3237        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3238            |_, _| async move {
3239                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3240                    lsp::Location::new(
3241                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3242                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3243                    ),
3244                )))
3245            },
3246        );
3247
3248        let definitions_1 = project_b
3249            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3250            .await
3251            .unwrap();
3252        cx_b.read(|cx| {
3253            assert_eq!(definitions_1.len(), 1);
3254            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3255            let target_buffer = definitions_1[0].buffer.read(cx);
3256            assert_eq!(
3257                target_buffer.text(),
3258                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3259            );
3260            assert_eq!(
3261                definitions_1[0].range.to_point(target_buffer),
3262                Point::new(0, 6)..Point::new(0, 9)
3263            );
3264        });
3265
3266        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3267        // the previous call to `definition`.
3268        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3269            |_, _| async move {
3270                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3271                    lsp::Location::new(
3272                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3273                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3274                    ),
3275                )))
3276            },
3277        );
3278
3279        let definitions_2 = project_b
3280            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3281            .await
3282            .unwrap();
3283        cx_b.read(|cx| {
3284            assert_eq!(definitions_2.len(), 1);
3285            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3286            let target_buffer = definitions_2[0].buffer.read(cx);
3287            assert_eq!(
3288                target_buffer.text(),
3289                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3290            );
3291            assert_eq!(
3292                definitions_2[0].range.to_point(target_buffer),
3293                Point::new(1, 6)..Point::new(1, 11)
3294            );
3295        });
3296        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3297    }
3298
3299    #[gpui::test(iterations = 10)]
3300    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3301        cx_a.foreground().forbid_parking();
3302        let lang_registry = Arc::new(LanguageRegistry::test());
3303        let fs = FakeFs::new(cx_a.background());
3304        fs.insert_tree(
3305            "/root-1",
3306            json!({
3307                ".zed.toml": r#"collaborators = ["user_b"]"#,
3308                "one.rs": "const ONE: usize = 1;",
3309                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3310            }),
3311        )
3312        .await;
3313        fs.insert_tree(
3314            "/root-2",
3315            json!({
3316                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3317            }),
3318        )
3319        .await;
3320
3321        // Set up a fake language server.
3322        let mut language = Language::new(
3323            LanguageConfig {
3324                name: "Rust".into(),
3325                path_suffixes: vec!["rs".to_string()],
3326                ..Default::default()
3327            },
3328            Some(tree_sitter_rust::language()),
3329        );
3330        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3331        lang_registry.add(Arc::new(language));
3332
3333        // Connect to a server as 2 clients.
3334        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3335        let client_a = server.create_client(cx_a, "user_a").await;
3336        let client_b = server.create_client(cx_b, "user_b").await;
3337
3338        // Share a project as client A
3339        let project_a = cx_a.update(|cx| {
3340            Project::local(
3341                client_a.clone(),
3342                client_a.user_store.clone(),
3343                lang_registry.clone(),
3344                fs.clone(),
3345                cx,
3346            )
3347        });
3348        let (worktree_a, _) = project_a
3349            .update(cx_a, |p, cx| {
3350                p.find_or_create_local_worktree("/root-1", true, cx)
3351            })
3352            .await
3353            .unwrap();
3354        worktree_a
3355            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3356            .await;
3357        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3358        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3359        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3360
3361        // Join the worktree as client B.
3362        let project_b = Project::remote(
3363            project_id,
3364            client_b.clone(),
3365            client_b.user_store.clone(),
3366            lang_registry.clone(),
3367            fs.clone(),
3368            &mut cx_b.to_async(),
3369        )
3370        .await
3371        .unwrap();
3372
3373        // Open the file on client B.
3374        let buffer_b = cx_b
3375            .background()
3376            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3377            .await
3378            .unwrap();
3379
3380        // Request references to a symbol as the guest.
3381        let fake_language_server = fake_language_servers.next().await.unwrap();
3382        fake_language_server.handle_request::<lsp::request::References, _, _>(
3383            |params, _| async move {
3384                assert_eq!(
3385                    params.text_document_position.text_document.uri.as_str(),
3386                    "file:///root-1/one.rs"
3387                );
3388                Ok(Some(vec![
3389                    lsp::Location {
3390                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3391                        range: lsp::Range::new(
3392                            lsp::Position::new(0, 24),
3393                            lsp::Position::new(0, 27),
3394                        ),
3395                    },
3396                    lsp::Location {
3397                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3398                        range: lsp::Range::new(
3399                            lsp::Position::new(0, 35),
3400                            lsp::Position::new(0, 38),
3401                        ),
3402                    },
3403                    lsp::Location {
3404                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3405                        range: lsp::Range::new(
3406                            lsp::Position::new(0, 37),
3407                            lsp::Position::new(0, 40),
3408                        ),
3409                    },
3410                ]))
3411            },
3412        );
3413
3414        let references = project_b
3415            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3416            .await
3417            .unwrap();
3418        cx_b.read(|cx| {
3419            assert_eq!(references.len(), 3);
3420            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3421
3422            let two_buffer = references[0].buffer.read(cx);
3423            let three_buffer = references[2].buffer.read(cx);
3424            assert_eq!(
3425                two_buffer.file().unwrap().path().as_ref(),
3426                Path::new("two.rs")
3427            );
3428            assert_eq!(references[1].buffer, references[0].buffer);
3429            assert_eq!(
3430                three_buffer.file().unwrap().full_path(cx),
3431                Path::new("three.rs")
3432            );
3433
3434            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3435            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3436            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3437        });
3438    }
3439
3440    #[gpui::test(iterations = 10)]
3441    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3442        cx_a.foreground().forbid_parking();
3443        let lang_registry = Arc::new(LanguageRegistry::test());
3444        let fs = FakeFs::new(cx_a.background());
3445        fs.insert_tree(
3446            "/root-1",
3447            json!({
3448                ".zed.toml": r#"collaborators = ["user_b"]"#,
3449                "a": "hello world",
3450                "b": "goodnight moon",
3451                "c": "a world of goo",
3452                "d": "world champion of clown world",
3453            }),
3454        )
3455        .await;
3456        fs.insert_tree(
3457            "/root-2",
3458            json!({
3459                "e": "disney world is fun",
3460            }),
3461        )
3462        .await;
3463
3464        // Connect to a server as 2 clients.
3465        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3466        let client_a = server.create_client(cx_a, "user_a").await;
3467        let client_b = server.create_client(cx_b, "user_b").await;
3468
3469        // Share a project as client A
3470        let project_a = cx_a.update(|cx| {
3471            Project::local(
3472                client_a.clone(),
3473                client_a.user_store.clone(),
3474                lang_registry.clone(),
3475                fs.clone(),
3476                cx,
3477            )
3478        });
3479        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3480
3481        let (worktree_1, _) = project_a
3482            .update(cx_a, |p, cx| {
3483                p.find_or_create_local_worktree("/root-1", true, cx)
3484            })
3485            .await
3486            .unwrap();
3487        worktree_1
3488            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3489            .await;
3490        let (worktree_2, _) = project_a
3491            .update(cx_a, |p, cx| {
3492                p.find_or_create_local_worktree("/root-2", true, cx)
3493            })
3494            .await
3495            .unwrap();
3496        worktree_2
3497            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3498            .await;
3499
3500        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3501
3502        // Join the worktree as client B.
3503        let project_b = Project::remote(
3504            project_id,
3505            client_b.clone(),
3506            client_b.user_store.clone(),
3507            lang_registry.clone(),
3508            fs.clone(),
3509            &mut cx_b.to_async(),
3510        )
3511        .await
3512        .unwrap();
3513
3514        let results = project_b
3515            .update(cx_b, |project, cx| {
3516                project.search(SearchQuery::text("world", false, false), cx)
3517            })
3518            .await
3519            .unwrap();
3520
3521        let mut ranges_by_path = results
3522            .into_iter()
3523            .map(|(buffer, ranges)| {
3524                buffer.read_with(cx_b, |buffer, cx| {
3525                    let path = buffer.file().unwrap().full_path(cx);
3526                    let offset_ranges = ranges
3527                        .into_iter()
3528                        .map(|range| range.to_offset(buffer))
3529                        .collect::<Vec<_>>();
3530                    (path, offset_ranges)
3531                })
3532            })
3533            .collect::<Vec<_>>();
3534        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3535
3536        assert_eq!(
3537            ranges_by_path,
3538            &[
3539                (PathBuf::from("root-1/a"), vec![6..11]),
3540                (PathBuf::from("root-1/c"), vec![2..7]),
3541                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3542                (PathBuf::from("root-2/e"), vec![7..12]),
3543            ]
3544        );
3545    }
3546
3547    #[gpui::test(iterations = 10)]
3548    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3549        cx_a.foreground().forbid_parking();
3550        let lang_registry = Arc::new(LanguageRegistry::test());
3551        let fs = FakeFs::new(cx_a.background());
3552        fs.insert_tree(
3553            "/root-1",
3554            json!({
3555                ".zed.toml": r#"collaborators = ["user_b"]"#,
3556                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3557            }),
3558        )
3559        .await;
3560
3561        // Set up a fake language server.
3562        let mut language = Language::new(
3563            LanguageConfig {
3564                name: "Rust".into(),
3565                path_suffixes: vec!["rs".to_string()],
3566                ..Default::default()
3567            },
3568            Some(tree_sitter_rust::language()),
3569        );
3570        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3571        lang_registry.add(Arc::new(language));
3572
3573        // Connect to a server as 2 clients.
3574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3575        let client_a = server.create_client(cx_a, "user_a").await;
3576        let client_b = server.create_client(cx_b, "user_b").await;
3577
3578        // Share a project as client A
3579        let project_a = cx_a.update(|cx| {
3580            Project::local(
3581                client_a.clone(),
3582                client_a.user_store.clone(),
3583                lang_registry.clone(),
3584                fs.clone(),
3585                cx,
3586            )
3587        });
3588        let (worktree_a, _) = project_a
3589            .update(cx_a, |p, cx| {
3590                p.find_or_create_local_worktree("/root-1", true, cx)
3591            })
3592            .await
3593            .unwrap();
3594        worktree_a
3595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3596            .await;
3597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3600
3601        // Join the worktree as client B.
3602        let project_b = Project::remote(
3603            project_id,
3604            client_b.clone(),
3605            client_b.user_store.clone(),
3606            lang_registry.clone(),
3607            fs.clone(),
3608            &mut cx_b.to_async(),
3609        )
3610        .await
3611        .unwrap();
3612
3613        // Open the file on client B.
3614        let buffer_b = cx_b
3615            .background()
3616            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3617            .await
3618            .unwrap();
3619
3620        // Request document highlights as the guest.
3621        let fake_language_server = fake_language_servers.next().await.unwrap();
3622        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3623            |params, _| async move {
3624                assert_eq!(
3625                    params
3626                        .text_document_position_params
3627                        .text_document
3628                        .uri
3629                        .as_str(),
3630                    "file:///root-1/main.rs"
3631                );
3632                assert_eq!(
3633                    params.text_document_position_params.position,
3634                    lsp::Position::new(0, 34)
3635                );
3636                Ok(Some(vec![
3637                    lsp::DocumentHighlight {
3638                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3639                        range: lsp::Range::new(
3640                            lsp::Position::new(0, 10),
3641                            lsp::Position::new(0, 16),
3642                        ),
3643                    },
3644                    lsp::DocumentHighlight {
3645                        kind: Some(lsp::DocumentHighlightKind::READ),
3646                        range: lsp::Range::new(
3647                            lsp::Position::new(0, 32),
3648                            lsp::Position::new(0, 38),
3649                        ),
3650                    },
3651                    lsp::DocumentHighlight {
3652                        kind: Some(lsp::DocumentHighlightKind::READ),
3653                        range: lsp::Range::new(
3654                            lsp::Position::new(0, 41),
3655                            lsp::Position::new(0, 47),
3656                        ),
3657                    },
3658                ]))
3659            },
3660        );
3661
3662        let highlights = project_b
3663            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3664            .await
3665            .unwrap();
3666        buffer_b.read_with(cx_b, |buffer, _| {
3667            let snapshot = buffer.snapshot();
3668
3669            let highlights = highlights
3670                .into_iter()
3671                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3672                .collect::<Vec<_>>();
3673            assert_eq!(
3674                highlights,
3675                &[
3676                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3677                    (lsp::DocumentHighlightKind::READ, 32..38),
3678                    (lsp::DocumentHighlightKind::READ, 41..47)
3679                ]
3680            )
3681        });
3682    }
3683
3684    #[gpui::test(iterations = 10)]
3685    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3686        cx_a.foreground().forbid_parking();
3687        let lang_registry = Arc::new(LanguageRegistry::test());
3688        let fs = FakeFs::new(cx_a.background());
3689        fs.insert_tree(
3690            "/code",
3691            json!({
3692                "crate-1": {
3693                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3694                    "one.rs": "const ONE: usize = 1;",
3695                },
3696                "crate-2": {
3697                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3698                },
3699                "private": {
3700                    "passwords.txt": "the-password",
3701                }
3702            }),
3703        )
3704        .await;
3705
3706        // Set up a fake language server.
3707        let mut language = Language::new(
3708            LanguageConfig {
3709                name: "Rust".into(),
3710                path_suffixes: vec!["rs".to_string()],
3711                ..Default::default()
3712            },
3713            Some(tree_sitter_rust::language()),
3714        );
3715        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3716        lang_registry.add(Arc::new(language));
3717
3718        // Connect to a server as 2 clients.
3719        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3720        let client_a = server.create_client(cx_a, "user_a").await;
3721        let client_b = server.create_client(cx_b, "user_b").await;
3722
3723        // Share a project as client A
3724        let project_a = cx_a.update(|cx| {
3725            Project::local(
3726                client_a.clone(),
3727                client_a.user_store.clone(),
3728                lang_registry.clone(),
3729                fs.clone(),
3730                cx,
3731            )
3732        });
3733        let (worktree_a, _) = project_a
3734            .update(cx_a, |p, cx| {
3735                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3736            })
3737            .await
3738            .unwrap();
3739        worktree_a
3740            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3741            .await;
3742        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3743        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3744        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3745
3746        // Join the worktree as client B.
3747        let project_b = Project::remote(
3748            project_id,
3749            client_b.clone(),
3750            client_b.user_store.clone(),
3751            lang_registry.clone(),
3752            fs.clone(),
3753            &mut cx_b.to_async(),
3754        )
3755        .await
3756        .unwrap();
3757
3758        // Cause the language server to start.
3759        let _buffer = cx_b
3760            .background()
3761            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3762            .await
3763            .unwrap();
3764
3765        let fake_language_server = fake_language_servers.next().await.unwrap();
3766        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3767            |_, _| async move {
3768                #[allow(deprecated)]
3769                Ok(Some(vec![lsp::SymbolInformation {
3770                    name: "TWO".into(),
3771                    location: lsp::Location {
3772                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3773                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3774                    },
3775                    kind: lsp::SymbolKind::CONSTANT,
3776                    tags: None,
3777                    container_name: None,
3778                    deprecated: None,
3779                }]))
3780            },
3781        );
3782
3783        // Request the definition of a symbol as the guest.
3784        let symbols = project_b
3785            .update(cx_b, |p, cx| p.symbols("two", cx))
3786            .await
3787            .unwrap();
3788        assert_eq!(symbols.len(), 1);
3789        assert_eq!(symbols[0].name, "TWO");
3790
3791        // Open one of the returned symbols.
3792        let buffer_b_2 = project_b
3793            .update(cx_b, |project, cx| {
3794                project.open_buffer_for_symbol(&symbols[0], cx)
3795            })
3796            .await
3797            .unwrap();
3798        buffer_b_2.read_with(cx_b, |buffer, _| {
3799            assert_eq!(
3800                buffer.file().unwrap().path().as_ref(),
3801                Path::new("../crate-2/two.rs")
3802            );
3803        });
3804
3805        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3806        let mut fake_symbol = symbols[0].clone();
3807        fake_symbol.path = Path::new("/code/secrets").into();
3808        let error = project_b
3809            .update(cx_b, |project, cx| {
3810                project.open_buffer_for_symbol(&fake_symbol, cx)
3811            })
3812            .await
3813            .unwrap_err();
3814        assert!(error.to_string().contains("invalid symbol signature"));
3815    }
3816
3817    #[gpui::test(iterations = 10)]
3818    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3819        cx_a: &mut TestAppContext,
3820        cx_b: &mut TestAppContext,
3821        mut rng: StdRng,
3822    ) {
3823        cx_a.foreground().forbid_parking();
3824        let lang_registry = Arc::new(LanguageRegistry::test());
3825        let fs = FakeFs::new(cx_a.background());
3826        fs.insert_tree(
3827            "/root",
3828            json!({
3829                ".zed.toml": r#"collaborators = ["user_b"]"#,
3830                "a.rs": "const ONE: usize = b::TWO;",
3831                "b.rs": "const TWO: usize = 2",
3832            }),
3833        )
3834        .await;
3835
3836        // Set up a fake language server.
3837        let mut language = Language::new(
3838            LanguageConfig {
3839                name: "Rust".into(),
3840                path_suffixes: vec!["rs".to_string()],
3841                ..Default::default()
3842            },
3843            Some(tree_sitter_rust::language()),
3844        );
3845        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3846        lang_registry.add(Arc::new(language));
3847
3848        // Connect to a server as 2 clients.
3849        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3850        let client_a = server.create_client(cx_a, "user_a").await;
3851        let client_b = server.create_client(cx_b, "user_b").await;
3852
3853        // Share a project as client A
3854        let project_a = cx_a.update(|cx| {
3855            Project::local(
3856                client_a.clone(),
3857                client_a.user_store.clone(),
3858                lang_registry.clone(),
3859                fs.clone(),
3860                cx,
3861            )
3862        });
3863
3864        let (worktree_a, _) = project_a
3865            .update(cx_a, |p, cx| {
3866                p.find_or_create_local_worktree("/root", true, cx)
3867            })
3868            .await
3869            .unwrap();
3870        worktree_a
3871            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3872            .await;
3873        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3874        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3875        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3876
3877        // Join the worktree as client B.
3878        let project_b = Project::remote(
3879            project_id,
3880            client_b.clone(),
3881            client_b.user_store.clone(),
3882            lang_registry.clone(),
3883            fs.clone(),
3884            &mut cx_b.to_async(),
3885        )
3886        .await
3887        .unwrap();
3888
3889        let buffer_b1 = cx_b
3890            .background()
3891            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3892            .await
3893            .unwrap();
3894
3895        let fake_language_server = fake_language_servers.next().await.unwrap();
3896        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3897            |_, _| async move {
3898                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3899                    lsp::Location::new(
3900                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
3901                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3902                    ),
3903                )))
3904            },
3905        );
3906
3907        let definitions;
3908        let buffer_b2;
3909        if rng.gen() {
3910            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3911            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3912        } else {
3913            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3914            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3915        }
3916
3917        let buffer_b2 = buffer_b2.await.unwrap();
3918        let definitions = definitions.await.unwrap();
3919        assert_eq!(definitions.len(), 1);
3920        assert_eq!(definitions[0].buffer, buffer_b2);
3921    }
3922
3923    #[gpui::test(iterations = 10)]
3924    async fn test_collaborating_with_code_actions(
3925        cx_a: &mut TestAppContext,
3926        cx_b: &mut TestAppContext,
3927    ) {
3928        cx_a.foreground().forbid_parking();
3929        let lang_registry = Arc::new(LanguageRegistry::test());
3930        let fs = FakeFs::new(cx_a.background());
3931        cx_b.update(|cx| editor::init(cx));
3932
3933        // Set up a fake language server.
3934        let mut language = Language::new(
3935            LanguageConfig {
3936                name: "Rust".into(),
3937                path_suffixes: vec!["rs".to_string()],
3938                ..Default::default()
3939            },
3940            Some(tree_sitter_rust::language()),
3941        );
3942        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3943        lang_registry.add(Arc::new(language));
3944
3945        // Connect to a server as 2 clients.
3946        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3947        let client_a = server.create_client(cx_a, "user_a").await;
3948        let client_b = server.create_client(cx_b, "user_b").await;
3949
3950        // Share a project as client A
3951        fs.insert_tree(
3952            "/a",
3953            json!({
3954                ".zed.toml": r#"collaborators = ["user_b"]"#,
3955                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3956                "other.rs": "pub fn foo() -> usize { 4 }",
3957            }),
3958        )
3959        .await;
3960        let project_a = cx_a.update(|cx| {
3961            Project::local(
3962                client_a.clone(),
3963                client_a.user_store.clone(),
3964                lang_registry.clone(),
3965                fs.clone(),
3966                cx,
3967            )
3968        });
3969        let (worktree_a, _) = project_a
3970            .update(cx_a, |p, cx| {
3971                p.find_or_create_local_worktree("/a", true, cx)
3972            })
3973            .await
3974            .unwrap();
3975        worktree_a
3976            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3977            .await;
3978        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3979        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3980        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3981
3982        // Join the worktree as client B.
3983        let project_b = Project::remote(
3984            project_id,
3985            client_b.clone(),
3986            client_b.user_store.clone(),
3987            lang_registry.clone(),
3988            fs.clone(),
3989            &mut cx_b.to_async(),
3990        )
3991        .await
3992        .unwrap();
3993        let mut params = cx_b.update(WorkspaceParams::test);
3994        params.languages = lang_registry.clone();
3995        params.client = client_b.client.clone();
3996        params.user_store = client_b.user_store.clone();
3997        params.project = project_b;
3998
3999        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4000        let editor_b = workspace_b
4001            .update(cx_b, |workspace, cx| {
4002                workspace.open_path((worktree_id, "main.rs"), true, cx)
4003            })
4004            .await
4005            .unwrap()
4006            .downcast::<Editor>()
4007            .unwrap();
4008
4009        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4010        fake_language_server
4011            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4012                assert_eq!(
4013                    params.text_document.uri,
4014                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4015                );
4016                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4017                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4018                Ok(None)
4019            })
4020            .next()
4021            .await;
4022
4023        // Move cursor to a location that contains code actions.
4024        editor_b.update(cx_b, |editor, cx| {
4025            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4026            cx.focus(&editor_b);
4027        });
4028
4029        fake_language_server
4030            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4031                assert_eq!(
4032                    params.text_document.uri,
4033                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4034                );
4035                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4036                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4037
4038                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4039                    lsp::CodeAction {
4040                        title: "Inline into all callers".to_string(),
4041                        edit: Some(lsp::WorkspaceEdit {
4042                            changes: Some(
4043                                [
4044                                    (
4045                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4046                                        vec![lsp::TextEdit::new(
4047                                            lsp::Range::new(
4048                                                lsp::Position::new(1, 22),
4049                                                lsp::Position::new(1, 34),
4050                                            ),
4051                                            "4".to_string(),
4052                                        )],
4053                                    ),
4054                                    (
4055                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4056                                        vec![lsp::TextEdit::new(
4057                                            lsp::Range::new(
4058                                                lsp::Position::new(0, 0),
4059                                                lsp::Position::new(0, 27),
4060                                            ),
4061                                            "".to_string(),
4062                                        )],
4063                                    ),
4064                                ]
4065                                .into_iter()
4066                                .collect(),
4067                            ),
4068                            ..Default::default()
4069                        }),
4070                        data: Some(json!({
4071                            "codeActionParams": {
4072                                "range": {
4073                                    "start": {"line": 1, "column": 31},
4074                                    "end": {"line": 1, "column": 31},
4075                                }
4076                            }
4077                        })),
4078                        ..Default::default()
4079                    },
4080                )]))
4081            })
4082            .next()
4083            .await;
4084
4085        // Toggle code actions and wait for them to display.
4086        editor_b.update(cx_b, |editor, cx| {
4087            editor.toggle_code_actions(
4088                &ToggleCodeActions {
4089                    deployed_from_indicator: false,
4090                },
4091                cx,
4092            );
4093        });
4094        editor_b
4095            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4096            .await;
4097
4098        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4099
4100        // Confirming the code action will trigger a resolve request.
4101        let confirm_action = workspace_b
4102            .update(cx_b, |workspace, cx| {
4103                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4104            })
4105            .unwrap();
4106        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4107            |_, _| async move {
4108                Ok(lsp::CodeAction {
4109                    title: "Inline into all callers".to_string(),
4110                    edit: Some(lsp::WorkspaceEdit {
4111                        changes: Some(
4112                            [
4113                                (
4114                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4115                                    vec![lsp::TextEdit::new(
4116                                        lsp::Range::new(
4117                                            lsp::Position::new(1, 22),
4118                                            lsp::Position::new(1, 34),
4119                                        ),
4120                                        "4".to_string(),
4121                                    )],
4122                                ),
4123                                (
4124                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4125                                    vec![lsp::TextEdit::new(
4126                                        lsp::Range::new(
4127                                            lsp::Position::new(0, 0),
4128                                            lsp::Position::new(0, 27),
4129                                        ),
4130                                        "".to_string(),
4131                                    )],
4132                                ),
4133                            ]
4134                            .into_iter()
4135                            .collect(),
4136                        ),
4137                        ..Default::default()
4138                    }),
4139                    ..Default::default()
4140                })
4141            },
4142        );
4143
4144        // After the action is confirmed, an editor containing both modified files is opened.
4145        confirm_action.await.unwrap();
4146        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4147            workspace
4148                .active_item(cx)
4149                .unwrap()
4150                .downcast::<Editor>()
4151                .unwrap()
4152        });
4153        code_action_editor.update(cx_b, |editor, cx| {
4154            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4155            editor.undo(&Undo, cx);
4156            assert_eq!(
4157                editor.text(cx),
4158                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4159            );
4160            editor.redo(&Redo, cx);
4161            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4162        });
4163    }
4164
4165    #[gpui::test(iterations = 10)]
4166    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4167        cx_a.foreground().forbid_parking();
4168        let lang_registry = Arc::new(LanguageRegistry::test());
4169        let fs = FakeFs::new(cx_a.background());
4170        cx_b.update(|cx| editor::init(cx));
4171
4172        // Set up a fake language server.
4173        let mut language = Language::new(
4174            LanguageConfig {
4175                name: "Rust".into(),
4176                path_suffixes: vec!["rs".to_string()],
4177                ..Default::default()
4178            },
4179            Some(tree_sitter_rust::language()),
4180        );
4181        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4182            capabilities: lsp::ServerCapabilities {
4183                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4184                    prepare_provider: Some(true),
4185                    work_done_progress_options: Default::default(),
4186                })),
4187                ..Default::default()
4188            },
4189            ..Default::default()
4190        });
4191        lang_registry.add(Arc::new(language));
4192
4193        // Connect to a server as 2 clients.
4194        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4195        let client_a = server.create_client(cx_a, "user_a").await;
4196        let client_b = server.create_client(cx_b, "user_b").await;
4197
4198        // Share a project as client A
4199        fs.insert_tree(
4200            "/dir",
4201            json!({
4202                ".zed.toml": r#"collaborators = ["user_b"]"#,
4203                "one.rs": "const ONE: usize = 1;",
4204                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4205            }),
4206        )
4207        .await;
4208        let project_a = cx_a.update(|cx| {
4209            Project::local(
4210                client_a.clone(),
4211                client_a.user_store.clone(),
4212                lang_registry.clone(),
4213                fs.clone(),
4214                cx,
4215            )
4216        });
4217        let (worktree_a, _) = project_a
4218            .update(cx_a, |p, cx| {
4219                p.find_or_create_local_worktree("/dir", true, cx)
4220            })
4221            .await
4222            .unwrap();
4223        worktree_a
4224            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4225            .await;
4226        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4227        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4228        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4229
4230        // Join the worktree as client B.
4231        let project_b = Project::remote(
4232            project_id,
4233            client_b.clone(),
4234            client_b.user_store.clone(),
4235            lang_registry.clone(),
4236            fs.clone(),
4237            &mut cx_b.to_async(),
4238        )
4239        .await
4240        .unwrap();
4241        let mut params = cx_b.update(WorkspaceParams::test);
4242        params.languages = lang_registry.clone();
4243        params.client = client_b.client.clone();
4244        params.user_store = client_b.user_store.clone();
4245        params.project = project_b;
4246
4247        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4248        let editor_b = workspace_b
4249            .update(cx_b, |workspace, cx| {
4250                workspace.open_path((worktree_id, "one.rs"), true, cx)
4251            })
4252            .await
4253            .unwrap()
4254            .downcast::<Editor>()
4255            .unwrap();
4256        let fake_language_server = fake_language_servers.next().await.unwrap();
4257
4258        // Move cursor to a location that can be renamed.
4259        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4260            editor.select_ranges([7..7], None, cx);
4261            editor.rename(&Rename, cx).unwrap()
4262        });
4263
4264        fake_language_server
4265            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4266                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4267                assert_eq!(params.position, lsp::Position::new(0, 7));
4268                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4269                    lsp::Position::new(0, 6),
4270                    lsp::Position::new(0, 9),
4271                ))))
4272            })
4273            .next()
4274            .await
4275            .unwrap();
4276        prepare_rename.await.unwrap();
4277        editor_b.update(cx_b, |editor, cx| {
4278            let rename = editor.pending_rename().unwrap();
4279            let buffer = editor.buffer().read(cx).snapshot(cx);
4280            assert_eq!(
4281                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4282                6..9
4283            );
4284            rename.editor.update(cx, |rename_editor, cx| {
4285                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4286                    rename_buffer.edit([(0..3, "THREE")], cx);
4287                });
4288            });
4289        });
4290
4291        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4292            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4293        });
4294        fake_language_server
4295            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4296                assert_eq!(
4297                    params.text_document_position.text_document.uri.as_str(),
4298                    "file:///dir/one.rs"
4299                );
4300                assert_eq!(
4301                    params.text_document_position.position,
4302                    lsp::Position::new(0, 6)
4303                );
4304                assert_eq!(params.new_name, "THREE");
4305                Ok(Some(lsp::WorkspaceEdit {
4306                    changes: Some(
4307                        [
4308                            (
4309                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4310                                vec![lsp::TextEdit::new(
4311                                    lsp::Range::new(
4312                                        lsp::Position::new(0, 6),
4313                                        lsp::Position::new(0, 9),
4314                                    ),
4315                                    "THREE".to_string(),
4316                                )],
4317                            ),
4318                            (
4319                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4320                                vec![
4321                                    lsp::TextEdit::new(
4322                                        lsp::Range::new(
4323                                            lsp::Position::new(0, 24),
4324                                            lsp::Position::new(0, 27),
4325                                        ),
4326                                        "THREE".to_string(),
4327                                    ),
4328                                    lsp::TextEdit::new(
4329                                        lsp::Range::new(
4330                                            lsp::Position::new(0, 35),
4331                                            lsp::Position::new(0, 38),
4332                                        ),
4333                                        "THREE".to_string(),
4334                                    ),
4335                                ],
4336                            ),
4337                        ]
4338                        .into_iter()
4339                        .collect(),
4340                    ),
4341                    ..Default::default()
4342                }))
4343            })
4344            .next()
4345            .await
4346            .unwrap();
4347        confirm_rename.await.unwrap();
4348
4349        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4350            workspace
4351                .active_item(cx)
4352                .unwrap()
4353                .downcast::<Editor>()
4354                .unwrap()
4355        });
4356        rename_editor.update(cx_b, |editor, cx| {
4357            assert_eq!(
4358                editor.text(cx),
4359                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4360            );
4361            editor.undo(&Undo, cx);
4362            assert_eq!(
4363                editor.text(cx),
4364                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4365            );
4366            editor.redo(&Redo, cx);
4367            assert_eq!(
4368                editor.text(cx),
4369                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4370            );
4371        });
4372
4373        // Ensure temporary rename edits cannot be undone/redone.
4374        editor_b.update(cx_b, |editor, cx| {
4375            editor.undo(&Undo, cx);
4376            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4377            editor.undo(&Undo, cx);
4378            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4379            editor.redo(&Redo, cx);
4380            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4381        })
4382    }
4383
4384    #[gpui::test(iterations = 10)]
4385    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4386        cx_a.foreground().forbid_parking();
4387
4388        // Connect to a server as 2 clients.
4389        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4390        let client_a = server.create_client(cx_a, "user_a").await;
4391        let client_b = server.create_client(cx_b, "user_b").await;
4392
4393        // Create an org that includes these 2 users.
4394        let db = &server.app_state.db;
4395        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4396        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4397            .await
4398            .unwrap();
4399        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4400            .await
4401            .unwrap();
4402
4403        // Create a channel that includes all the users.
4404        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4405        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4406            .await
4407            .unwrap();
4408        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4409            .await
4410            .unwrap();
4411        db.create_channel_message(
4412            channel_id,
4413            client_b.current_user_id(&cx_b),
4414            "hello A, it's B.",
4415            OffsetDateTime::now_utc(),
4416            1,
4417        )
4418        .await
4419        .unwrap();
4420
4421        let channels_a = cx_a
4422            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4423        channels_a
4424            .condition(cx_a, |list, _| list.available_channels().is_some())
4425            .await;
4426        channels_a.read_with(cx_a, |list, _| {
4427            assert_eq!(
4428                list.available_channels().unwrap(),
4429                &[ChannelDetails {
4430                    id: channel_id.to_proto(),
4431                    name: "test-channel".to_string()
4432                }]
4433            )
4434        });
4435        let channel_a = channels_a.update(cx_a, |this, cx| {
4436            this.get_channel(channel_id.to_proto(), cx).unwrap()
4437        });
4438        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4439        channel_a
4440            .condition(&cx_a, |channel, _| {
4441                channel_messages(channel)
4442                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4443            })
4444            .await;
4445
4446        let channels_b = cx_b
4447            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4448        channels_b
4449            .condition(cx_b, |list, _| list.available_channels().is_some())
4450            .await;
4451        channels_b.read_with(cx_b, |list, _| {
4452            assert_eq!(
4453                list.available_channels().unwrap(),
4454                &[ChannelDetails {
4455                    id: channel_id.to_proto(),
4456                    name: "test-channel".to_string()
4457                }]
4458            )
4459        });
4460
4461        let channel_b = channels_b.update(cx_b, |this, cx| {
4462            this.get_channel(channel_id.to_proto(), cx).unwrap()
4463        });
4464        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4465        channel_b
4466            .condition(&cx_b, |channel, _| {
4467                channel_messages(channel)
4468                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4469            })
4470            .await;
4471
4472        channel_a
4473            .update(cx_a, |channel, cx| {
4474                channel
4475                    .send_message("oh, hi B.".to_string(), cx)
4476                    .unwrap()
4477                    .detach();
4478                let task = channel.send_message("sup".to_string(), cx).unwrap();
4479                assert_eq!(
4480                    channel_messages(channel),
4481                    &[
4482                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4483                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4484                        ("user_a".to_string(), "sup".to_string(), true)
4485                    ]
4486                );
4487                task
4488            })
4489            .await
4490            .unwrap();
4491
4492        channel_b
4493            .condition(&cx_b, |channel, _| {
4494                channel_messages(channel)
4495                    == [
4496                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4497                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4498                        ("user_a".to_string(), "sup".to_string(), false),
4499                    ]
4500            })
4501            .await;
4502
4503        assert_eq!(
4504            server
4505                .state()
4506                .await
4507                .channel(channel_id)
4508                .unwrap()
4509                .connection_ids
4510                .len(),
4511            2
4512        );
4513        cx_b.update(|_| drop(channel_b));
4514        server
4515            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4516            .await;
4517
4518        cx_a.update(|_| drop(channel_a));
4519        server
4520            .condition(|state| state.channel(channel_id).is_none())
4521            .await;
4522    }
4523
4524    #[gpui::test(iterations = 10)]
4525    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4526        cx_a.foreground().forbid_parking();
4527
4528        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4529        let client_a = server.create_client(cx_a, "user_a").await;
4530
4531        let db = &server.app_state.db;
4532        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4533        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4534        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4535            .await
4536            .unwrap();
4537        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4538            .await
4539            .unwrap();
4540
4541        let channels_a = cx_a
4542            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4543        channels_a
4544            .condition(cx_a, |list, _| list.available_channels().is_some())
4545            .await;
4546        let channel_a = channels_a.update(cx_a, |this, cx| {
4547            this.get_channel(channel_id.to_proto(), cx).unwrap()
4548        });
4549
4550        // Messages aren't allowed to be too long.
4551        channel_a
4552            .update(cx_a, |channel, cx| {
4553                let long_body = "this is long.\n".repeat(1024);
4554                channel.send_message(long_body, cx).unwrap()
4555            })
4556            .await
4557            .unwrap_err();
4558
4559        // Messages aren't allowed to be blank.
4560        channel_a.update(cx_a, |channel, cx| {
4561            channel.send_message(String::new(), cx).unwrap_err()
4562        });
4563
4564        // Leading and trailing whitespace are trimmed.
4565        channel_a
4566            .update(cx_a, |channel, cx| {
4567                channel
4568                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4569                    .unwrap()
4570            })
4571            .await
4572            .unwrap();
4573        assert_eq!(
4574            db.get_channel_messages(channel_id, 10, None)
4575                .await
4576                .unwrap()
4577                .iter()
4578                .map(|m| &m.body)
4579                .collect::<Vec<_>>(),
4580            &["surrounded by whitespace"]
4581        );
4582    }
4583
4584    #[gpui::test(iterations = 10)]
4585    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4586        cx_a.foreground().forbid_parking();
4587
4588        // Connect to a server as 2 clients.
4589        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4590        let client_a = server.create_client(cx_a, "user_a").await;
4591        let client_b = server.create_client(cx_b, "user_b").await;
4592        let mut status_b = client_b.status();
4593
4594        // Create an org that includes these 2 users.
4595        let db = &server.app_state.db;
4596        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4597        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4598            .await
4599            .unwrap();
4600        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4601            .await
4602            .unwrap();
4603
4604        // Create a channel that includes all the users.
4605        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4606        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4607            .await
4608            .unwrap();
4609        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4610            .await
4611            .unwrap();
4612        db.create_channel_message(
4613            channel_id,
4614            client_b.current_user_id(&cx_b),
4615            "hello A, it's B.",
4616            OffsetDateTime::now_utc(),
4617            2,
4618        )
4619        .await
4620        .unwrap();
4621
4622        let channels_a = cx_a
4623            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4624        channels_a
4625            .condition(cx_a, |list, _| list.available_channels().is_some())
4626            .await;
4627
4628        channels_a.read_with(cx_a, |list, _| {
4629            assert_eq!(
4630                list.available_channels().unwrap(),
4631                &[ChannelDetails {
4632                    id: channel_id.to_proto(),
4633                    name: "test-channel".to_string()
4634                }]
4635            )
4636        });
4637        let channel_a = channels_a.update(cx_a, |this, cx| {
4638            this.get_channel(channel_id.to_proto(), cx).unwrap()
4639        });
4640        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4641        channel_a
4642            .condition(&cx_a, |channel, _| {
4643                channel_messages(channel)
4644                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4645            })
4646            .await;
4647
4648        let channels_b = cx_b
4649            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4650        channels_b
4651            .condition(cx_b, |list, _| list.available_channels().is_some())
4652            .await;
4653        channels_b.read_with(cx_b, |list, _| {
4654            assert_eq!(
4655                list.available_channels().unwrap(),
4656                &[ChannelDetails {
4657                    id: channel_id.to_proto(),
4658                    name: "test-channel".to_string()
4659                }]
4660            )
4661        });
4662
4663        let channel_b = channels_b.update(cx_b, |this, cx| {
4664            this.get_channel(channel_id.to_proto(), cx).unwrap()
4665        });
4666        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4667        channel_b
4668            .condition(&cx_b, |channel, _| {
4669                channel_messages(channel)
4670                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4671            })
4672            .await;
4673
4674        // Disconnect client B, ensuring we can still access its cached channel data.
4675        server.forbid_connections();
4676        server.disconnect_client(client_b.current_user_id(&cx_b));
4677        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4678        while !matches!(
4679            status_b.next().await,
4680            Some(client::Status::ReconnectionError { .. })
4681        ) {}
4682
4683        channels_b.read_with(cx_b, |channels, _| {
4684            assert_eq!(
4685                channels.available_channels().unwrap(),
4686                [ChannelDetails {
4687                    id: channel_id.to_proto(),
4688                    name: "test-channel".to_string()
4689                }]
4690            )
4691        });
4692        channel_b.read_with(cx_b, |channel, _| {
4693            assert_eq!(
4694                channel_messages(channel),
4695                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4696            )
4697        });
4698
4699        // Send a message from client B while it is disconnected.
4700        channel_b
4701            .update(cx_b, |channel, cx| {
4702                let task = channel
4703                    .send_message("can you see this?".to_string(), cx)
4704                    .unwrap();
4705                assert_eq!(
4706                    channel_messages(channel),
4707                    &[
4708                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4709                        ("user_b".to_string(), "can you see this?".to_string(), true)
4710                    ]
4711                );
4712                task
4713            })
4714            .await
4715            .unwrap_err();
4716
4717        // Send a message from client A while B is disconnected.
4718        channel_a
4719            .update(cx_a, |channel, cx| {
4720                channel
4721                    .send_message("oh, hi B.".to_string(), cx)
4722                    .unwrap()
4723                    .detach();
4724                let task = channel.send_message("sup".to_string(), cx).unwrap();
4725                assert_eq!(
4726                    channel_messages(channel),
4727                    &[
4728                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4729                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4730                        ("user_a".to_string(), "sup".to_string(), true)
4731                    ]
4732                );
4733                task
4734            })
4735            .await
4736            .unwrap();
4737
4738        // Give client B a chance to reconnect.
4739        server.allow_connections();
4740        cx_b.foreground().advance_clock(Duration::from_secs(10));
4741
4742        // Verify that B sees the new messages upon reconnection, as well as the message client B
4743        // sent while offline.
4744        channel_b
4745            .condition(&cx_b, |channel, _| {
4746                channel_messages(channel)
4747                    == [
4748                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4749                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4750                        ("user_a".to_string(), "sup".to_string(), false),
4751                        ("user_b".to_string(), "can you see this?".to_string(), false),
4752                    ]
4753            })
4754            .await;
4755
4756        // Ensure client A and B can communicate normally after reconnection.
4757        channel_a
4758            .update(cx_a, |channel, cx| {
4759                channel.send_message("you online?".to_string(), cx).unwrap()
4760            })
4761            .await
4762            .unwrap();
4763        channel_b
4764            .condition(&cx_b, |channel, _| {
4765                channel_messages(channel)
4766                    == [
4767                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4768                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4769                        ("user_a".to_string(), "sup".to_string(), false),
4770                        ("user_b".to_string(), "can you see this?".to_string(), false),
4771                        ("user_a".to_string(), "you online?".to_string(), false),
4772                    ]
4773            })
4774            .await;
4775
4776        channel_b
4777            .update(cx_b, |channel, cx| {
4778                channel.send_message("yep".to_string(), cx).unwrap()
4779            })
4780            .await
4781            .unwrap();
4782        channel_a
4783            .condition(&cx_a, |channel, _| {
4784                channel_messages(channel)
4785                    == [
4786                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4787                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4788                        ("user_a".to_string(), "sup".to_string(), false),
4789                        ("user_b".to_string(), "can you see this?".to_string(), false),
4790                        ("user_a".to_string(), "you online?".to_string(), false),
4791                        ("user_b".to_string(), "yep".to_string(), false),
4792                    ]
4793            })
4794            .await;
4795    }
4796
4797    #[gpui::test(iterations = 10)]
4798    async fn test_contacts(
4799        cx_a: &mut TestAppContext,
4800        cx_b: &mut TestAppContext,
4801        cx_c: &mut TestAppContext,
4802    ) {
4803        cx_a.foreground().forbid_parking();
4804        let lang_registry = Arc::new(LanguageRegistry::test());
4805        let fs = FakeFs::new(cx_a.background());
4806
4807        // Connect to a server as 3 clients.
4808        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4809        let client_a = server.create_client(cx_a, "user_a").await;
4810        let client_b = server.create_client(cx_b, "user_b").await;
4811        let client_c = server.create_client(cx_c, "user_c").await;
4812
4813        // Share a worktree as client A.
4814        fs.insert_tree(
4815            "/a",
4816            json!({
4817                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4818            }),
4819        )
4820        .await;
4821
4822        let project_a = cx_a.update(|cx| {
4823            Project::local(
4824                client_a.clone(),
4825                client_a.user_store.clone(),
4826                lang_registry.clone(),
4827                fs.clone(),
4828                cx,
4829            )
4830        });
4831        let (worktree_a, _) = project_a
4832            .update(cx_a, |p, cx| {
4833                p.find_or_create_local_worktree("/a", true, cx)
4834            })
4835            .await
4836            .unwrap();
4837        worktree_a
4838            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4839            .await;
4840
4841        client_a
4842            .user_store
4843            .condition(&cx_a, |user_store, _| {
4844                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4845            })
4846            .await;
4847        client_b
4848            .user_store
4849            .condition(&cx_b, |user_store, _| {
4850                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4851            })
4852            .await;
4853        client_c
4854            .user_store
4855            .condition(&cx_c, |user_store, _| {
4856                contacts(user_store) == vec![("user_a", vec![("a", false, vec![])])]
4857            })
4858            .await;
4859
4860        let project_id = project_a
4861            .update(cx_a, |project, _| project.next_remote_id())
4862            .await;
4863        project_a
4864            .update(cx_a, |project, cx| project.share(cx))
4865            .await
4866            .unwrap();
4867        client_a
4868            .user_store
4869            .condition(&cx_a, |user_store, _| {
4870                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4871            })
4872            .await;
4873        client_b
4874            .user_store
4875            .condition(&cx_b, |user_store, _| {
4876                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4877            })
4878            .await;
4879        client_c
4880            .user_store
4881            .condition(&cx_c, |user_store, _| {
4882                contacts(user_store) == vec![("user_a", vec![("a", true, vec![])])]
4883            })
4884            .await;
4885
4886        let _project_b = Project::remote(
4887            project_id,
4888            client_b.clone(),
4889            client_b.user_store.clone(),
4890            lang_registry.clone(),
4891            fs.clone(),
4892            &mut cx_b.to_async(),
4893        )
4894        .await
4895        .unwrap();
4896
4897        client_a
4898            .user_store
4899            .condition(&cx_a, |user_store, _| {
4900                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4901            })
4902            .await;
4903        client_b
4904            .user_store
4905            .condition(&cx_b, |user_store, _| {
4906                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4907            })
4908            .await;
4909        client_c
4910            .user_store
4911            .condition(&cx_c, |user_store, _| {
4912                contacts(user_store) == vec![("user_a", vec![("a", true, vec!["user_b"])])]
4913            })
4914            .await;
4915
4916        project_a
4917            .condition(&cx_a, |project, _| {
4918                project.collaborators().contains_key(&client_b.peer_id)
4919            })
4920            .await;
4921
4922        cx_a.update(move |_| drop(project_a));
4923        client_a
4924            .user_store
4925            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4926            .await;
4927        client_b
4928            .user_store
4929            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4930            .await;
4931        client_c
4932            .user_store
4933            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4934            .await;
4935
4936        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
4937            user_store
4938                .contacts()
4939                .iter()
4940                .map(|contact| {
4941                    let worktrees = contact
4942                        .projects
4943                        .iter()
4944                        .map(|p| {
4945                            (
4946                                p.worktree_root_names[0].as_str(),
4947                                p.is_shared,
4948                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4949                            )
4950                        })
4951                        .collect();
4952                    (contact.user.github_login.as_str(), worktrees)
4953                })
4954                .collect()
4955        }
4956    }
4957
4958    #[gpui::test(iterations = 10)]
4959    async fn test_contacts_requests(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4960        cx_a.foreground().forbid_parking();
4961
4962        // Connect to a server as 3 clients.
4963        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4964        let client_a = server.create_client(cx_a, "user_a").await;
4965        let client_b = server.create_client(cx_b, "user_b").await;
4966
4967        client_a
4968            .user_store
4969            .read_with(cx_a, |store, _| {
4970                store.request_contact(client_b.user_id().unwrap())
4971            })
4972            .await
4973            .unwrap();
4974
4975        client_a.user_store.read_with(cx_a, |store, _| {
4976            let contacts = store
4977                .contacts()
4978                .iter()
4979                .map(|contact| contact.user.github_login.clone())
4980                .collect::<Vec<_>>();
4981            assert_eq!(contacts, &["user_b"])
4982        });
4983    }
4984
4985    #[gpui::test(iterations = 10)]
4986    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4987        cx_a.foreground().forbid_parking();
4988        let fs = FakeFs::new(cx_a.background());
4989
4990        // 2 clients connect to a server.
4991        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4992        let mut client_a = server.create_client(cx_a, "user_a").await;
4993        let mut client_b = server.create_client(cx_b, "user_b").await;
4994        cx_a.update(editor::init);
4995        cx_b.update(editor::init);
4996
4997        // Client A shares a project.
4998        fs.insert_tree(
4999            "/a",
5000            json!({
5001                ".zed.toml": r#"collaborators = ["user_b"]"#,
5002                "1.txt": "one",
5003                "2.txt": "two",
5004                "3.txt": "three",
5005            }),
5006        )
5007        .await;
5008        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5009        project_a
5010            .update(cx_a, |project, cx| project.share(cx))
5011            .await
5012            .unwrap();
5013
5014        // Client B joins the project.
5015        let project_b = client_b
5016            .build_remote_project(
5017                project_a
5018                    .read_with(cx_a, |project, _| project.remote_id())
5019                    .unwrap(),
5020                cx_b,
5021            )
5022            .await;
5023
5024        // Client A opens some editors.
5025        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5026        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5027        let editor_a1 = workspace_a
5028            .update(cx_a, |workspace, cx| {
5029                workspace.open_path((worktree_id, "1.txt"), true, cx)
5030            })
5031            .await
5032            .unwrap()
5033            .downcast::<Editor>()
5034            .unwrap();
5035        let editor_a2 = workspace_a
5036            .update(cx_a, |workspace, cx| {
5037                workspace.open_path((worktree_id, "2.txt"), true, cx)
5038            })
5039            .await
5040            .unwrap()
5041            .downcast::<Editor>()
5042            .unwrap();
5043
5044        // Client B opens an editor.
5045        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5046        let editor_b1 = workspace_b
5047            .update(cx_b, |workspace, cx| {
5048                workspace.open_path((worktree_id, "1.txt"), true, cx)
5049            })
5050            .await
5051            .unwrap()
5052            .downcast::<Editor>()
5053            .unwrap();
5054
5055        let client_a_id = project_b.read_with(cx_b, |project, _| {
5056            project.collaborators().values().next().unwrap().peer_id
5057        });
5058        let client_b_id = project_a.read_with(cx_a, |project, _| {
5059            project.collaborators().values().next().unwrap().peer_id
5060        });
5061
5062        // When client B starts following client A, all visible view states are replicated to client B.
5063        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5064        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5065        workspace_b
5066            .update(cx_b, |workspace, cx| {
5067                workspace
5068                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5069                    .unwrap()
5070            })
5071            .await
5072            .unwrap();
5073        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5074            workspace
5075                .active_item(cx)
5076                .unwrap()
5077                .downcast::<Editor>()
5078                .unwrap()
5079        });
5080        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5081        assert_eq!(
5082            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5083            Some((worktree_id, "2.txt").into())
5084        );
5085        assert_eq!(
5086            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5087            vec![2..3]
5088        );
5089        assert_eq!(
5090            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5091            vec![0..1]
5092        );
5093
5094        // When client A activates a different editor, client B does so as well.
5095        workspace_a.update(cx_a, |workspace, cx| {
5096            workspace.activate_item(&editor_a1, cx)
5097        });
5098        workspace_b
5099            .condition(cx_b, |workspace, cx| {
5100                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5101            })
5102            .await;
5103
5104        // When client A navigates back and forth, client B does so as well.
5105        workspace_a
5106            .update(cx_a, |workspace, cx| {
5107                workspace::Pane::go_back(workspace, None, cx)
5108            })
5109            .await;
5110        workspace_b
5111            .condition(cx_b, |workspace, cx| {
5112                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5113            })
5114            .await;
5115
5116        workspace_a
5117            .update(cx_a, |workspace, cx| {
5118                workspace::Pane::go_forward(workspace, None, cx)
5119            })
5120            .await;
5121        workspace_b
5122            .condition(cx_b, |workspace, cx| {
5123                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5124            })
5125            .await;
5126
5127        // Changes to client A's editor are reflected on client B.
5128        editor_a1.update(cx_a, |editor, cx| {
5129            editor.select_ranges([1..1, 2..2], None, cx);
5130        });
5131        editor_b1
5132            .condition(cx_b, |editor, cx| {
5133                editor.selected_ranges(cx) == vec![1..1, 2..2]
5134            })
5135            .await;
5136
5137        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5138        editor_b1
5139            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5140            .await;
5141
5142        editor_a1.update(cx_a, |editor, cx| {
5143            editor.select_ranges([3..3], None, cx);
5144            editor.set_scroll_position(vec2f(0., 100.), cx);
5145        });
5146        editor_b1
5147            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5148            .await;
5149
5150        // After unfollowing, client B stops receiving updates from client A.
5151        workspace_b.update(cx_b, |workspace, cx| {
5152            workspace.unfollow(&workspace.active_pane().clone(), cx)
5153        });
5154        workspace_a.update(cx_a, |workspace, cx| {
5155            workspace.activate_item(&editor_a2, cx)
5156        });
5157        cx_a.foreground().run_until_parked();
5158        assert_eq!(
5159            workspace_b.read_with(cx_b, |workspace, cx| workspace
5160                .active_item(cx)
5161                .unwrap()
5162                .id()),
5163            editor_b1.id()
5164        );
5165
5166        // Client A starts following client B.
5167        workspace_a
5168            .update(cx_a, |workspace, cx| {
5169                workspace
5170                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5171                    .unwrap()
5172            })
5173            .await
5174            .unwrap();
5175        assert_eq!(
5176            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5177            Some(client_b_id)
5178        );
5179        assert_eq!(
5180            workspace_a.read_with(cx_a, |workspace, cx| workspace
5181                .active_item(cx)
5182                .unwrap()
5183                .id()),
5184            editor_a1.id()
5185        );
5186
5187        // Following interrupts when client B disconnects.
5188        client_b.disconnect(&cx_b.to_async()).unwrap();
5189        cx_a.foreground().run_until_parked();
5190        assert_eq!(
5191            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5192            None
5193        );
5194    }
5195
5196    #[gpui::test(iterations = 10)]
5197    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5198        cx_a.foreground().forbid_parking();
5199        let fs = FakeFs::new(cx_a.background());
5200
5201        // 2 clients connect to a server.
5202        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5203        let mut client_a = server.create_client(cx_a, "user_a").await;
5204        let mut client_b = server.create_client(cx_b, "user_b").await;
5205        cx_a.update(editor::init);
5206        cx_b.update(editor::init);
5207
5208        // Client A shares a project.
5209        fs.insert_tree(
5210            "/a",
5211            json!({
5212                ".zed.toml": r#"collaborators = ["user_b"]"#,
5213                "1.txt": "one",
5214                "2.txt": "two",
5215                "3.txt": "three",
5216                "4.txt": "four",
5217            }),
5218        )
5219        .await;
5220        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5221        project_a
5222            .update(cx_a, |project, cx| project.share(cx))
5223            .await
5224            .unwrap();
5225
5226        // Client B joins the project.
5227        let project_b = client_b
5228            .build_remote_project(
5229                project_a
5230                    .read_with(cx_a, |project, _| project.remote_id())
5231                    .unwrap(),
5232                cx_b,
5233            )
5234            .await;
5235
5236        // Client A opens some editors.
5237        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5238        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5239        let _editor_a1 = workspace_a
5240            .update(cx_a, |workspace, cx| {
5241                workspace.open_path((worktree_id, "1.txt"), true, cx)
5242            })
5243            .await
5244            .unwrap()
5245            .downcast::<Editor>()
5246            .unwrap();
5247
5248        // Client B opens an editor.
5249        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5250        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5251        let _editor_b1 = workspace_b
5252            .update(cx_b, |workspace, cx| {
5253                workspace.open_path((worktree_id, "2.txt"), true, cx)
5254            })
5255            .await
5256            .unwrap()
5257            .downcast::<Editor>()
5258            .unwrap();
5259
5260        // Clients A and B follow each other in split panes
5261        workspace_a
5262            .update(cx_a, |workspace, cx| {
5263                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5264                assert_ne!(*workspace.active_pane(), pane_a1);
5265                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5266                workspace
5267                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5268                    .unwrap()
5269            })
5270            .await
5271            .unwrap();
5272        workspace_b
5273            .update(cx_b, |workspace, cx| {
5274                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5275                assert_ne!(*workspace.active_pane(), pane_b1);
5276                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5277                workspace
5278                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5279                    .unwrap()
5280            })
5281            .await
5282            .unwrap();
5283
5284        workspace_a
5285            .update(cx_a, |workspace, cx| {
5286                workspace.activate_next_pane(cx);
5287                assert_eq!(*workspace.active_pane(), pane_a1);
5288                workspace.open_path((worktree_id, "3.txt"), true, cx)
5289            })
5290            .await
5291            .unwrap();
5292        workspace_b
5293            .update(cx_b, |workspace, cx| {
5294                workspace.activate_next_pane(cx);
5295                assert_eq!(*workspace.active_pane(), pane_b1);
5296                workspace.open_path((worktree_id, "4.txt"), true, cx)
5297            })
5298            .await
5299            .unwrap();
5300        cx_a.foreground().run_until_parked();
5301
5302        // Ensure leader updates don't change the active pane of followers
5303        workspace_a.read_with(cx_a, |workspace, _| {
5304            assert_eq!(*workspace.active_pane(), pane_a1);
5305        });
5306        workspace_b.read_with(cx_b, |workspace, _| {
5307            assert_eq!(*workspace.active_pane(), pane_b1);
5308        });
5309
5310        // Ensure peers following each other doesn't cause an infinite loop.
5311        assert_eq!(
5312            workspace_a.read_with(cx_a, |workspace, cx| workspace
5313                .active_item(cx)
5314                .unwrap()
5315                .project_path(cx)),
5316            Some((worktree_id, "3.txt").into())
5317        );
5318        workspace_a.update(cx_a, |workspace, cx| {
5319            assert_eq!(
5320                workspace.active_item(cx).unwrap().project_path(cx),
5321                Some((worktree_id, "3.txt").into())
5322            );
5323            workspace.activate_next_pane(cx);
5324            assert_eq!(
5325                workspace.active_item(cx).unwrap().project_path(cx),
5326                Some((worktree_id, "4.txt").into())
5327            );
5328        });
5329        workspace_b.update(cx_b, |workspace, cx| {
5330            assert_eq!(
5331                workspace.active_item(cx).unwrap().project_path(cx),
5332                Some((worktree_id, "4.txt").into())
5333            );
5334            workspace.activate_next_pane(cx);
5335            assert_eq!(
5336                workspace.active_item(cx).unwrap().project_path(cx),
5337                Some((worktree_id, "3.txt").into())
5338            );
5339        });
5340    }
5341
5342    #[gpui::test(iterations = 10)]
5343    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5344        cx_a.foreground().forbid_parking();
5345        let fs = FakeFs::new(cx_a.background());
5346
5347        // 2 clients connect to a server.
5348        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5349        let mut client_a = server.create_client(cx_a, "user_a").await;
5350        let mut client_b = server.create_client(cx_b, "user_b").await;
5351        cx_a.update(editor::init);
5352        cx_b.update(editor::init);
5353
5354        // Client A shares a project.
5355        fs.insert_tree(
5356            "/a",
5357            json!({
5358                ".zed.toml": r#"collaborators = ["user_b"]"#,
5359                "1.txt": "one",
5360                "2.txt": "two",
5361                "3.txt": "three",
5362            }),
5363        )
5364        .await;
5365        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5366        project_a
5367            .update(cx_a, |project, cx| project.share(cx))
5368            .await
5369            .unwrap();
5370
5371        // Client B joins the project.
5372        let project_b = client_b
5373            .build_remote_project(
5374                project_a
5375                    .read_with(cx_a, |project, _| project.remote_id())
5376                    .unwrap(),
5377                cx_b,
5378            )
5379            .await;
5380
5381        // Client A opens some editors.
5382        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5383        let _editor_a1 = workspace_a
5384            .update(cx_a, |workspace, cx| {
5385                workspace.open_path((worktree_id, "1.txt"), true, cx)
5386            })
5387            .await
5388            .unwrap()
5389            .downcast::<Editor>()
5390            .unwrap();
5391
5392        // Client B starts following client A.
5393        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5394        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5395        let leader_id = project_b.read_with(cx_b, |project, _| {
5396            project.collaborators().values().next().unwrap().peer_id
5397        });
5398        workspace_b
5399            .update(cx_b, |workspace, cx| {
5400                workspace
5401                    .toggle_follow(&ToggleFollow(leader_id), cx)
5402                    .unwrap()
5403            })
5404            .await
5405            .unwrap();
5406        assert_eq!(
5407            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5408            Some(leader_id)
5409        );
5410        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5411            workspace
5412                .active_item(cx)
5413                .unwrap()
5414                .downcast::<Editor>()
5415                .unwrap()
5416        });
5417
5418        // When client B moves, it automatically stops following client A.
5419        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5420        assert_eq!(
5421            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5422            None
5423        );
5424
5425        workspace_b
5426            .update(cx_b, |workspace, cx| {
5427                workspace
5428                    .toggle_follow(&ToggleFollow(leader_id), cx)
5429                    .unwrap()
5430            })
5431            .await
5432            .unwrap();
5433        assert_eq!(
5434            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5435            Some(leader_id)
5436        );
5437
5438        // When client B edits, it automatically stops following client A.
5439        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5440        assert_eq!(
5441            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5442            None
5443        );
5444
5445        workspace_b
5446            .update(cx_b, |workspace, cx| {
5447                workspace
5448                    .toggle_follow(&ToggleFollow(leader_id), cx)
5449                    .unwrap()
5450            })
5451            .await
5452            .unwrap();
5453        assert_eq!(
5454            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5455            Some(leader_id)
5456        );
5457
5458        // When client B scrolls, it automatically stops following client A.
5459        editor_b2.update(cx_b, |editor, cx| {
5460            editor.set_scroll_position(vec2f(0., 3.), cx)
5461        });
5462        assert_eq!(
5463            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5464            None
5465        );
5466
5467        workspace_b
5468            .update(cx_b, |workspace, cx| {
5469                workspace
5470                    .toggle_follow(&ToggleFollow(leader_id), cx)
5471                    .unwrap()
5472            })
5473            .await
5474            .unwrap();
5475        assert_eq!(
5476            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5477            Some(leader_id)
5478        );
5479
5480        // When client B activates a different pane, it continues following client A in the original pane.
5481        workspace_b.update(cx_b, |workspace, cx| {
5482            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5483        });
5484        assert_eq!(
5485            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5486            Some(leader_id)
5487        );
5488
5489        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5490        assert_eq!(
5491            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5492            Some(leader_id)
5493        );
5494
5495        // When client B activates a different item in the original pane, it automatically stops following client A.
5496        workspace_b
5497            .update(cx_b, |workspace, cx| {
5498                workspace.open_path((worktree_id, "2.txt"), true, cx)
5499            })
5500            .await
5501            .unwrap();
5502        assert_eq!(
5503            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5504            None
5505        );
5506    }
5507
5508    #[gpui::test(iterations = 100)]
5509    async fn test_random_collaboration(
5510        cx: &mut TestAppContext,
5511        deterministic: Arc<Deterministic>,
5512        rng: StdRng,
5513    ) {
5514        cx.foreground().forbid_parking();
5515        let max_peers = env::var("MAX_PEERS")
5516            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5517            .unwrap_or(5);
5518        assert!(max_peers <= 5);
5519
5520        let max_operations = env::var("OPERATIONS")
5521            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5522            .unwrap_or(10);
5523
5524        let rng = Arc::new(Mutex::new(rng));
5525
5526        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5527        let host_language_registry = Arc::new(LanguageRegistry::test());
5528
5529        let fs = FakeFs::new(cx.background());
5530        fs.insert_tree(
5531            "/_collab",
5532            json!({
5533                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5534            }),
5535        )
5536        .await;
5537
5538        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5539        let mut clients = Vec::new();
5540        let mut user_ids = Vec::new();
5541        let mut op_start_signals = Vec::new();
5542        let files = Arc::new(Mutex::new(Vec::new()));
5543
5544        let mut next_entity_id = 100000;
5545        let mut host_cx = TestAppContext::new(
5546            cx.foreground_platform(),
5547            cx.platform(),
5548            deterministic.build_foreground(next_entity_id),
5549            deterministic.build_background(),
5550            cx.font_cache(),
5551            cx.leak_detector(),
5552            next_entity_id,
5553        );
5554        let host = server.create_client(&mut host_cx, "host").await;
5555        let host_project = host_cx.update(|cx| {
5556            Project::local(
5557                host.client.clone(),
5558                host.user_store.clone(),
5559                host_language_registry.clone(),
5560                fs.clone(),
5561                cx,
5562            )
5563        });
5564        let host_project_id = host_project
5565            .update(&mut host_cx, |p, _| p.next_remote_id())
5566            .await;
5567
5568        let (collab_worktree, _) = host_project
5569            .update(&mut host_cx, |project, cx| {
5570                project.find_or_create_local_worktree("/_collab", true, cx)
5571            })
5572            .await
5573            .unwrap();
5574        collab_worktree
5575            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5576            .await;
5577        host_project
5578            .update(&mut host_cx, |project, cx| project.share(cx))
5579            .await
5580            .unwrap();
5581
5582        // Set up fake language servers.
5583        let mut language = Language::new(
5584            LanguageConfig {
5585                name: "Rust".into(),
5586                path_suffixes: vec!["rs".to_string()],
5587                ..Default::default()
5588            },
5589            None,
5590        );
5591        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5592            name: "the-fake-language-server",
5593            capabilities: lsp::LanguageServer::full_capabilities(),
5594            initializer: Some(Box::new({
5595                let rng = rng.clone();
5596                let files = files.clone();
5597                let project = host_project.downgrade();
5598                move |fake_server: &mut FakeLanguageServer| {
5599                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5600                        |_, _| async move {
5601                            Ok(Some(lsp::CompletionResponse::Array(vec![
5602                                lsp::CompletionItem {
5603                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5604                                        range: lsp::Range::new(
5605                                            lsp::Position::new(0, 0),
5606                                            lsp::Position::new(0, 0),
5607                                        ),
5608                                        new_text: "the-new-text".to_string(),
5609                                    })),
5610                                    ..Default::default()
5611                                },
5612                            ])))
5613                        },
5614                    );
5615
5616                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5617                        |_, _| async move {
5618                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5619                                lsp::CodeAction {
5620                                    title: "the-code-action".to_string(),
5621                                    ..Default::default()
5622                                },
5623                            )]))
5624                        },
5625                    );
5626
5627                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5628                        |params, _| async move {
5629                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5630                                params.position,
5631                                params.position,
5632                            ))))
5633                        },
5634                    );
5635
5636                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5637                        let files = files.clone();
5638                        let rng = rng.clone();
5639                        move |_, _| {
5640                            let files = files.clone();
5641                            let rng = rng.clone();
5642                            async move {
5643                                let files = files.lock();
5644                                let mut rng = rng.lock();
5645                                let count = rng.gen_range::<usize, _>(1..3);
5646                                let files = (0..count)
5647                                    .map(|_| files.choose(&mut *rng).unwrap())
5648                                    .collect::<Vec<_>>();
5649                                log::info!("LSP: Returning definitions in files {:?}", &files);
5650                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5651                                    files
5652                                        .into_iter()
5653                                        .map(|file| lsp::Location {
5654                                            uri: lsp::Url::from_file_path(file).unwrap(),
5655                                            range: Default::default(),
5656                                        })
5657                                        .collect(),
5658                                )))
5659                            }
5660                        }
5661                    });
5662
5663                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5664                        let rng = rng.clone();
5665                        let project = project.clone();
5666                        move |params, mut cx| {
5667                            let highlights = if let Some(project) = project.upgrade(&cx) {
5668                                project.update(&mut cx, |project, cx| {
5669                                    let path = params
5670                                        .text_document_position_params
5671                                        .text_document
5672                                        .uri
5673                                        .to_file_path()
5674                                        .unwrap();
5675                                    let (worktree, relative_path) =
5676                                        project.find_local_worktree(&path, cx)?;
5677                                    let project_path =
5678                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5679                                    let buffer =
5680                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5681
5682                                    let mut highlights = Vec::new();
5683                                    let highlight_count = rng.lock().gen_range(1..=5);
5684                                    let mut prev_end = 0;
5685                                    for _ in 0..highlight_count {
5686                                        let range =
5687                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5688
5689                                        highlights.push(lsp::DocumentHighlight {
5690                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5691                                            kind: Some(lsp::DocumentHighlightKind::READ),
5692                                        });
5693                                        prev_end = range.end;
5694                                    }
5695                                    Some(highlights)
5696                                })
5697                            } else {
5698                                None
5699                            };
5700                            async move { Ok(highlights) }
5701                        }
5702                    });
5703                }
5704            })),
5705            ..Default::default()
5706        });
5707        host_language_registry.add(Arc::new(language));
5708
5709        let op_start_signal = futures::channel::mpsc::unbounded();
5710        user_ids.push(host.current_user_id(&host_cx));
5711        op_start_signals.push(op_start_signal.0);
5712        clients.push(host_cx.foreground().spawn(host.simulate_host(
5713            host_project,
5714            files,
5715            op_start_signal.1,
5716            rng.clone(),
5717            host_cx,
5718        )));
5719
5720        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5721            rng.lock().gen_range(0..max_operations)
5722        } else {
5723            max_operations
5724        };
5725        let mut available_guests = vec![
5726            "guest-1".to_string(),
5727            "guest-2".to_string(),
5728            "guest-3".to_string(),
5729            "guest-4".to_string(),
5730        ];
5731        let mut operations = 0;
5732        while operations < max_operations {
5733            if operations == disconnect_host_at {
5734                server.disconnect_client(user_ids[0]);
5735                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5736                drop(op_start_signals);
5737                let mut clients = futures::future::join_all(clients).await;
5738                cx.foreground().run_until_parked();
5739
5740                let (host, mut host_cx, host_err) = clients.remove(0);
5741                if let Some(host_err) = host_err {
5742                    log::error!("host error - {}", host_err);
5743                }
5744                host.project
5745                    .as_ref()
5746                    .unwrap()
5747                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
5748                for (guest, mut guest_cx, guest_err) in clients {
5749                    if let Some(guest_err) = guest_err {
5750                        log::error!("{} error - {}", guest.username, guest_err);
5751                    }
5752                    let contacts = server
5753                        .store
5754                        .read()
5755                        .await
5756                        .contacts_for_user(guest.current_user_id(&guest_cx));
5757                    assert!(!contacts
5758                        .iter()
5759                        .flat_map(|contact| &contact.projects)
5760                        .any(|project| project.id == host_project_id));
5761                    guest
5762                        .project
5763                        .as_ref()
5764                        .unwrap()
5765                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5766                    guest_cx.update(|_| drop(guest));
5767                }
5768                host_cx.update(|_| drop(host));
5769
5770                return;
5771            }
5772
5773            let distribution = rng.lock().gen_range(0..100);
5774            match distribution {
5775                0..=19 if !available_guests.is_empty() => {
5776                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
5777                    let guest_username = available_guests.remove(guest_ix);
5778                    log::info!("Adding new connection for {}", guest_username);
5779                    next_entity_id += 100000;
5780                    let mut guest_cx = TestAppContext::new(
5781                        cx.foreground_platform(),
5782                        cx.platform(),
5783                        deterministic.build_foreground(next_entity_id),
5784                        deterministic.build_background(),
5785                        cx.font_cache(),
5786                        cx.leak_detector(),
5787                        next_entity_id,
5788                    );
5789                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
5790                    let guest_project = Project::remote(
5791                        host_project_id,
5792                        guest.client.clone(),
5793                        guest.user_store.clone(),
5794                        guest_lang_registry.clone(),
5795                        FakeFs::new(cx.background()),
5796                        &mut guest_cx.to_async(),
5797                    )
5798                    .await
5799                    .unwrap();
5800                    let op_start_signal = futures::channel::mpsc::unbounded();
5801                    user_ids.push(guest.current_user_id(&guest_cx));
5802                    op_start_signals.push(op_start_signal.0);
5803                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
5804                        guest_username.clone(),
5805                        guest_project,
5806                        op_start_signal.1,
5807                        rng.clone(),
5808                        guest_cx,
5809                    )));
5810
5811                    log::info!("Added connection for {}", guest_username);
5812                    operations += 1;
5813                }
5814                20..=29 if clients.len() > 1 => {
5815                    log::info!("Removing guest");
5816                    let guest_ix = rng.lock().gen_range(1..clients.len());
5817                    let removed_guest_id = user_ids.remove(guest_ix);
5818                    let guest = clients.remove(guest_ix);
5819                    op_start_signals.remove(guest_ix);
5820                    server.disconnect_client(removed_guest_id);
5821                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
5822                    let (guest, mut guest_cx, guest_err) = guest.await;
5823                    if let Some(guest_err) = guest_err {
5824                        log::error!("{} error - {}", guest.username, guest_err);
5825                    }
5826                    guest
5827                        .project
5828                        .as_ref()
5829                        .unwrap()
5830                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
5831                    for user_id in &user_ids {
5832                        for contact in server.store.read().await.contacts_for_user(*user_id) {
5833                            assert_ne!(
5834                                contact.user_id, removed_guest_id.0 as u64,
5835                                "removed guest is still a contact of another peer"
5836                            );
5837                            for project in contact.projects {
5838                                for project_guest_id in project.guests {
5839                                    assert_ne!(
5840                                        project_guest_id, removed_guest_id.0 as u64,
5841                                        "removed guest appears as still participating on a project"
5842                                    );
5843                                }
5844                            }
5845                        }
5846                    }
5847
5848                    log::info!("{} removed", guest.username);
5849                    available_guests.push(guest.username.clone());
5850                    guest_cx.update(|_| drop(guest));
5851
5852                    operations += 1;
5853                }
5854                _ => {
5855                    while operations < max_operations && rng.lock().gen_bool(0.7) {
5856                        op_start_signals
5857                            .choose(&mut *rng.lock())
5858                            .unwrap()
5859                            .unbounded_send(())
5860                            .unwrap();
5861                        operations += 1;
5862                    }
5863
5864                    if rng.lock().gen_bool(0.8) {
5865                        cx.foreground().run_until_parked();
5866                    }
5867                }
5868            }
5869        }
5870
5871        drop(op_start_signals);
5872        let mut clients = futures::future::join_all(clients).await;
5873        cx.foreground().run_until_parked();
5874
5875        let (host_client, mut host_cx, host_err) = clients.remove(0);
5876        if let Some(host_err) = host_err {
5877            panic!("host error - {}", host_err);
5878        }
5879        let host_project = host_client.project.as_ref().unwrap();
5880        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
5881            project
5882                .worktrees(cx)
5883                .map(|worktree| {
5884                    let snapshot = worktree.read(cx).snapshot();
5885                    (snapshot.id(), snapshot)
5886                })
5887                .collect::<BTreeMap<_, _>>()
5888        });
5889
5890        host_client
5891            .project
5892            .as_ref()
5893            .unwrap()
5894            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
5895
5896        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
5897            if let Some(guest_err) = guest_err {
5898                panic!("{} error - {}", guest_client.username, guest_err);
5899            }
5900            let worktree_snapshots =
5901                guest_client
5902                    .project
5903                    .as_ref()
5904                    .unwrap()
5905                    .read_with(&guest_cx, |project, cx| {
5906                        project
5907                            .worktrees(cx)
5908                            .map(|worktree| {
5909                                let worktree = worktree.read(cx);
5910                                (worktree.id(), worktree.snapshot())
5911                            })
5912                            .collect::<BTreeMap<_, _>>()
5913                    });
5914
5915            assert_eq!(
5916                worktree_snapshots.keys().collect::<Vec<_>>(),
5917                host_worktree_snapshots.keys().collect::<Vec<_>>(),
5918                "{} has different worktrees than the host",
5919                guest_client.username
5920            );
5921            for (id, host_snapshot) in &host_worktree_snapshots {
5922                let guest_snapshot = &worktree_snapshots[id];
5923                assert_eq!(
5924                    guest_snapshot.root_name(),
5925                    host_snapshot.root_name(),
5926                    "{} has different root name than the host for worktree {}",
5927                    guest_client.username,
5928                    id
5929                );
5930                assert_eq!(
5931                    guest_snapshot.entries(false).collect::<Vec<_>>(),
5932                    host_snapshot.entries(false).collect::<Vec<_>>(),
5933                    "{} has different snapshot than the host for worktree {}",
5934                    guest_client.username,
5935                    id
5936                );
5937                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
5938            }
5939
5940            guest_client
5941                .project
5942                .as_ref()
5943                .unwrap()
5944                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
5945
5946            for guest_buffer in &guest_client.buffers {
5947                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
5948                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
5949                    project.buffer_for_id(buffer_id, cx).expect(&format!(
5950                        "host does not have buffer for guest:{}, peer:{}, id:{}",
5951                        guest_client.username, guest_client.peer_id, buffer_id
5952                    ))
5953                });
5954                let path = host_buffer
5955                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
5956
5957                assert_eq!(
5958                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
5959                    0,
5960                    "{}, buffer {}, path {:?} has deferred operations",
5961                    guest_client.username,
5962                    buffer_id,
5963                    path,
5964                );
5965                assert_eq!(
5966                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
5967                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
5968                    "{}, buffer {}, path {:?}, differs from the host's buffer",
5969                    guest_client.username,
5970                    buffer_id,
5971                    path
5972                );
5973            }
5974
5975            guest_cx.update(|_| drop(guest_client));
5976        }
5977
5978        host_cx.update(|_| drop(host_client));
5979    }
5980
5981    struct TestServer {
5982        peer: Arc<Peer>,
5983        app_state: Arc<AppState>,
5984        server: Arc<Server>,
5985        foreground: Rc<executor::Foreground>,
5986        notifications: mpsc::UnboundedReceiver<()>,
5987        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
5988        forbid_connections: Arc<AtomicBool>,
5989        _test_db: TestDb,
5990    }
5991
5992    impl TestServer {
5993        async fn start(
5994            foreground: Rc<executor::Foreground>,
5995            background: Arc<executor::Background>,
5996        ) -> Self {
5997            let test_db = TestDb::fake(background);
5998            let app_state = Self::build_app_state(&test_db).await;
5999            let peer = Peer::new();
6000            let notifications = mpsc::unbounded();
6001            let server = Server::new(app_state.clone(), Some(notifications.0));
6002            Self {
6003                peer,
6004                app_state,
6005                server,
6006                foreground,
6007                notifications: notifications.1,
6008                connection_killers: Default::default(),
6009                forbid_connections: Default::default(),
6010                _test_db: test_db,
6011            }
6012        }
6013
6014        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6015            cx.update(|cx| {
6016                let settings = Settings::test(cx);
6017                cx.set_global(settings);
6018            });
6019
6020            let http = FakeHttpClient::with_404_response();
6021            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
6022            let client_name = name.to_string();
6023            let mut client = Client::new(http.clone());
6024            let server = self.server.clone();
6025            let connection_killers = self.connection_killers.clone();
6026            let forbid_connections = self.forbid_connections.clone();
6027            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6028
6029            Arc::get_mut(&mut client)
6030                .unwrap()
6031                .override_authenticate(move |cx| {
6032                    cx.spawn(|_| async move {
6033                        let access_token = "the-token".to_string();
6034                        Ok(Credentials {
6035                            user_id: user_id.0 as u64,
6036                            access_token,
6037                        })
6038                    })
6039                })
6040                .override_establish_connection(move |credentials, cx| {
6041                    assert_eq!(credentials.user_id, user_id.0 as u64);
6042                    assert_eq!(credentials.access_token, "the-token");
6043
6044                    let server = server.clone();
6045                    let connection_killers = connection_killers.clone();
6046                    let forbid_connections = forbid_connections.clone();
6047                    let client_name = client_name.clone();
6048                    let connection_id_tx = connection_id_tx.clone();
6049                    cx.spawn(move |cx| async move {
6050                        if forbid_connections.load(SeqCst) {
6051                            Err(EstablishConnectionError::other(anyhow!(
6052                                "server is forbidding connections"
6053                            )))
6054                        } else {
6055                            let (client_conn, server_conn, killed) =
6056                                Connection::in_memory(cx.background());
6057                            connection_killers.lock().insert(user_id, killed);
6058                            cx.background()
6059                                .spawn(server.handle_connection(
6060                                    server_conn,
6061                                    client_name,
6062                                    user_id,
6063                                    Some(connection_id_tx),
6064                                    cx.background(),
6065                                ))
6066                                .detach();
6067                            Ok(client_conn)
6068                        }
6069                    })
6070                });
6071
6072            client
6073                .authenticate_and_connect(false, &cx.to_async())
6074                .await
6075                .unwrap();
6076
6077            Channel::init(&client);
6078            Project::init(&client);
6079            cx.update(|cx| {
6080                workspace::init(&client, cx);
6081            });
6082
6083            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6084            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6085
6086            let client = TestClient {
6087                client,
6088                peer_id,
6089                username: name.to_string(),
6090                user_store,
6091                language_registry: Arc::new(LanguageRegistry::test()),
6092                project: Default::default(),
6093                buffers: Default::default(),
6094            };
6095            client.wait_for_current_user(cx).await;
6096            client
6097        }
6098
6099        fn disconnect_client(&self, user_id: UserId) {
6100            self.connection_killers
6101                .lock()
6102                .remove(&user_id)
6103                .unwrap()
6104                .store(true, SeqCst);
6105        }
6106
6107        fn forbid_connections(&self) {
6108            self.forbid_connections.store(true, SeqCst);
6109        }
6110
6111        fn allow_connections(&self) {
6112            self.forbid_connections.store(false, SeqCst);
6113        }
6114
6115        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6116            Arc::new(AppState {
6117                db: test_db.db().clone(),
6118                api_token: Default::default(),
6119            })
6120        }
6121
6122        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6123            self.server.store.read().await
6124        }
6125
6126        async fn condition<F>(&mut self, mut predicate: F)
6127        where
6128            F: FnMut(&Store) -> bool,
6129        {
6130            assert!(
6131                self.foreground.parking_forbidden(),
6132                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6133            );
6134            while !(predicate)(&*self.server.store.read().await) {
6135                self.foreground.start_waiting();
6136                self.notifications.next().await;
6137                self.foreground.finish_waiting();
6138            }
6139        }
6140    }
6141
6142    impl Deref for TestServer {
6143        type Target = Server;
6144
6145        fn deref(&self) -> &Self::Target {
6146            &self.server
6147        }
6148    }
6149
6150    impl Drop for TestServer {
6151        fn drop(&mut self) {
6152            self.peer.reset();
6153        }
6154    }
6155
6156    struct TestClient {
6157        client: Arc<Client>,
6158        username: String,
6159        pub peer_id: PeerId,
6160        pub user_store: ModelHandle<UserStore>,
6161        language_registry: Arc<LanguageRegistry>,
6162        project: Option<ModelHandle<Project>>,
6163        buffers: HashSet<ModelHandle<language::Buffer>>,
6164    }
6165
6166    impl Deref for TestClient {
6167        type Target = Arc<Client>;
6168
6169        fn deref(&self) -> &Self::Target {
6170            &self.client
6171        }
6172    }
6173
6174    impl TestClient {
6175        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6176            UserId::from_proto(
6177                self.user_store
6178                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6179            )
6180        }
6181
6182        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6183            let mut authed_user = self
6184                .user_store
6185                .read_with(cx, |user_store, _| user_store.watch_current_user());
6186            while authed_user.next().await.unwrap().is_none() {}
6187        }
6188
6189        async fn build_local_project(
6190            &mut self,
6191            fs: Arc<FakeFs>,
6192            root_path: impl AsRef<Path>,
6193            cx: &mut TestAppContext,
6194        ) -> (ModelHandle<Project>, WorktreeId) {
6195            let project = cx.update(|cx| {
6196                Project::local(
6197                    self.client.clone(),
6198                    self.user_store.clone(),
6199                    self.language_registry.clone(),
6200                    fs,
6201                    cx,
6202                )
6203            });
6204            self.project = Some(project.clone());
6205            let (worktree, _) = project
6206                .update(cx, |p, cx| {
6207                    p.find_or_create_local_worktree(root_path, true, cx)
6208                })
6209                .await
6210                .unwrap();
6211            worktree
6212                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6213                .await;
6214            project
6215                .update(cx, |project, _| project.next_remote_id())
6216                .await;
6217            (project, worktree.read_with(cx, |tree, _| tree.id()))
6218        }
6219
6220        async fn build_remote_project(
6221            &mut self,
6222            project_id: u64,
6223            cx: &mut TestAppContext,
6224        ) -> ModelHandle<Project> {
6225            let project = Project::remote(
6226                project_id,
6227                self.client.clone(),
6228                self.user_store.clone(),
6229                self.language_registry.clone(),
6230                FakeFs::new(cx.background()),
6231                &mut cx.to_async(),
6232            )
6233            .await
6234            .unwrap();
6235            self.project = Some(project.clone());
6236            project
6237        }
6238
6239        fn build_workspace(
6240            &self,
6241            project: &ModelHandle<Project>,
6242            cx: &mut TestAppContext,
6243        ) -> ViewHandle<Workspace> {
6244            let (window_id, _) = cx.add_window(|_| EmptyView);
6245            cx.add_view(window_id, |cx| {
6246                let fs = project.read(cx).fs().clone();
6247                Workspace::new(
6248                    &WorkspaceParams {
6249                        fs,
6250                        project: project.clone(),
6251                        user_store: self.user_store.clone(),
6252                        languages: self.language_registry.clone(),
6253                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6254                        channel_list: cx.add_model(|cx| {
6255                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6256                        }),
6257                        client: self.client.clone(),
6258                    },
6259                    cx,
6260                )
6261            })
6262        }
6263
6264        async fn simulate_host(
6265            mut self,
6266            project: ModelHandle<Project>,
6267            files: Arc<Mutex<Vec<PathBuf>>>,
6268            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6269            rng: Arc<Mutex<StdRng>>,
6270            mut cx: TestAppContext,
6271        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6272            async fn simulate_host_internal(
6273                client: &mut TestClient,
6274                project: ModelHandle<Project>,
6275                files: Arc<Mutex<Vec<PathBuf>>>,
6276                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6277                rng: Arc<Mutex<StdRng>>,
6278                cx: &mut TestAppContext,
6279            ) -> anyhow::Result<()> {
6280                let fs = project.read_with(cx, |project, _| project.fs().clone());
6281
6282                while op_start_signal.next().await.is_some() {
6283                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6284                    match distribution {
6285                        0..=20 if !files.lock().is_empty() => {
6286                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6287                            let mut path = path.as_path();
6288                            while let Some(parent_path) = path.parent() {
6289                                path = parent_path;
6290                                if rng.lock().gen() {
6291                                    break;
6292                                }
6293                            }
6294
6295                            log::info!("Host: find/create local worktree {:?}", path);
6296                            let find_or_create_worktree = project.update(cx, |project, cx| {
6297                                project.find_or_create_local_worktree(path, true, cx)
6298                            });
6299                            if rng.lock().gen() {
6300                                cx.background().spawn(find_or_create_worktree).detach();
6301                            } else {
6302                                find_or_create_worktree.await?;
6303                            }
6304                        }
6305                        10..=80 if !files.lock().is_empty() => {
6306                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6307                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6308                                let (worktree, path) = project
6309                                    .update(cx, |project, cx| {
6310                                        project.find_or_create_local_worktree(
6311                                            file.clone(),
6312                                            true,
6313                                            cx,
6314                                        )
6315                                    })
6316                                    .await?;
6317                                let project_path =
6318                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6319                                log::info!(
6320                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6321                                    file,
6322                                    project_path.0,
6323                                    project_path.1
6324                                );
6325                                let buffer = project
6326                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6327                                    .await
6328                                    .unwrap();
6329                                client.buffers.insert(buffer.clone());
6330                                buffer
6331                            } else {
6332                                client
6333                                    .buffers
6334                                    .iter()
6335                                    .choose(&mut *rng.lock())
6336                                    .unwrap()
6337                                    .clone()
6338                            };
6339
6340                            if rng.lock().gen_bool(0.1) {
6341                                cx.update(|cx| {
6342                                    log::info!(
6343                                        "Host: dropping buffer {:?}",
6344                                        buffer.read(cx).file().unwrap().full_path(cx)
6345                                    );
6346                                    client.buffers.remove(&buffer);
6347                                    drop(buffer);
6348                                });
6349                            } else {
6350                                buffer.update(cx, |buffer, cx| {
6351                                    log::info!(
6352                                        "Host: updating buffer {:?} ({})",
6353                                        buffer.file().unwrap().full_path(cx),
6354                                        buffer.remote_id()
6355                                    );
6356
6357                                    if rng.lock().gen_bool(0.7) {
6358                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6359                                    } else {
6360                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6361                                    }
6362                                });
6363                            }
6364                        }
6365                        _ => loop {
6366                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6367                            let mut path = PathBuf::new();
6368                            path.push("/");
6369                            for _ in 0..path_component_count {
6370                                let letter = rng.lock().gen_range(b'a'..=b'z');
6371                                path.push(std::str::from_utf8(&[letter]).unwrap());
6372                            }
6373                            path.set_extension("rs");
6374                            let parent_path = path.parent().unwrap();
6375
6376                            log::info!("Host: creating file {:?}", path,);
6377
6378                            if fs.create_dir(&parent_path).await.is_ok()
6379                                && fs.create_file(&path, Default::default()).await.is_ok()
6380                            {
6381                                files.lock().push(path);
6382                                break;
6383                            } else {
6384                                log::info!("Host: cannot create file");
6385                            }
6386                        },
6387                    }
6388
6389                    cx.background().simulate_random_delay().await;
6390                }
6391
6392                Ok(())
6393            }
6394
6395            let result = simulate_host_internal(
6396                &mut self,
6397                project.clone(),
6398                files,
6399                op_start_signal,
6400                rng,
6401                &mut cx,
6402            )
6403            .await;
6404            log::info!("Host done");
6405            self.project = Some(project);
6406            (self, cx, result.err())
6407        }
6408
6409        pub async fn simulate_guest(
6410            mut self,
6411            guest_username: String,
6412            project: ModelHandle<Project>,
6413            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6414            rng: Arc<Mutex<StdRng>>,
6415            mut cx: TestAppContext,
6416        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6417            async fn simulate_guest_internal(
6418                client: &mut TestClient,
6419                guest_username: &str,
6420                project: ModelHandle<Project>,
6421                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6422                rng: Arc<Mutex<StdRng>>,
6423                cx: &mut TestAppContext,
6424            ) -> anyhow::Result<()> {
6425                while op_start_signal.next().await.is_some() {
6426                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6427                        let worktree = if let Some(worktree) =
6428                            project.read_with(cx, |project, cx| {
6429                                project
6430                                    .worktrees(&cx)
6431                                    .filter(|worktree| {
6432                                        let worktree = worktree.read(cx);
6433                                        worktree.is_visible()
6434                                            && worktree.entries(false).any(|e| e.is_file())
6435                                    })
6436                                    .choose(&mut *rng.lock())
6437                            }) {
6438                            worktree
6439                        } else {
6440                            cx.background().simulate_random_delay().await;
6441                            continue;
6442                        };
6443
6444                        let (worktree_root_name, project_path) =
6445                            worktree.read_with(cx, |worktree, _| {
6446                                let entry = worktree
6447                                    .entries(false)
6448                                    .filter(|e| e.is_file())
6449                                    .choose(&mut *rng.lock())
6450                                    .unwrap();
6451                                (
6452                                    worktree.root_name().to_string(),
6453                                    (worktree.id(), entry.path.clone()),
6454                                )
6455                            });
6456                        log::info!(
6457                            "{}: opening path {:?} in worktree {} ({})",
6458                            guest_username,
6459                            project_path.1,
6460                            project_path.0,
6461                            worktree_root_name,
6462                        );
6463                        let buffer = project
6464                            .update(cx, |project, cx| {
6465                                project.open_buffer(project_path.clone(), cx)
6466                            })
6467                            .await?;
6468                        log::info!(
6469                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6470                            guest_username,
6471                            project_path.1,
6472                            project_path.0,
6473                            worktree_root_name,
6474                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6475                        );
6476                        client.buffers.insert(buffer.clone());
6477                        buffer
6478                    } else {
6479                        client
6480                            .buffers
6481                            .iter()
6482                            .choose(&mut *rng.lock())
6483                            .unwrap()
6484                            .clone()
6485                    };
6486
6487                    let choice = rng.lock().gen_range(0..100);
6488                    match choice {
6489                        0..=9 => {
6490                            cx.update(|cx| {
6491                                log::info!(
6492                                    "{}: dropping buffer {:?}",
6493                                    guest_username,
6494                                    buffer.read(cx).file().unwrap().full_path(cx)
6495                                );
6496                                client.buffers.remove(&buffer);
6497                                drop(buffer);
6498                            });
6499                        }
6500                        10..=19 => {
6501                            let completions = project.update(cx, |project, cx| {
6502                                log::info!(
6503                                    "{}: requesting completions for buffer {} ({:?})",
6504                                    guest_username,
6505                                    buffer.read(cx).remote_id(),
6506                                    buffer.read(cx).file().unwrap().full_path(cx)
6507                                );
6508                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6509                                project.completions(&buffer, offset, cx)
6510                            });
6511                            let completions = cx.background().spawn(async move {
6512                                completions
6513                                    .await
6514                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6515                            });
6516                            if rng.lock().gen_bool(0.3) {
6517                                log::info!("{}: detaching completions request", guest_username);
6518                                cx.update(|cx| completions.detach_and_log_err(cx));
6519                            } else {
6520                                completions.await?;
6521                            }
6522                        }
6523                        20..=29 => {
6524                            let code_actions = project.update(cx, |project, cx| {
6525                                log::info!(
6526                                    "{}: requesting code actions for buffer {} ({:?})",
6527                                    guest_username,
6528                                    buffer.read(cx).remote_id(),
6529                                    buffer.read(cx).file().unwrap().full_path(cx)
6530                                );
6531                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6532                                project.code_actions(&buffer, range, cx)
6533                            });
6534                            let code_actions = cx.background().spawn(async move {
6535                                code_actions.await.map_err(|err| {
6536                                    anyhow!("code actions request failed: {:?}", err)
6537                                })
6538                            });
6539                            if rng.lock().gen_bool(0.3) {
6540                                log::info!("{}: detaching code actions request", guest_username);
6541                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6542                            } else {
6543                                code_actions.await?;
6544                            }
6545                        }
6546                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6547                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6548                                log::info!(
6549                                    "{}: saving buffer {} ({:?})",
6550                                    guest_username,
6551                                    buffer.remote_id(),
6552                                    buffer.file().unwrap().full_path(cx)
6553                                );
6554                                (buffer.version(), buffer.save(cx))
6555                            });
6556                            let save = cx.background().spawn(async move {
6557                                let (saved_version, _) = save
6558                                    .await
6559                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6560                                assert!(saved_version.observed_all(&requested_version));
6561                                Ok::<_, anyhow::Error>(())
6562                            });
6563                            if rng.lock().gen_bool(0.3) {
6564                                log::info!("{}: detaching save request", guest_username);
6565                                cx.update(|cx| save.detach_and_log_err(cx));
6566                            } else {
6567                                save.await?;
6568                            }
6569                        }
6570                        40..=44 => {
6571                            let prepare_rename = project.update(cx, |project, cx| {
6572                                log::info!(
6573                                    "{}: preparing rename for buffer {} ({:?})",
6574                                    guest_username,
6575                                    buffer.read(cx).remote_id(),
6576                                    buffer.read(cx).file().unwrap().full_path(cx)
6577                                );
6578                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6579                                project.prepare_rename(buffer, offset, cx)
6580                            });
6581                            let prepare_rename = cx.background().spawn(async move {
6582                                prepare_rename.await.map_err(|err| {
6583                                    anyhow!("prepare rename request failed: {:?}", err)
6584                                })
6585                            });
6586                            if rng.lock().gen_bool(0.3) {
6587                                log::info!("{}: detaching prepare rename request", guest_username);
6588                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6589                            } else {
6590                                prepare_rename.await?;
6591                            }
6592                        }
6593                        45..=49 => {
6594                            let definitions = project.update(cx, |project, cx| {
6595                                log::info!(
6596                                    "{}: requesting definitions for buffer {} ({:?})",
6597                                    guest_username,
6598                                    buffer.read(cx).remote_id(),
6599                                    buffer.read(cx).file().unwrap().full_path(cx)
6600                                );
6601                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6602                                project.definition(&buffer, offset, cx)
6603                            });
6604                            let definitions = cx.background().spawn(async move {
6605                                definitions
6606                                    .await
6607                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6608                            });
6609                            if rng.lock().gen_bool(0.3) {
6610                                log::info!("{}: detaching definitions request", guest_username);
6611                                cx.update(|cx| definitions.detach_and_log_err(cx));
6612                            } else {
6613                                client
6614                                    .buffers
6615                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6616                            }
6617                        }
6618                        50..=54 => {
6619                            let highlights = project.update(cx, |project, cx| {
6620                                log::info!(
6621                                    "{}: requesting highlights for buffer {} ({:?})",
6622                                    guest_username,
6623                                    buffer.read(cx).remote_id(),
6624                                    buffer.read(cx).file().unwrap().full_path(cx)
6625                                );
6626                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6627                                project.document_highlights(&buffer, offset, cx)
6628                            });
6629                            let highlights = cx.background().spawn(async move {
6630                                highlights
6631                                    .await
6632                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6633                            });
6634                            if rng.lock().gen_bool(0.3) {
6635                                log::info!("{}: detaching highlights request", guest_username);
6636                                cx.update(|cx| highlights.detach_and_log_err(cx));
6637                            } else {
6638                                highlights.await?;
6639                            }
6640                        }
6641                        55..=59 => {
6642                            let search = project.update(cx, |project, cx| {
6643                                let query = rng.lock().gen_range('a'..='z');
6644                                log::info!("{}: project-wide search {:?}", guest_username, query);
6645                                project.search(SearchQuery::text(query, false, false), cx)
6646                            });
6647                            let search = cx.background().spawn(async move {
6648                                search
6649                                    .await
6650                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6651                            });
6652                            if rng.lock().gen_bool(0.3) {
6653                                log::info!("{}: detaching search request", guest_username);
6654                                cx.update(|cx| search.detach_and_log_err(cx));
6655                            } else {
6656                                client.buffers.extend(search.await?.into_keys());
6657                            }
6658                        }
6659                        60..=69 => {
6660                            let worktree = project
6661                                .read_with(cx, |project, cx| {
6662                                    project
6663                                        .worktrees(&cx)
6664                                        .filter(|worktree| {
6665                                            let worktree = worktree.read(cx);
6666                                            worktree.is_visible()
6667                                                && worktree.entries(false).any(|e| e.is_file())
6668                                                && worktree
6669                                                    .root_entry()
6670                                                    .map_or(false, |e| e.is_dir())
6671                                        })
6672                                        .choose(&mut *rng.lock())
6673                                })
6674                                .unwrap();
6675                            let (worktree_id, worktree_root_name) = worktree
6676                                .read_with(cx, |worktree, _| {
6677                                    (worktree.id(), worktree.root_name().to_string())
6678                                });
6679
6680                            let mut new_name = String::new();
6681                            for _ in 0..10 {
6682                                let letter = rng.lock().gen_range('a'..='z');
6683                                new_name.push(letter);
6684                            }
6685                            let mut new_path = PathBuf::new();
6686                            new_path.push(new_name);
6687                            new_path.set_extension("rs");
6688                            log::info!(
6689                                "{}: creating {:?} in worktree {} ({})",
6690                                guest_username,
6691                                new_path,
6692                                worktree_id,
6693                                worktree_root_name,
6694                            );
6695                            project
6696                                .update(cx, |project, cx| {
6697                                    project.create_entry((worktree_id, new_path), false, cx)
6698                                })
6699                                .unwrap()
6700                                .await?;
6701                        }
6702                        _ => {
6703                            buffer.update(cx, |buffer, cx| {
6704                                log::info!(
6705                                    "{}: updating buffer {} ({:?})",
6706                                    guest_username,
6707                                    buffer.remote_id(),
6708                                    buffer.file().unwrap().full_path(cx)
6709                                );
6710                                if rng.lock().gen_bool(0.7) {
6711                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6712                                } else {
6713                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6714                                }
6715                            });
6716                        }
6717                    }
6718                    cx.background().simulate_random_delay().await;
6719                }
6720                Ok(())
6721            }
6722
6723            let result = simulate_guest_internal(
6724                &mut self,
6725                &guest_username,
6726                project.clone(),
6727                op_start_signal,
6728                rng,
6729                &mut cx,
6730            )
6731            .await;
6732            log::info!("{}: done", guest_username);
6733
6734            self.project = Some(project);
6735            (self, cx, result.err())
6736        }
6737    }
6738
6739    impl Drop for TestClient {
6740        fn drop(&mut self) {
6741            self.client.tear_down();
6742        }
6743    }
6744
6745    impl Executor for Arc<gpui::executor::Background> {
6746        type Sleep = gpui::executor::Timer;
6747
6748        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
6749            self.spawn(future).detach();
6750        }
6751
6752        fn sleep(&self, duration: Duration) -> Self::Sleep {
6753            self.as_ref().timer(duration)
6754        }
6755    }
6756
6757    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
6758        channel
6759            .messages()
6760            .cursor::<()>()
6761            .map(|m| {
6762                (
6763                    m.sender.github_login.clone(),
6764                    m.body.clone(),
6765                    m.is_pending(),
6766                )
6767            })
6768            .collect()
6769    }
6770
6771    struct EmptyView;
6772
6773    impl gpui::Entity for EmptyView {
6774        type Event = ();
6775    }
6776
6777    impl gpui::View for EmptyView {
6778        fn ui_name() -> &'static str {
6779            "empty view"
6780        }
6781
6782        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
6783            gpui::Element::boxed(gpui::elements::Empty)
6784        }
6785    }
6786}