rpc.rs

   1mod store;
   2
   3use crate::{
   4    auth,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState, Result,
   7};
   8use anyhow::anyhow;
   9use async_tungstenite::tungstenite::{
  10    protocol::CloseFrame as TungsteniteCloseFrame, Message as TungsteniteMessage,
  11};
  12use axum::{
  13    body::Body,
  14    extract::{
  15        ws::{CloseFrame as AxumCloseFrame, Message as AxumMessage},
  16        ConnectInfo, WebSocketUpgrade,
  17    },
  18    headers::{Header, HeaderName},
  19    http::StatusCode,
  20    middleware,
  21    response::IntoResponse,
  22    routing::get,
  23    Extension, Router, TypedHeader,
  24};
  25use collections::HashMap;
  26use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt, TryStreamExt};
  27use lazy_static::lazy_static;
  28use rpc::{
  29    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  30    Connection, ConnectionId, Peer, Receipt, TypedEnvelope,
  31};
  32use std::{
  33    any::TypeId,
  34    future::Future,
  35    marker::PhantomData,
  36    net::SocketAddr,
  37    ops::{Deref, DerefMut},
  38    rc::Rc,
  39    sync::{
  40        atomic::{AtomicBool, Ordering::SeqCst},
  41        Arc,
  42    },
  43    time::Duration,
  44};
  45use store::{Store, Worktree};
  46use time::OffsetDateTime;
  47use tokio::{
  48    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard},
  49    time::Sleep,
  50};
  51use tower::ServiceBuilder;
  52use tracing::{info_span, Instrument};
  53
  54type MessageHandler =
  55    Box<dyn Send + Sync + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, ()>>;
  56
  57struct Response<R> {
  58    server: Arc<Server>,
  59    receipt: Receipt<R>,
  60    responded: Arc<AtomicBool>,
  61}
  62
  63impl<R: RequestMessage> Response<R> {
  64    fn send(self, payload: R::Response) -> Result<()> {
  65        self.responded.store(true, SeqCst);
  66        self.server.peer.respond(self.receipt, payload)?;
  67        Ok(())
  68    }
  69}
  70
  71pub struct Server {
  72    peer: Arc<Peer>,
  73    store: RwLock<Store>,
  74    app_state: Arc<AppState>,
  75    handlers: HashMap<TypeId, MessageHandler>,
  76    notifications: Option<mpsc::UnboundedSender<()>>,
  77}
  78
  79pub trait Executor: Send + Clone {
  80    type Sleep: Send + Future;
  81    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  82    fn sleep(&self, duration: Duration) -> Self::Sleep;
  83}
  84
  85#[derive(Clone)]
  86pub struct RealExecutor;
  87
  88const MESSAGE_COUNT_PER_PAGE: usize = 100;
  89const MAX_MESSAGE_LEN: usize = 1024;
  90
  91struct StoreReadGuard<'a> {
  92    guard: RwLockReadGuard<'a, Store>,
  93    _not_send: PhantomData<Rc<()>>,
  94}
  95
  96struct StoreWriteGuard<'a> {
  97    guard: RwLockWriteGuard<'a, Store>,
  98    _not_send: PhantomData<Rc<()>>,
  99}
 100
 101impl Server {
 102    pub fn new(
 103        app_state: Arc<AppState>,
 104        notifications: Option<mpsc::UnboundedSender<()>>,
 105    ) -> Arc<Self> {
 106        let mut server = Self {
 107            peer: Peer::new(),
 108            app_state,
 109            store: Default::default(),
 110            handlers: Default::default(),
 111            notifications,
 112        };
 113
 114        server
 115            .add_request_handler(Server::ping)
 116            .add_request_handler(Server::register_project)
 117            .add_message_handler(Server::unregister_project)
 118            .add_request_handler(Server::share_project)
 119            .add_message_handler(Server::unshare_project)
 120            .add_request_handler(Server::join_project)
 121            .add_message_handler(Server::leave_project)
 122            .add_request_handler(Server::register_worktree)
 123            .add_message_handler(Server::unregister_worktree)
 124            .add_request_handler(Server::update_worktree)
 125            .add_message_handler(Server::start_language_server)
 126            .add_message_handler(Server::update_language_server)
 127            .add_message_handler(Server::update_diagnostic_summary)
 128            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
 129            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
 130            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
 131            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
 132            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
 133            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
 134            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
 135            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
 136            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
 137            .add_request_handler(
 138                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 139            )
 140            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 141            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 142            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 143            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 144            .add_request_handler(Server::forward_project_request::<proto::ReloadBuffers>)
 145            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 146            .add_request_handler(Server::forward_project_request::<proto::CreateProjectEntry>)
 147            .add_request_handler(Server::forward_project_request::<proto::RenameProjectEntry>)
 148            .add_request_handler(Server::forward_project_request::<proto::DeleteProjectEntry>)
 149            .add_request_handler(Server::update_buffer)
 150            .add_message_handler(Server::update_buffer_file)
 151            .add_message_handler(Server::buffer_reloaded)
 152            .add_message_handler(Server::buffer_saved)
 153            .add_request_handler(Server::save_buffer)
 154            .add_request_handler(Server::get_channels)
 155            .add_request_handler(Server::get_users)
 156            .add_request_handler(Server::fuzzy_search_users)
 157            .add_request_handler(Server::request_contact)
 158            .add_request_handler(Server::respond_to_contact_request)
 159            .add_request_handler(Server::join_channel)
 160            .add_message_handler(Server::leave_channel)
 161            .add_request_handler(Server::send_channel_message)
 162            .add_request_handler(Server::follow)
 163            .add_message_handler(Server::unfollow)
 164            .add_message_handler(Server::update_followers)
 165            .add_request_handler(Server::get_channel_messages);
 166
 167        Arc::new(server)
 168    }
 169
 170    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 171    where
 172        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 173        Fut: 'static + Send + Future<Output = Result<()>>,
 174        M: EnvelopedMessage,
 175    {
 176        let prev_handler = self.handlers.insert(
 177            TypeId::of::<M>(),
 178            Box::new(move |server, envelope| {
 179                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 180                let span = info_span!(
 181                    "handle message",
 182                    payload_type = envelope.payload_type_name(),
 183                    payload = format!("{:?}", envelope.payload).as_str(),
 184                );
 185                let future = (handler)(server, *envelope);
 186                async move {
 187                    if let Err(error) = future.await {
 188                        tracing::error!(%error, "error handling message");
 189                    }
 190                }
 191                .instrument(span)
 192                .boxed()
 193            }),
 194        );
 195        if prev_handler.is_some() {
 196            panic!("registered a handler for the same message twice");
 197        }
 198        self
 199    }
 200
 201    /// Handle a request while holding a lock to the store. This is useful when we're registering
 202    /// a connection but we want to respond on the connection before anybody else can send on it.
 203    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 204    where
 205        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>, Response<M>) -> Fut,
 206        Fut: Send + Future<Output = Result<()>>,
 207        M: RequestMessage,
 208    {
 209        let handler = Arc::new(handler);
 210        self.add_message_handler(move |server, envelope| {
 211            let receipt = envelope.receipt();
 212            let handler = handler.clone();
 213            async move {
 214                let responded = Arc::new(AtomicBool::default());
 215                let response = Response {
 216                    server: server.clone(),
 217                    responded: responded.clone(),
 218                    receipt: envelope.receipt(),
 219                };
 220                match (handler)(server.clone(), envelope, response).await {
 221                    Ok(()) => {
 222                        if responded.load(std::sync::atomic::Ordering::SeqCst) {
 223                            Ok(())
 224                        } else {
 225                            Err(anyhow!("handler did not send a response"))?
 226                        }
 227                    }
 228                    Err(error) => {
 229                        server.peer.respond_with_error(
 230                            receipt,
 231                            proto::Error {
 232                                message: error.to_string(),
 233                            },
 234                        )?;
 235                        Err(error)
 236                    }
 237                }
 238            }
 239        })
 240    }
 241
 242    pub fn handle_connection<E: Executor>(
 243        self: &Arc<Self>,
 244        connection: Connection,
 245        address: String,
 246        user_id: UserId,
 247        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 248        executor: E,
 249    ) -> impl Future<Output = Result<()>> {
 250        let mut this = self.clone();
 251        let span = info_span!("handle connection", %user_id, %address);
 252        async move {
 253            let (connection_id, handle_io, mut incoming_rx) = this
 254                .peer
 255                .add_connection(connection, {
 256                    let executor = executor.clone();
 257                    move |duration| {
 258                        let timer = executor.sleep(duration);
 259                        async move {
 260                            timer.await;
 261                        }
 262                    }
 263                })
 264                .await;
 265
 266            tracing::info!(%user_id, %connection_id, %address, "connection opened");
 267
 268            if let Some(send_connection_id) = send_connection_id.as_mut() {
 269                let _ = send_connection_id.send(connection_id).await;
 270            }
 271
 272            let contacts = this.app_state.db.get_contacts(user_id).await?;
 273
 274            {
 275                let mut store = this.store_mut().await;
 276                store.add_connection(connection_id, user_id);
 277                this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?;
 278            }
 279
 280            let handle_io = handle_io.fuse();
 281            futures::pin_mut!(handle_io);
 282            loop {
 283                let next_message = incoming_rx.next().fuse();
 284                futures::pin_mut!(next_message);
 285                futures::select_biased! {
 286                    result = handle_io => {
 287                        if let Err(error) = result {
 288                            tracing::error!(%error, "error handling I/O");
 289                        }
 290                        break;
 291                    }
 292                    message = next_message => {
 293                        if let Some(message) = message {
 294                            let type_name = message.payload_type_name();
 295                            let span = tracing::info_span!("receive message", %user_id, %connection_id, %address, type_name);
 296                            async {
 297                                if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 298                                    let notifications = this.notifications.clone();
 299                                    let is_background = message.is_background();
 300                                    let handle_message = (handler)(this.clone(), message);
 301                                    let handle_message = async move {
 302                                        handle_message.await;
 303                                        if let Some(mut notifications) = notifications {
 304                                            let _ = notifications.send(()).await;
 305                                        }
 306                                    };
 307                                    if is_background {
 308                                        executor.spawn_detached(handle_message);
 309                                    } else {
 310                                        handle_message.await;
 311                                    }
 312                                } else {
 313                                    tracing::error!("no message handler");
 314                                }
 315                            }.instrument(span).await;
 316                        } else {
 317                            tracing::info!(%user_id, %connection_id, %address, "connection closed");
 318                            break;
 319                        }
 320                    }
 321                }
 322            }
 323
 324            if let Err(error) = this.sign_out(connection_id).await {
 325                tracing::error!(%error, "error signing out");
 326            }
 327
 328            Ok(())
 329        }.instrument(span)
 330    }
 331
 332    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> Result<()> {
 333        self.peer.disconnect(connection_id);
 334        let removed_connection = self.store_mut().await.remove_connection(connection_id)?;
 335
 336        for (project_id, project) in removed_connection.hosted_projects {
 337            if let Some(share) = project.share {
 338                broadcast(connection_id, share.guests.keys().copied(), |conn_id| {
 339                    self.peer
 340                        .send(conn_id, proto::UnshareProject { project_id })
 341                });
 342            }
 343        }
 344
 345        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 346            broadcast(connection_id, peer_ids, |conn_id| {
 347                self.peer.send(
 348                    conn_id,
 349                    proto::RemoveProjectCollaborator {
 350                        project_id,
 351                        peer_id: connection_id.0,
 352                    },
 353                )
 354            });
 355        }
 356
 357        let contacts_to_update = self
 358            .app_state
 359            .db
 360            .get_contacts(removed_connection.user_id)
 361            .await?;
 362        let store = self.store().await;
 363        let mut update = proto::UpdateContacts::default();
 364        update
 365            .contacts
 366            .push(store.contact_for_user(removed_connection.user_id));
 367
 368        for user_id in contacts_to_update.current {
 369            for connection_id in store.connection_ids_for_user(user_id) {
 370                self.peer.send(connection_id, update.clone()).trace_err();
 371            }
 372        }
 373
 374        Ok(())
 375    }
 376
 377    async fn ping(
 378        self: Arc<Server>,
 379        _: TypedEnvelope<proto::Ping>,
 380        response: Response<proto::Ping>,
 381    ) -> Result<()> {
 382        response.send(proto::Ack {})?;
 383        Ok(())
 384    }
 385
 386    async fn register_project(
 387        self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterProject>,
 389        response: Response<proto::RegisterProject>,
 390    ) -> Result<()> {
 391        let project_id = {
 392            let mut state = self.store_mut().await;
 393            let user_id = state.user_id_for_connection(request.sender_id)?;
 394            state.register_project(request.sender_id, user_id)
 395        };
 396        response.send(proto::RegisterProjectResponse { project_id })?;
 397        Ok(())
 398    }
 399
 400    async fn unregister_project(
 401        self: Arc<Server>,
 402        request: TypedEnvelope<proto::UnregisterProject>,
 403    ) -> Result<()> {
 404        let user_id = {
 405            let mut state = self.store_mut().await;
 406            state.unregister_project(request.payload.project_id, request.sender_id)?;
 407            state.user_id_for_connection(request.sender_id)?
 408        };
 409
 410        self.update_user_contacts(user_id).await?;
 411        Ok(())
 412    }
 413
 414    async fn share_project(
 415        self: Arc<Server>,
 416        request: TypedEnvelope<proto::ShareProject>,
 417        response: Response<proto::ShareProject>,
 418    ) -> Result<()> {
 419        let user_id = {
 420            let mut state = self.store_mut().await;
 421            state.share_project(request.payload.project_id, request.sender_id)?;
 422            state.user_id_for_connection(request.sender_id)?
 423        };
 424        self.update_user_contacts(user_id).await?;
 425        response.send(proto::Ack {})?;
 426        Ok(())
 427    }
 428
 429    async fn update_user_contacts(self: &Arc<Server>, user_id: UserId) -> Result<()> {
 430        let contacts = self.app_state.db.get_contacts(user_id).await?;
 431        let store = self.store().await;
 432        let updated_contact = store.contact_for_user(user_id);
 433        for contact_user_id in contacts.current {
 434            for contact_conn_id in store.connection_ids_for_user(contact_user_id) {
 435                self.peer
 436                    .send(
 437                        contact_conn_id,
 438                        proto::UpdateContacts {
 439                            contacts: vec![updated_contact.clone()],
 440                            remove_contacts: Default::default(),
 441                            incoming_requests: Default::default(),
 442                            remove_incoming_requests: Default::default(),
 443                            outgoing_requests: Default::default(),
 444                            remove_outgoing_requests: Default::default(),
 445                        },
 446                    )
 447                    .trace_err();
 448            }
 449        }
 450        Ok(())
 451    }
 452
 453    async fn unshare_project(
 454        self: Arc<Server>,
 455        request: TypedEnvelope<proto::UnshareProject>,
 456    ) -> Result<()> {
 457        let project_id = request.payload.project_id;
 458        let project;
 459        {
 460            let mut state = self.store_mut().await;
 461            project = state.unshare_project(project_id, request.sender_id)?;
 462            broadcast(request.sender_id, project.connection_ids, |conn_id| {
 463                self.peer
 464                    .send(conn_id, proto::UnshareProject { project_id })
 465            });
 466        }
 467        self.update_user_contacts(project.host_user_id).await?;
 468        Ok(())
 469    }
 470
 471    async fn join_project(
 472        self: Arc<Server>,
 473        request: TypedEnvelope<proto::JoinProject>,
 474        response: Response<proto::JoinProject>,
 475    ) -> Result<()> {
 476        let project_id = request.payload.project_id;
 477        let response_payload;
 478        let host_user_id;
 479        {
 480            let state = &mut *self.store_mut().await;
 481            let user_id = state.user_id_for_connection(request.sender_id)?;
 482            let joined = state.join_project(request.sender_id, user_id, project_id)?;
 483            let share = joined.project.share()?;
 484            let peer_count = share.guests.len();
 485            let mut collaborators = Vec::with_capacity(peer_count);
 486            collaborators.push(proto::Collaborator {
 487                peer_id: joined.project.host_connection_id.0,
 488                replica_id: 0,
 489                user_id: joined.project.host_user_id.to_proto(),
 490            });
 491            let worktrees = share
 492                .worktrees
 493                .iter()
 494                .filter_map(|(id, shared_worktree)| {
 495                    let worktree = joined.project.worktrees.get(&id)?;
 496                    Some(proto::Worktree {
 497                        id: *id,
 498                        root_name: worktree.root_name.clone(),
 499                        entries: shared_worktree.entries.values().cloned().collect(),
 500                        diagnostic_summaries: shared_worktree
 501                            .diagnostic_summaries
 502                            .values()
 503                            .cloned()
 504                            .collect(),
 505                        visible: worktree.visible,
 506                        scan_id: shared_worktree.scan_id,
 507                    })
 508                })
 509                .collect();
 510            for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 511                if *peer_conn_id != request.sender_id {
 512                    collaborators.push(proto::Collaborator {
 513                        peer_id: peer_conn_id.0,
 514                        replica_id: *peer_replica_id as u32,
 515                        user_id: peer_user_id.to_proto(),
 516                    });
 517                }
 518            }
 519            response_payload = proto::JoinProjectResponse {
 520                worktrees,
 521                replica_id: joined.replica_id as u32,
 522                collaborators,
 523                language_servers: joined.project.language_servers.clone(),
 524            };
 525            host_user_id = joined.project.host_user_id;
 526            broadcast(
 527                request.sender_id,
 528                joined.project.connection_ids(),
 529                |conn_id| {
 530                    self.peer.send(
 531                        conn_id,
 532                        proto::AddProjectCollaborator {
 533                            project_id,
 534                            collaborator: Some(proto::Collaborator {
 535                                peer_id: request.sender_id.0,
 536                                replica_id: response_payload.replica_id,
 537                                user_id: user_id.to_proto(),
 538                            }),
 539                        },
 540                    )
 541                },
 542            );
 543        }
 544        self.update_user_contacts(host_user_id).await?;
 545        response.send(response_payload)?;
 546        Ok(())
 547    }
 548
 549    async fn leave_project(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::LeaveProject>,
 552    ) -> Result<()> {
 553        let sender_id = request.sender_id;
 554        let project_id = request.payload.project_id;
 555        let project;
 556        {
 557            let mut state = self.store_mut().await;
 558            project = state.leave_project(sender_id, project_id)?;
 559            broadcast(sender_id, project.connection_ids, |conn_id| {
 560                self.peer.send(
 561                    conn_id,
 562                    proto::RemoveProjectCollaborator {
 563                        project_id,
 564                        peer_id: sender_id.0,
 565                    },
 566                )
 567            });
 568        }
 569        self.update_user_contacts(project.host_user_id).await?;
 570        Ok(())
 571    }
 572
 573    async fn register_worktree(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::RegisterWorktree>,
 576        response: Response<proto::RegisterWorktree>,
 577    ) -> Result<()> {
 578        let host_user_id;
 579        {
 580            let mut state = self.store_mut().await;
 581            host_user_id = state.user_id_for_connection(request.sender_id)?;
 582
 583            let guest_connection_ids = state
 584                .read_project(request.payload.project_id, request.sender_id)?
 585                .guest_connection_ids();
 586            state.register_worktree(
 587                request.payload.project_id,
 588                request.payload.worktree_id,
 589                request.sender_id,
 590                Worktree {
 591                    root_name: request.payload.root_name.clone(),
 592                    visible: request.payload.visible,
 593                },
 594            )?;
 595
 596            broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 597                self.peer
 598                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 599            });
 600        }
 601        self.update_user_contacts(host_user_id).await?;
 602        response.send(proto::Ack {})?;
 603        Ok(())
 604    }
 605
 606    async fn unregister_worktree(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::UnregisterWorktree>,
 609    ) -> Result<()> {
 610        let host_user_id;
 611        let project_id = request.payload.project_id;
 612        let worktree_id = request.payload.worktree_id;
 613        {
 614            let mut state = self.store_mut().await;
 615            let (_, guest_connection_ids) =
 616                state.unregister_worktree(project_id, worktree_id, request.sender_id)?;
 617            host_user_id = state.user_id_for_connection(request.sender_id)?;
 618            broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 619                self.peer.send(
 620                    conn_id,
 621                    proto::UnregisterWorktree {
 622                        project_id,
 623                        worktree_id,
 624                    },
 625                )
 626            });
 627        }
 628        self.update_user_contacts(host_user_id).await?;
 629        Ok(())
 630    }
 631
 632    async fn update_worktree(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::UpdateWorktree>,
 635        response: Response<proto::UpdateWorktree>,
 636    ) -> Result<()> {
 637        let connection_ids = self.store_mut().await.update_worktree(
 638            request.sender_id,
 639            request.payload.project_id,
 640            request.payload.worktree_id,
 641            &request.payload.removed_entries,
 642            &request.payload.updated_entries,
 643            request.payload.scan_id,
 644        )?;
 645
 646        broadcast(request.sender_id, connection_ids, |connection_id| {
 647            self.peer
 648                .forward_send(request.sender_id, connection_id, request.payload.clone())
 649        });
 650        response.send(proto::Ack {})?;
 651        Ok(())
 652    }
 653
 654    async fn update_diagnostic_summary(
 655        self: Arc<Server>,
 656        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 657    ) -> Result<()> {
 658        let summary = request
 659            .payload
 660            .summary
 661            .clone()
 662            .ok_or_else(|| anyhow!("invalid summary"))?;
 663        let receiver_ids = self.store_mut().await.update_diagnostic_summary(
 664            request.payload.project_id,
 665            request.payload.worktree_id,
 666            request.sender_id,
 667            summary,
 668        )?;
 669
 670        broadcast(request.sender_id, receiver_ids, |connection_id| {
 671            self.peer
 672                .forward_send(request.sender_id, connection_id, request.payload.clone())
 673        });
 674        Ok(())
 675    }
 676
 677    async fn start_language_server(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::StartLanguageServer>,
 680    ) -> Result<()> {
 681        let receiver_ids = self.store_mut().await.start_language_server(
 682            request.payload.project_id,
 683            request.sender_id,
 684            request
 685                .payload
 686                .server
 687                .clone()
 688                .ok_or_else(|| anyhow!("invalid language server"))?,
 689        )?;
 690        broadcast(request.sender_id, receiver_ids, |connection_id| {
 691            self.peer
 692                .forward_send(request.sender_id, connection_id, request.payload.clone())
 693        });
 694        Ok(())
 695    }
 696
 697    async fn update_language_server(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::UpdateLanguageServer>,
 700    ) -> Result<()> {
 701        let receiver_ids = self
 702            .store()
 703            .await
 704            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 705        broadcast(request.sender_id, receiver_ids, |connection_id| {
 706            self.peer
 707                .forward_send(request.sender_id, connection_id, request.payload.clone())
 708        });
 709        Ok(())
 710    }
 711
 712    async fn forward_project_request<T>(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<T>,
 715        response: Response<T>,
 716    ) -> Result<()>
 717    where
 718        T: EntityMessage + RequestMessage,
 719    {
 720        let host_connection_id = self
 721            .store()
 722            .await
 723            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 724            .host_connection_id;
 725
 726        response.send(
 727            self.peer
 728                .forward_request(request.sender_id, host_connection_id, request.payload)
 729                .await?,
 730        )?;
 731        Ok(())
 732    }
 733
 734    async fn save_buffer(
 735        self: Arc<Server>,
 736        request: TypedEnvelope<proto::SaveBuffer>,
 737        response: Response<proto::SaveBuffer>,
 738    ) -> Result<()> {
 739        let host = self
 740            .store()
 741            .await
 742            .read_project(request.payload.project_id, request.sender_id)?
 743            .host_connection_id;
 744        let response_payload = self
 745            .peer
 746            .forward_request(request.sender_id, host, request.payload.clone())
 747            .await?;
 748
 749        let mut guests = self
 750            .store()
 751            .await
 752            .read_project(request.payload.project_id, request.sender_id)?
 753            .connection_ids();
 754        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 755        broadcast(host, guests, |conn_id| {
 756            self.peer
 757                .forward_send(host, conn_id, response_payload.clone())
 758        });
 759        response.send(response_payload)?;
 760        Ok(())
 761    }
 762
 763    async fn update_buffer(
 764        self: Arc<Server>,
 765        request: TypedEnvelope<proto::UpdateBuffer>,
 766        response: Response<proto::UpdateBuffer>,
 767    ) -> Result<()> {
 768        let receiver_ids = self
 769            .store()
 770            .await
 771            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 772        broadcast(request.sender_id, receiver_ids, |connection_id| {
 773            self.peer
 774                .forward_send(request.sender_id, connection_id, request.payload.clone())
 775        });
 776        response.send(proto::Ack {})?;
 777        Ok(())
 778    }
 779
 780    async fn update_buffer_file(
 781        self: Arc<Server>,
 782        request: TypedEnvelope<proto::UpdateBufferFile>,
 783    ) -> Result<()> {
 784        let receiver_ids = self
 785            .store()
 786            .await
 787            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 788        broadcast(request.sender_id, receiver_ids, |connection_id| {
 789            self.peer
 790                .forward_send(request.sender_id, connection_id, request.payload.clone())
 791        });
 792        Ok(())
 793    }
 794
 795    async fn buffer_reloaded(
 796        self: Arc<Server>,
 797        request: TypedEnvelope<proto::BufferReloaded>,
 798    ) -> Result<()> {
 799        let receiver_ids = self
 800            .store()
 801            .await
 802            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 803        broadcast(request.sender_id, receiver_ids, |connection_id| {
 804            self.peer
 805                .forward_send(request.sender_id, connection_id, request.payload.clone())
 806        });
 807        Ok(())
 808    }
 809
 810    async fn buffer_saved(
 811        self: Arc<Server>,
 812        request: TypedEnvelope<proto::BufferSaved>,
 813    ) -> Result<()> {
 814        let receiver_ids = self
 815            .store()
 816            .await
 817            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 818        broadcast(request.sender_id, receiver_ids, |connection_id| {
 819            self.peer
 820                .forward_send(request.sender_id, connection_id, request.payload.clone())
 821        });
 822        Ok(())
 823    }
 824
 825    async fn follow(
 826        self: Arc<Self>,
 827        request: TypedEnvelope<proto::Follow>,
 828        response: Response<proto::Follow>,
 829    ) -> Result<()> {
 830        let leader_id = ConnectionId(request.payload.leader_id);
 831        let follower_id = request.sender_id;
 832        if !self
 833            .store()
 834            .await
 835            .project_connection_ids(request.payload.project_id, follower_id)?
 836            .contains(&leader_id)
 837        {
 838            Err(anyhow!("no such peer"))?;
 839        }
 840        let mut response_payload = self
 841            .peer
 842            .forward_request(request.sender_id, leader_id, request.payload)
 843            .await?;
 844        response_payload
 845            .views
 846            .retain(|view| view.leader_id != Some(follower_id.0));
 847        response.send(response_payload)?;
 848        Ok(())
 849    }
 850
 851    async fn unfollow(self: Arc<Self>, request: TypedEnvelope<proto::Unfollow>) -> Result<()> {
 852        let leader_id = ConnectionId(request.payload.leader_id);
 853        if !self
 854            .store()
 855            .await
 856            .project_connection_ids(request.payload.project_id, request.sender_id)?
 857            .contains(&leader_id)
 858        {
 859            Err(anyhow!("no such peer"))?;
 860        }
 861        self.peer
 862            .forward_send(request.sender_id, leader_id, request.payload)?;
 863        Ok(())
 864    }
 865
 866    async fn update_followers(
 867        self: Arc<Self>,
 868        request: TypedEnvelope<proto::UpdateFollowers>,
 869    ) -> Result<()> {
 870        let connection_ids = self
 871            .store()
 872            .await
 873            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 874        let leader_id = request
 875            .payload
 876            .variant
 877            .as_ref()
 878            .and_then(|variant| match variant {
 879                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 880                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 881                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 882            });
 883        for follower_id in &request.payload.follower_ids {
 884            let follower_id = ConnectionId(*follower_id);
 885            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 886                self.peer
 887                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 888            }
 889        }
 890        Ok(())
 891    }
 892
 893    async fn get_channels(
 894        self: Arc<Server>,
 895        request: TypedEnvelope<proto::GetChannels>,
 896        response: Response<proto::GetChannels>,
 897    ) -> Result<()> {
 898        let user_id = self
 899            .store()
 900            .await
 901            .user_id_for_connection(request.sender_id)?;
 902        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 903        response.send(proto::GetChannelsResponse {
 904            channels: channels
 905                .into_iter()
 906                .map(|chan| proto::Channel {
 907                    id: chan.id.to_proto(),
 908                    name: chan.name,
 909                })
 910                .collect(),
 911        })?;
 912        Ok(())
 913    }
 914
 915    async fn get_users(
 916        self: Arc<Server>,
 917        request: TypedEnvelope<proto::GetUsers>,
 918        response: Response<proto::GetUsers>,
 919    ) -> Result<()> {
 920        let user_ids = request
 921            .payload
 922            .user_ids
 923            .into_iter()
 924            .map(UserId::from_proto)
 925            .collect();
 926        let users = self
 927            .app_state
 928            .db
 929            .get_users_by_ids(user_ids)
 930            .await?
 931            .into_iter()
 932            .map(|user| proto::User {
 933                id: user.id.to_proto(),
 934                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 935                github_login: user.github_login,
 936            })
 937            .collect();
 938        response.send(proto::UsersResponse { users })?;
 939        Ok(())
 940    }
 941
 942    async fn fuzzy_search_users(
 943        self: Arc<Server>,
 944        request: TypedEnvelope<proto::FuzzySearchUsers>,
 945        response: Response<proto::FuzzySearchUsers>,
 946    ) -> Result<()> {
 947        let query = request.payload.query;
 948        let db = &self.app_state.db;
 949        let users = match query.len() {
 950            0 => vec![],
 951            1 | 2 => db
 952                .get_user_by_github_login(&query)
 953                .await?
 954                .into_iter()
 955                .collect(),
 956            _ => db.fuzzy_search_users(&query, 10).await?,
 957        };
 958        let users = users
 959            .into_iter()
 960            .map(|user| proto::User {
 961                id: user.id.to_proto(),
 962                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 963                github_login: user.github_login,
 964            })
 965            .collect();
 966        response.send(proto::UsersResponse { users })?;
 967        Ok(())
 968    }
 969
 970    async fn request_contact(
 971        self: Arc<Server>,
 972        request: TypedEnvelope<proto::RequestContact>,
 973        response: Response<proto::RequestContact>,
 974    ) -> Result<()> {
 975        let requester_id = self
 976            .store()
 977            .await
 978            .user_id_for_connection(request.sender_id)?;
 979        let responder_id = UserId::from_proto(request.payload.responder_id);
 980        self.app_state
 981            .db
 982            .send_contact_request(requester_id, responder_id)
 983            .await?;
 984
 985        // Update outgoing contact requests of requester
 986        let mut update = proto::UpdateContacts::default();
 987        update.outgoing_requests.push(responder_id.to_proto());
 988        for connection_id in self.store().await.connection_ids_for_user(requester_id) {
 989            self.peer.send(connection_id, update.clone())?;
 990        }
 991
 992        // Update incoming contact requests of responder
 993        let mut update = proto::UpdateContacts::default();
 994        update
 995            .incoming_requests
 996            .push(proto::IncomingContactRequest {
 997                requester_id: requester_id.to_proto(),
 998                should_notify: true,
 999            });
1000        for connection_id in self.store().await.connection_ids_for_user(responder_id) {
1001            self.peer.send(connection_id, update.clone())?;
1002        }
1003
1004        response.send(proto::Ack {})?;
1005        Ok(())
1006    }
1007
1008    async fn respond_to_contact_request(
1009        self: Arc<Server>,
1010        request: TypedEnvelope<proto::RespondToContactRequest>,
1011        response: Response<proto::RespondToContactRequest>,
1012    ) -> Result<()> {
1013        let responder_id = self
1014            .store()
1015            .await
1016            .user_id_for_connection(request.sender_id)?;
1017        let requester_id = UserId::from_proto(request.payload.requester_id);
1018        let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32;
1019        self.app_state
1020            .db
1021            .respond_to_contact_request(responder_id, requester_id, accept)
1022            .await?;
1023
1024        let store = self.store().await;
1025        // Update responder with new contact
1026        let mut update = proto::UpdateContacts::default();
1027        if accept {
1028            update.contacts.push(store.contact_for_user(requester_id));
1029        }
1030        update
1031            .remove_incoming_requests
1032            .push(requester_id.to_proto());
1033        for connection_id in store.connection_ids_for_user(responder_id) {
1034            self.peer.send(connection_id, update.clone())?;
1035        }
1036
1037        // Update requester with new contact
1038        let mut update = proto::UpdateContacts::default();
1039        if accept {
1040            update.contacts.push(store.contact_for_user(responder_id));
1041        }
1042        update
1043            .remove_outgoing_requests
1044            .push(responder_id.to_proto());
1045        for connection_id in store.connection_ids_for_user(requester_id) {
1046            self.peer.send(connection_id, update.clone())?;
1047        }
1048
1049        response.send(proto::Ack {})?;
1050        Ok(())
1051    }
1052
1053    // #[instrument(skip(self, state, user_ids))]
1054    // fn update_contacts_for_users<'a>(
1055    //     self: &Arc<Self>,
1056    //     state: &Store,
1057    //     user_ids: impl IntoIterator<Item = &'a UserId>,
1058    // ) {
1059    //     for user_id in user_ids {
1060    //         let contacts = state.contacts_for_user(*user_id);
1061    //         for connection_id in state.connection_ids_for_user(*user_id) {
1062    //             self.peer
1063    //                 .send(
1064    //                     connection_id,
1065    //                     proto::UpdateContacts {
1066    //                         contacts: contacts.clone(),
1067    //                         pending_requests_from_user_ids: Default::default(),
1068    //                         pending_requests_to_user_ids: Default::default(),
1069    //                     },
1070    //                 )
1071    //                 .trace_err();
1072    //         }
1073    //     }
1074    // }
1075
1076    async fn join_channel(
1077        self: Arc<Self>,
1078        request: TypedEnvelope<proto::JoinChannel>,
1079        response: Response<proto::JoinChannel>,
1080    ) -> Result<()> {
1081        let user_id = self
1082            .store()
1083            .await
1084            .user_id_for_connection(request.sender_id)?;
1085        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1086        if !self
1087            .app_state
1088            .db
1089            .can_user_access_channel(user_id, channel_id)
1090            .await?
1091        {
1092            Err(anyhow!("access denied"))?;
1093        }
1094
1095        self.store_mut()
1096            .await
1097            .join_channel(request.sender_id, channel_id);
1098        let messages = self
1099            .app_state
1100            .db
1101            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
1102            .await?
1103            .into_iter()
1104            .map(|msg| proto::ChannelMessage {
1105                id: msg.id.to_proto(),
1106                body: msg.body,
1107                timestamp: msg.sent_at.unix_timestamp() as u64,
1108                sender_id: msg.sender_id.to_proto(),
1109                nonce: Some(msg.nonce.as_u128().into()),
1110            })
1111            .collect::<Vec<_>>();
1112        response.send(proto::JoinChannelResponse {
1113            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1114            messages,
1115        })?;
1116        Ok(())
1117    }
1118
1119    async fn leave_channel(
1120        self: Arc<Self>,
1121        request: TypedEnvelope<proto::LeaveChannel>,
1122    ) -> Result<()> {
1123        let user_id = self
1124            .store()
1125            .await
1126            .user_id_for_connection(request.sender_id)?;
1127        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1128        if !self
1129            .app_state
1130            .db
1131            .can_user_access_channel(user_id, channel_id)
1132            .await?
1133        {
1134            Err(anyhow!("access denied"))?;
1135        }
1136
1137        self.store_mut()
1138            .await
1139            .leave_channel(request.sender_id, channel_id);
1140
1141        Ok(())
1142    }
1143
1144    async fn send_channel_message(
1145        self: Arc<Self>,
1146        request: TypedEnvelope<proto::SendChannelMessage>,
1147        response: Response<proto::SendChannelMessage>,
1148    ) -> Result<()> {
1149        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1150        let user_id;
1151        let connection_ids;
1152        {
1153            let state = self.store().await;
1154            user_id = state.user_id_for_connection(request.sender_id)?;
1155            connection_ids = state.channel_connection_ids(channel_id)?;
1156        }
1157
1158        // Validate the message body.
1159        let body = request.payload.body.trim().to_string();
1160        if body.len() > MAX_MESSAGE_LEN {
1161            return Err(anyhow!("message is too long"))?;
1162        }
1163        if body.is_empty() {
1164            return Err(anyhow!("message can't be blank"))?;
1165        }
1166
1167        let timestamp = OffsetDateTime::now_utc();
1168        let nonce = request
1169            .payload
1170            .nonce
1171            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1172
1173        let message_id = self
1174            .app_state
1175            .db
1176            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1177            .await?
1178            .to_proto();
1179        let message = proto::ChannelMessage {
1180            sender_id: user_id.to_proto(),
1181            id: message_id,
1182            body,
1183            timestamp: timestamp.unix_timestamp() as u64,
1184            nonce: Some(nonce),
1185        };
1186        broadcast(request.sender_id, connection_ids, |conn_id| {
1187            self.peer.send(
1188                conn_id,
1189                proto::ChannelMessageSent {
1190                    channel_id: channel_id.to_proto(),
1191                    message: Some(message.clone()),
1192                },
1193            )
1194        });
1195        response.send(proto::SendChannelMessageResponse {
1196            message: Some(message),
1197        })?;
1198        Ok(())
1199    }
1200
1201    async fn get_channel_messages(
1202        self: Arc<Self>,
1203        request: TypedEnvelope<proto::GetChannelMessages>,
1204        response: Response<proto::GetChannelMessages>,
1205    ) -> Result<()> {
1206        let user_id = self
1207            .store()
1208            .await
1209            .user_id_for_connection(request.sender_id)?;
1210        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1211        if !self
1212            .app_state
1213            .db
1214            .can_user_access_channel(user_id, channel_id)
1215            .await?
1216        {
1217            Err(anyhow!("access denied"))?;
1218        }
1219
1220        let messages = self
1221            .app_state
1222            .db
1223            .get_channel_messages(
1224                channel_id,
1225                MESSAGE_COUNT_PER_PAGE,
1226                Some(MessageId::from_proto(request.payload.before_message_id)),
1227            )
1228            .await?
1229            .into_iter()
1230            .map(|msg| proto::ChannelMessage {
1231                id: msg.id.to_proto(),
1232                body: msg.body,
1233                timestamp: msg.sent_at.unix_timestamp() as u64,
1234                sender_id: msg.sender_id.to_proto(),
1235                nonce: Some(msg.nonce.as_u128().into()),
1236            })
1237            .collect::<Vec<_>>();
1238        response.send(proto::GetChannelMessagesResponse {
1239            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1240            messages,
1241        })?;
1242        Ok(())
1243    }
1244
1245    async fn store<'a>(self: &'a Arc<Self>) -> StoreReadGuard<'a> {
1246        #[cfg(test)]
1247        tokio::task::yield_now().await;
1248        let guard = self.store.read().await;
1249        #[cfg(test)]
1250        tokio::task::yield_now().await;
1251        StoreReadGuard {
1252            guard,
1253            _not_send: PhantomData,
1254        }
1255    }
1256
1257    async fn store_mut<'a>(self: &'a Arc<Self>) -> StoreWriteGuard<'a> {
1258        #[cfg(test)]
1259        tokio::task::yield_now().await;
1260        let guard = self.store.write().await;
1261        #[cfg(test)]
1262        tokio::task::yield_now().await;
1263        StoreWriteGuard {
1264            guard,
1265            _not_send: PhantomData,
1266        }
1267    }
1268}
1269
1270impl<'a> Deref for StoreReadGuard<'a> {
1271    type Target = Store;
1272
1273    fn deref(&self) -> &Self::Target {
1274        &*self.guard
1275    }
1276}
1277
1278impl<'a> Deref for StoreWriteGuard<'a> {
1279    type Target = Store;
1280
1281    fn deref(&self) -> &Self::Target {
1282        &*self.guard
1283    }
1284}
1285
1286impl<'a> DerefMut for StoreWriteGuard<'a> {
1287    fn deref_mut(&mut self) -> &mut Self::Target {
1288        &mut *self.guard
1289    }
1290}
1291
1292impl<'a> Drop for StoreWriteGuard<'a> {
1293    fn drop(&mut self) {
1294        #[cfg(test)]
1295        self.check_invariants();
1296
1297        let metrics = self.metrics();
1298        tracing::info!(
1299            connections = metrics.connections,
1300            registered_projects = metrics.registered_projects,
1301            shared_projects = metrics.shared_projects,
1302            "metrics"
1303        );
1304    }
1305}
1306
1307impl Executor for RealExecutor {
1308    type Sleep = Sleep;
1309
1310    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1311        tokio::task::spawn(future);
1312    }
1313
1314    fn sleep(&self, duration: Duration) -> Self::Sleep {
1315        tokio::time::sleep(duration)
1316    }
1317}
1318
1319fn broadcast<F>(
1320    sender_id: ConnectionId,
1321    receiver_ids: impl IntoIterator<Item = ConnectionId>,
1322    mut f: F,
1323) where
1324    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1325{
1326    for receiver_id in receiver_ids {
1327        if receiver_id != sender_id {
1328            f(receiver_id).trace_err();
1329        }
1330    }
1331}
1332
1333lazy_static! {
1334    static ref ZED_PROTOCOL_VERSION: HeaderName = HeaderName::from_static("x-zed-protocol-version");
1335}
1336
1337pub struct ProtocolVersion(u32);
1338
1339impl Header for ProtocolVersion {
1340    fn name() -> &'static HeaderName {
1341        &ZED_PROTOCOL_VERSION
1342    }
1343
1344    fn decode<'i, I>(values: &mut I) -> Result<Self, axum::headers::Error>
1345    where
1346        Self: Sized,
1347        I: Iterator<Item = &'i axum::http::HeaderValue>,
1348    {
1349        let version = values
1350            .next()
1351            .ok_or_else(|| axum::headers::Error::invalid())?
1352            .to_str()
1353            .map_err(|_| axum::headers::Error::invalid())?
1354            .parse()
1355            .map_err(|_| axum::headers::Error::invalid())?;
1356        Ok(Self(version))
1357    }
1358
1359    fn encode<E: Extend<axum::http::HeaderValue>>(&self, values: &mut E) {
1360        values.extend([self.0.to_string().parse().unwrap()]);
1361    }
1362}
1363
1364pub fn routes(app_state: Arc<AppState>) -> Router<Body> {
1365    let server = Server::new(app_state.clone(), None);
1366    Router::new()
1367        .route("/rpc", get(handle_websocket_request))
1368        .layer(
1369            ServiceBuilder::new()
1370                .layer(Extension(app_state))
1371                .layer(middleware::from_fn(auth::validate_header))
1372                .layer(Extension(server)),
1373        )
1374}
1375
1376pub async fn handle_websocket_request(
1377    TypedHeader(ProtocolVersion(protocol_version)): TypedHeader<ProtocolVersion>,
1378    ConnectInfo(socket_address): ConnectInfo<SocketAddr>,
1379    Extension(server): Extension<Arc<Server>>,
1380    Extension(user_id): Extension<UserId>,
1381    ws: WebSocketUpgrade,
1382) -> axum::response::Response {
1383    if protocol_version != rpc::PROTOCOL_VERSION {
1384        return (
1385            StatusCode::UPGRADE_REQUIRED,
1386            "client must be upgraded".to_string(),
1387        )
1388            .into_response();
1389    }
1390    let socket_address = socket_address.to_string();
1391    ws.on_upgrade(move |socket| {
1392        use util::ResultExt;
1393        let socket = socket
1394            .map_ok(to_tungstenite_message)
1395            .err_into()
1396            .with(|message| async move { Ok(to_axum_message(message)) });
1397        let connection = Connection::new(Box::pin(socket));
1398        async move {
1399            server
1400                .handle_connection(connection, socket_address, user_id, None, RealExecutor)
1401                .await
1402                .log_err();
1403        }
1404    })
1405}
1406
1407fn to_axum_message(message: TungsteniteMessage) -> AxumMessage {
1408    match message {
1409        TungsteniteMessage::Text(payload) => AxumMessage::Text(payload),
1410        TungsteniteMessage::Binary(payload) => AxumMessage::Binary(payload),
1411        TungsteniteMessage::Ping(payload) => AxumMessage::Ping(payload),
1412        TungsteniteMessage::Pong(payload) => AxumMessage::Pong(payload),
1413        TungsteniteMessage::Close(frame) => AxumMessage::Close(frame.map(|frame| AxumCloseFrame {
1414            code: frame.code.into(),
1415            reason: frame.reason,
1416        })),
1417    }
1418}
1419
1420fn to_tungstenite_message(message: AxumMessage) -> TungsteniteMessage {
1421    match message {
1422        AxumMessage::Text(payload) => TungsteniteMessage::Text(payload),
1423        AxumMessage::Binary(payload) => TungsteniteMessage::Binary(payload),
1424        AxumMessage::Ping(payload) => TungsteniteMessage::Ping(payload),
1425        AxumMessage::Pong(payload) => TungsteniteMessage::Pong(payload),
1426        AxumMessage::Close(frame) => {
1427            TungsteniteMessage::Close(frame.map(|frame| TungsteniteCloseFrame {
1428                code: frame.code.into(),
1429                reason: frame.reason,
1430            }))
1431        }
1432    }
1433}
1434
1435pub trait ResultExt {
1436    type Ok;
1437
1438    fn trace_err(self) -> Option<Self::Ok>;
1439}
1440
1441impl<T, E> ResultExt for Result<T, E>
1442where
1443    E: std::fmt::Debug,
1444{
1445    type Ok = T;
1446
1447    fn trace_err(self) -> Option<T> {
1448        match self {
1449            Ok(value) => Some(value),
1450            Err(error) => {
1451                tracing::error!("{:?}", error);
1452                None
1453            }
1454        }
1455    }
1456}
1457
1458#[cfg(test)]
1459mod tests {
1460    use super::*;
1461    use crate::{
1462        db::{tests::TestDb, UserId},
1463        AppState,
1464    };
1465    use ::rpc::Peer;
1466    use client::{
1467        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1468        EstablishConnectionError, UserStore, RECEIVE_TIMEOUT,
1469    };
1470    use collections::{BTreeMap, HashSet};
1471    use editor::{
1472        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1473        ToOffset, ToggleCodeActions, Undo,
1474    };
1475    use gpui::{
1476        executor::{self, Deterministic},
1477        geometry::vector::vec2f,
1478        ModelHandle, TestAppContext, ViewHandle,
1479    };
1480    use language::{
1481        range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
1482        LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
1483    };
1484    use lsp::{self, FakeLanguageServer};
1485    use parking_lot::Mutex;
1486    use project::{
1487        fs::{FakeFs, Fs as _},
1488        search::SearchQuery,
1489        worktree::WorktreeHandle,
1490        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1491    };
1492    use rand::prelude::*;
1493    use rpc::PeerId;
1494    use serde_json::json;
1495    use settings::Settings;
1496    use sqlx::types::time::OffsetDateTime;
1497    use std::{
1498        env,
1499        ops::Deref,
1500        path::{Path, PathBuf},
1501        rc::Rc,
1502        sync::{
1503            atomic::{AtomicBool, Ordering::SeqCst},
1504            Arc,
1505        },
1506        time::Duration,
1507    };
1508    use theme::ThemeRegistry;
1509    use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams};
1510
1511    #[cfg(test)]
1512    #[ctor::ctor]
1513    fn init_logger() {
1514        if std::env::var("RUST_LOG").is_ok() {
1515            env_logger::init();
1516        }
1517    }
1518
1519    #[gpui::test(iterations = 10)]
1520    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1521        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1522        let lang_registry = Arc::new(LanguageRegistry::test());
1523        let fs = FakeFs::new(cx_a.background());
1524        cx_a.foreground().forbid_parking();
1525
1526        // Connect to a server as 2 clients.
1527        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1528        let client_a = server.create_client(cx_a, "user_a").await;
1529        let client_b = server.create_client(cx_b, "user_b").await;
1530
1531        // Share a project as client A
1532        fs.insert_tree(
1533            "/a",
1534            json!({
1535                ".zed.toml": r#"collaborators = ["user_b"]"#,
1536                "a.txt": "a-contents",
1537                "b.txt": "b-contents",
1538            }),
1539        )
1540        .await;
1541        let project_a = cx_a.update(|cx| {
1542            Project::local(
1543                client_a.clone(),
1544                client_a.user_store.clone(),
1545                lang_registry.clone(),
1546                fs.clone(),
1547                cx,
1548            )
1549        });
1550        let (worktree_a, _) = project_a
1551            .update(cx_a, |p, cx| {
1552                p.find_or_create_local_worktree("/a", true, cx)
1553            })
1554            .await
1555            .unwrap();
1556        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1557        worktree_a
1558            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1559            .await;
1560        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1561        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1562
1563        // Join that project as client B
1564        let project_b = Project::remote(
1565            project_id,
1566            client_b.clone(),
1567            client_b.user_store.clone(),
1568            lang_registry.clone(),
1569            fs.clone(),
1570            &mut cx_b.to_async(),
1571        )
1572        .await
1573        .unwrap();
1574
1575        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1576            assert_eq!(
1577                project
1578                    .collaborators()
1579                    .get(&client_a.peer_id)
1580                    .unwrap()
1581                    .user
1582                    .github_login,
1583                "user_a"
1584            );
1585            project.replica_id()
1586        });
1587        project_a
1588            .condition(&cx_a, |tree, _| {
1589                tree.collaborators()
1590                    .get(&client_b.peer_id)
1591                    .map_or(false, |collaborator| {
1592                        collaborator.replica_id == replica_id_b
1593                            && collaborator.user.github_login == "user_b"
1594                    })
1595            })
1596            .await;
1597
1598        // Open the same file as client B and client A.
1599        let buffer_b = project_b
1600            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1601            .await
1602            .unwrap();
1603        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1604        project_a.read_with(cx_a, |project, cx| {
1605            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1606        });
1607        let buffer_a = project_a
1608            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1609            .await
1610            .unwrap();
1611
1612        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1613
1614        // TODO
1615        // // Create a selection set as client B and see that selection set as client A.
1616        // buffer_a
1617        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1618        //     .await;
1619
1620        // Edit the buffer as client B and see that edit as client A.
1621        editor_b.update(cx_b, |editor, cx| {
1622            editor.handle_input(&Input("ok, ".into()), cx)
1623        });
1624        buffer_a
1625            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1626            .await;
1627
1628        // TODO
1629        // // Remove the selection set as client B, see those selections disappear as client A.
1630        cx_b.update(move |_| drop(editor_b));
1631        // buffer_a
1632        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1633        //     .await;
1634
1635        // Dropping the client B's project removes client B from client A's collaborators.
1636        cx_b.update(move |_| drop(project_b));
1637        project_a
1638            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1639            .await;
1640    }
1641
1642    #[gpui::test(iterations = 10)]
1643    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1644        let lang_registry = Arc::new(LanguageRegistry::test());
1645        let fs = FakeFs::new(cx_a.background());
1646        cx_a.foreground().forbid_parking();
1647
1648        // Connect to a server as 2 clients.
1649        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1650        let client_a = server.create_client(cx_a, "user_a").await;
1651        let client_b = server.create_client(cx_b, "user_b").await;
1652
1653        // Share a project as client A
1654        fs.insert_tree(
1655            "/a",
1656            json!({
1657                ".zed.toml": r#"collaborators = ["user_b"]"#,
1658                "a.txt": "a-contents",
1659                "b.txt": "b-contents",
1660            }),
1661        )
1662        .await;
1663        let project_a = cx_a.update(|cx| {
1664            Project::local(
1665                client_a.clone(),
1666                client_a.user_store.clone(),
1667                lang_registry.clone(),
1668                fs.clone(),
1669                cx,
1670            )
1671        });
1672        let (worktree_a, _) = project_a
1673            .update(cx_a, |p, cx| {
1674                p.find_or_create_local_worktree("/a", true, cx)
1675            })
1676            .await
1677            .unwrap();
1678        worktree_a
1679            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1680            .await;
1681        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1682        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1683        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1684        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1685
1686        // Join that project as client B
1687        let project_b = Project::remote(
1688            project_id,
1689            client_b.clone(),
1690            client_b.user_store.clone(),
1691            lang_registry.clone(),
1692            fs.clone(),
1693            &mut cx_b.to_async(),
1694        )
1695        .await
1696        .unwrap();
1697        project_b
1698            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1699            .await
1700            .unwrap();
1701
1702        // Unshare the project as client A
1703        project_a.update(cx_a, |project, cx| project.unshare(cx));
1704        project_b
1705            .condition(cx_b, |project, _| project.is_read_only())
1706            .await;
1707        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1708        cx_b.update(|_| {
1709            drop(project_b);
1710        });
1711
1712        // Share the project again and ensure guests can still join.
1713        project_a
1714            .update(cx_a, |project, cx| project.share(cx))
1715            .await
1716            .unwrap();
1717        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1718
1719        let project_b2 = Project::remote(
1720            project_id,
1721            client_b.clone(),
1722            client_b.user_store.clone(),
1723            lang_registry.clone(),
1724            fs.clone(),
1725            &mut cx_b.to_async(),
1726        )
1727        .await
1728        .unwrap();
1729        project_b2
1730            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1731            .await
1732            .unwrap();
1733    }
1734
1735    #[gpui::test(iterations = 10)]
1736    async fn test_host_disconnect(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1737        let lang_registry = Arc::new(LanguageRegistry::test());
1738        let fs = FakeFs::new(cx_a.background());
1739        cx_a.foreground().forbid_parking();
1740
1741        // Connect to a server as 2 clients.
1742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1743        let client_a = server.create_client(cx_a, "user_a").await;
1744        let client_b = server.create_client(cx_b, "user_b").await;
1745
1746        // Share a project as client A
1747        fs.insert_tree(
1748            "/a",
1749            json!({
1750                ".zed.toml": r#"collaborators = ["user_b"]"#,
1751                "a.txt": "a-contents",
1752                "b.txt": "b-contents",
1753            }),
1754        )
1755        .await;
1756        let project_a = cx_a.update(|cx| {
1757            Project::local(
1758                client_a.clone(),
1759                client_a.user_store.clone(),
1760                lang_registry.clone(),
1761                fs.clone(),
1762                cx,
1763            )
1764        });
1765        let (worktree_a, _) = project_a
1766            .update(cx_a, |p, cx| {
1767                p.find_or_create_local_worktree("/a", true, cx)
1768            })
1769            .await
1770            .unwrap();
1771        worktree_a
1772            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1773            .await;
1774        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1775        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1776        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1777        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1778
1779        // Join that project as client B
1780        let project_b = Project::remote(
1781            project_id,
1782            client_b.clone(),
1783            client_b.user_store.clone(),
1784            lang_registry.clone(),
1785            fs.clone(),
1786            &mut cx_b.to_async(),
1787        )
1788        .await
1789        .unwrap();
1790        project_b
1791            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1792            .await
1793            .unwrap();
1794
1795        // Drop client A's connection. Collaborators should disappear and the project should not be shown as shared.
1796        server.disconnect_client(client_a.current_user_id(cx_a));
1797        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
1798        project_a
1799            .condition(cx_a, |project, _| project.collaborators().is_empty())
1800            .await;
1801        project_a.read_with(cx_a, |project, _| assert!(!project.is_shared()));
1802        project_b
1803            .condition(cx_b, |project, _| project.is_read_only())
1804            .await;
1805        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1806        cx_b.update(|_| {
1807            drop(project_b);
1808        });
1809
1810        // Await reconnection
1811        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1812
1813        // Share the project again and ensure guests can still join.
1814        project_a
1815            .update(cx_a, |project, cx| project.share(cx))
1816            .await
1817            .unwrap();
1818        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1819
1820        let project_b2 = Project::remote(
1821            project_id,
1822            client_b.clone(),
1823            client_b.user_store.clone(),
1824            lang_registry.clone(),
1825            fs.clone(),
1826            &mut cx_b.to_async(),
1827        )
1828        .await
1829        .unwrap();
1830        project_b2
1831            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1832            .await
1833            .unwrap();
1834    }
1835
1836    #[gpui::test(iterations = 10)]
1837    async fn test_propagate_saves_and_fs_changes(
1838        cx_a: &mut TestAppContext,
1839        cx_b: &mut TestAppContext,
1840        cx_c: &mut TestAppContext,
1841    ) {
1842        let lang_registry = Arc::new(LanguageRegistry::test());
1843        let fs = FakeFs::new(cx_a.background());
1844        cx_a.foreground().forbid_parking();
1845
1846        // Connect to a server as 3 clients.
1847        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1848        let client_a = server.create_client(cx_a, "user_a").await;
1849        let client_b = server.create_client(cx_b, "user_b").await;
1850        let client_c = server.create_client(cx_c, "user_c").await;
1851
1852        // Share a worktree as client A.
1853        fs.insert_tree(
1854            "/a",
1855            json!({
1856                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1857                "file1": "",
1858                "file2": ""
1859            }),
1860        )
1861        .await;
1862        let project_a = cx_a.update(|cx| {
1863            Project::local(
1864                client_a.clone(),
1865                client_a.user_store.clone(),
1866                lang_registry.clone(),
1867                fs.clone(),
1868                cx,
1869            )
1870        });
1871        let (worktree_a, _) = project_a
1872            .update(cx_a, |p, cx| {
1873                p.find_or_create_local_worktree("/a", true, cx)
1874            })
1875            .await
1876            .unwrap();
1877        worktree_a
1878            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1879            .await;
1880        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1881        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1882        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1883
1884        // Join that worktree as clients B and C.
1885        let project_b = Project::remote(
1886            project_id,
1887            client_b.clone(),
1888            client_b.user_store.clone(),
1889            lang_registry.clone(),
1890            fs.clone(),
1891            &mut cx_b.to_async(),
1892        )
1893        .await
1894        .unwrap();
1895        let project_c = Project::remote(
1896            project_id,
1897            client_c.clone(),
1898            client_c.user_store.clone(),
1899            lang_registry.clone(),
1900            fs.clone(),
1901            &mut cx_c.to_async(),
1902        )
1903        .await
1904        .unwrap();
1905        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1906        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1907
1908        // Open and edit a buffer as both guests B and C.
1909        let buffer_b = project_b
1910            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1911            .await
1912            .unwrap();
1913        let buffer_c = project_c
1914            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1915            .await
1916            .unwrap();
1917        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "i-am-b, ")], cx));
1918        buffer_c.update(cx_c, |buf, cx| buf.edit([(0..0, "i-am-c, ")], cx));
1919
1920        // Open and edit that buffer as the host.
1921        let buffer_a = project_a
1922            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1923            .await
1924            .unwrap();
1925
1926        buffer_a
1927            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1928            .await;
1929        buffer_a.update(cx_a, |buf, cx| {
1930            buf.edit([(buf.len()..buf.len(), "i-am-a")], cx)
1931        });
1932
1933        // Wait for edits to propagate
1934        buffer_a
1935            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1936            .await;
1937        buffer_b
1938            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1939            .await;
1940        buffer_c
1941            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1942            .await;
1943
1944        // Edit the buffer as the host and concurrently save as guest B.
1945        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1946        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "hi-a, ")], cx));
1947        save_b.await.unwrap();
1948        assert_eq!(
1949            fs.load("/a/file1".as_ref()).await.unwrap(),
1950            "hi-a, i-am-c, i-am-b, i-am-a"
1951        );
1952        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1953        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1954        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1955
1956        worktree_a.flush_fs_events(cx_a).await;
1957
1958        // Make changes on host's file system, see those changes on guest worktrees.
1959        fs.rename(
1960            "/a/file1".as_ref(),
1961            "/a/file1-renamed".as_ref(),
1962            Default::default(),
1963        )
1964        .await
1965        .unwrap();
1966
1967        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1968            .await
1969            .unwrap();
1970        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1971
1972        worktree_a
1973            .condition(&cx_a, |tree, _| {
1974                tree.paths()
1975                    .map(|p| p.to_string_lossy())
1976                    .collect::<Vec<_>>()
1977                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1978            })
1979            .await;
1980        worktree_b
1981            .condition(&cx_b, |tree, _| {
1982                tree.paths()
1983                    .map(|p| p.to_string_lossy())
1984                    .collect::<Vec<_>>()
1985                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1986            })
1987            .await;
1988        worktree_c
1989            .condition(&cx_c, |tree, _| {
1990                tree.paths()
1991                    .map(|p| p.to_string_lossy())
1992                    .collect::<Vec<_>>()
1993                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1994            })
1995            .await;
1996
1997        // Ensure buffer files are updated as well.
1998        buffer_a
1999            .condition(&cx_a, |buf, _| {
2000                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2001            })
2002            .await;
2003        buffer_b
2004            .condition(&cx_b, |buf, _| {
2005                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2006            })
2007            .await;
2008        buffer_c
2009            .condition(&cx_c, |buf, _| {
2010                buf.file().unwrap().path().to_str() == Some("file1-renamed")
2011            })
2012            .await;
2013    }
2014
2015    #[gpui::test(iterations = 10)]
2016    async fn test_fs_operations(
2017        executor: Arc<Deterministic>,
2018        cx_a: &mut TestAppContext,
2019        cx_b: &mut TestAppContext,
2020    ) {
2021        executor.forbid_parking();
2022        let fs = FakeFs::new(cx_a.background());
2023
2024        // Connect to a server as 2 clients.
2025        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2026        let mut client_a = server.create_client(cx_a, "user_a").await;
2027        let mut client_b = server.create_client(cx_b, "user_b").await;
2028
2029        // Share a project as client A
2030        fs.insert_tree(
2031            "/dir",
2032            json!({
2033                ".zed.toml": r#"collaborators = ["user_b"]"#,
2034                "a.txt": "a-contents",
2035                "b.txt": "b-contents",
2036            }),
2037        )
2038        .await;
2039
2040        let (project_a, worktree_id) = client_a.build_local_project(fs, "/dir", cx_a).await;
2041        let project_id = project_a.read_with(cx_a, |project, _| project.remote_id().unwrap());
2042        project_a
2043            .update(cx_a, |project, cx| project.share(cx))
2044            .await
2045            .unwrap();
2046
2047        let project_b = client_b.build_remote_project(project_id, cx_b).await;
2048
2049        let worktree_a =
2050            project_a.read_with(cx_a, |project, cx| project.worktrees(cx).next().unwrap());
2051        let worktree_b =
2052            project_b.read_with(cx_b, |project, cx| project.worktrees(cx).next().unwrap());
2053
2054        let entry = project_b
2055            .update(cx_b, |project, cx| {
2056                project
2057                    .create_entry((worktree_id, "c.txt"), false, cx)
2058                    .unwrap()
2059            })
2060            .await
2061            .unwrap();
2062        worktree_a.read_with(cx_a, |worktree, _| {
2063            assert_eq!(
2064                worktree
2065                    .paths()
2066                    .map(|p| p.to_string_lossy())
2067                    .collect::<Vec<_>>(),
2068                [".zed.toml", "a.txt", "b.txt", "c.txt"]
2069            );
2070        });
2071        worktree_b.read_with(cx_b, |worktree, _| {
2072            assert_eq!(
2073                worktree
2074                    .paths()
2075                    .map(|p| p.to_string_lossy())
2076                    .collect::<Vec<_>>(),
2077                [".zed.toml", "a.txt", "b.txt", "c.txt"]
2078            );
2079        });
2080
2081        project_b
2082            .update(cx_b, |project, cx| {
2083                project.rename_entry(entry.id, Path::new("d.txt"), cx)
2084            })
2085            .unwrap()
2086            .await
2087            .unwrap();
2088        worktree_a.read_with(cx_a, |worktree, _| {
2089            assert_eq!(
2090                worktree
2091                    .paths()
2092                    .map(|p| p.to_string_lossy())
2093                    .collect::<Vec<_>>(),
2094                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2095            );
2096        });
2097        worktree_b.read_with(cx_b, |worktree, _| {
2098            assert_eq!(
2099                worktree
2100                    .paths()
2101                    .map(|p| p.to_string_lossy())
2102                    .collect::<Vec<_>>(),
2103                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2104            );
2105        });
2106
2107        let dir_entry = project_b
2108            .update(cx_b, |project, cx| {
2109                project
2110                    .create_entry((worktree_id, "DIR"), true, cx)
2111                    .unwrap()
2112            })
2113            .await
2114            .unwrap();
2115        worktree_a.read_with(cx_a, |worktree, _| {
2116            assert_eq!(
2117                worktree
2118                    .paths()
2119                    .map(|p| p.to_string_lossy())
2120                    .collect::<Vec<_>>(),
2121                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
2122            );
2123        });
2124        worktree_b.read_with(cx_b, |worktree, _| {
2125            assert_eq!(
2126                worktree
2127                    .paths()
2128                    .map(|p| p.to_string_lossy())
2129                    .collect::<Vec<_>>(),
2130                [".zed.toml", "DIR", "a.txt", "b.txt", "d.txt"]
2131            );
2132        });
2133
2134        project_b
2135            .update(cx_b, |project, cx| {
2136                project.delete_entry(dir_entry.id, cx).unwrap()
2137            })
2138            .await
2139            .unwrap();
2140        worktree_a.read_with(cx_a, |worktree, _| {
2141            assert_eq!(
2142                worktree
2143                    .paths()
2144                    .map(|p| p.to_string_lossy())
2145                    .collect::<Vec<_>>(),
2146                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2147            );
2148        });
2149        worktree_b.read_with(cx_b, |worktree, _| {
2150            assert_eq!(
2151                worktree
2152                    .paths()
2153                    .map(|p| p.to_string_lossy())
2154                    .collect::<Vec<_>>(),
2155                [".zed.toml", "a.txt", "b.txt", "d.txt"]
2156            );
2157        });
2158
2159        project_b
2160            .update(cx_b, |project, cx| {
2161                project.delete_entry(entry.id, cx).unwrap()
2162            })
2163            .await
2164            .unwrap();
2165        worktree_a.read_with(cx_a, |worktree, _| {
2166            assert_eq!(
2167                worktree
2168                    .paths()
2169                    .map(|p| p.to_string_lossy())
2170                    .collect::<Vec<_>>(),
2171                [".zed.toml", "a.txt", "b.txt"]
2172            );
2173        });
2174        worktree_b.read_with(cx_b, |worktree, _| {
2175            assert_eq!(
2176                worktree
2177                    .paths()
2178                    .map(|p| p.to_string_lossy())
2179                    .collect::<Vec<_>>(),
2180                [".zed.toml", "a.txt", "b.txt"]
2181            );
2182        });
2183    }
2184
2185    #[gpui::test(iterations = 10)]
2186    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2187        cx_a.foreground().forbid_parking();
2188        let lang_registry = Arc::new(LanguageRegistry::test());
2189        let fs = FakeFs::new(cx_a.background());
2190
2191        // Connect to a server as 2 clients.
2192        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2193        let client_a = server.create_client(cx_a, "user_a").await;
2194        let client_b = server.create_client(cx_b, "user_b").await;
2195
2196        // Share a project as client A
2197        fs.insert_tree(
2198            "/dir",
2199            json!({
2200                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2201                "a.txt": "a-contents",
2202            }),
2203        )
2204        .await;
2205
2206        let project_a = cx_a.update(|cx| {
2207            Project::local(
2208                client_a.clone(),
2209                client_a.user_store.clone(),
2210                lang_registry.clone(),
2211                fs.clone(),
2212                cx,
2213            )
2214        });
2215        let (worktree_a, _) = project_a
2216            .update(cx_a, |p, cx| {
2217                p.find_or_create_local_worktree("/dir", true, cx)
2218            })
2219            .await
2220            .unwrap();
2221        worktree_a
2222            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2223            .await;
2224        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2225        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2226        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2227
2228        // Join that project as client B
2229        let project_b = Project::remote(
2230            project_id,
2231            client_b.clone(),
2232            client_b.user_store.clone(),
2233            lang_registry.clone(),
2234            fs.clone(),
2235            &mut cx_b.to_async(),
2236        )
2237        .await
2238        .unwrap();
2239
2240        // Open a buffer as client B
2241        let buffer_b = project_b
2242            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2243            .await
2244            .unwrap();
2245
2246        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "world ")], cx));
2247        buffer_b.read_with(cx_b, |buf, _| {
2248            assert!(buf.is_dirty());
2249            assert!(!buf.has_conflict());
2250        });
2251
2252        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
2253        buffer_b
2254            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
2255            .await;
2256        buffer_b.read_with(cx_b, |buf, _| {
2257            assert!(!buf.has_conflict());
2258        });
2259
2260        buffer_b.update(cx_b, |buf, cx| buf.edit([(0..0, "hello ")], cx));
2261        buffer_b.read_with(cx_b, |buf, _| {
2262            assert!(buf.is_dirty());
2263            assert!(!buf.has_conflict());
2264        });
2265    }
2266
2267    #[gpui::test(iterations = 10)]
2268    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2269        cx_a.foreground().forbid_parking();
2270        let lang_registry = Arc::new(LanguageRegistry::test());
2271        let fs = FakeFs::new(cx_a.background());
2272
2273        // Connect to a server as 2 clients.
2274        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2275        let client_a = server.create_client(cx_a, "user_a").await;
2276        let client_b = server.create_client(cx_b, "user_b").await;
2277
2278        // Share a project as client A
2279        fs.insert_tree(
2280            "/dir",
2281            json!({
2282                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
2283                "a.txt": "a-contents",
2284            }),
2285        )
2286        .await;
2287
2288        let project_a = cx_a.update(|cx| {
2289            Project::local(
2290                client_a.clone(),
2291                client_a.user_store.clone(),
2292                lang_registry.clone(),
2293                fs.clone(),
2294                cx,
2295            )
2296        });
2297        let (worktree_a, _) = project_a
2298            .update(cx_a, |p, cx| {
2299                p.find_or_create_local_worktree("/dir", true, cx)
2300            })
2301            .await
2302            .unwrap();
2303        worktree_a
2304            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2305            .await;
2306        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2307        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2308        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2309
2310        // Join that project as client B
2311        let project_b = Project::remote(
2312            project_id,
2313            client_b.clone(),
2314            client_b.user_store.clone(),
2315            lang_registry.clone(),
2316            fs.clone(),
2317            &mut cx_b.to_async(),
2318        )
2319        .await
2320        .unwrap();
2321        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
2322
2323        // Open a buffer as client B
2324        let buffer_b = project_b
2325            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2326            .await
2327            .unwrap();
2328        buffer_b.read_with(cx_b, |buf, _| {
2329            assert!(!buf.is_dirty());
2330            assert!(!buf.has_conflict());
2331        });
2332
2333        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
2334            .await
2335            .unwrap();
2336        buffer_b
2337            .condition(&cx_b, |buf, _| {
2338                buf.text() == "new contents" && !buf.is_dirty()
2339            })
2340            .await;
2341        buffer_b.read_with(cx_b, |buf, _| {
2342            assert!(!buf.has_conflict());
2343        });
2344    }
2345
2346    #[gpui::test(iterations = 10)]
2347    async fn test_editing_while_guest_opens_buffer(
2348        cx_a: &mut TestAppContext,
2349        cx_b: &mut TestAppContext,
2350    ) {
2351        cx_a.foreground().forbid_parking();
2352        let lang_registry = Arc::new(LanguageRegistry::test());
2353        let fs = FakeFs::new(cx_a.background());
2354
2355        // Connect to a server as 2 clients.
2356        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2357        let client_a = server.create_client(cx_a, "user_a").await;
2358        let client_b = server.create_client(cx_b, "user_b").await;
2359
2360        // Share a project as client A
2361        fs.insert_tree(
2362            "/dir",
2363            json!({
2364                ".zed.toml": r#"collaborators = ["user_b"]"#,
2365                "a.txt": "a-contents",
2366            }),
2367        )
2368        .await;
2369        let project_a = cx_a.update(|cx| {
2370            Project::local(
2371                client_a.clone(),
2372                client_a.user_store.clone(),
2373                lang_registry.clone(),
2374                fs.clone(),
2375                cx,
2376            )
2377        });
2378        let (worktree_a, _) = project_a
2379            .update(cx_a, |p, cx| {
2380                p.find_or_create_local_worktree("/dir", true, cx)
2381            })
2382            .await
2383            .unwrap();
2384        worktree_a
2385            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2386            .await;
2387        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2388        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2389        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2390
2391        // Join that project as client B
2392        let project_b = Project::remote(
2393            project_id,
2394            client_b.clone(),
2395            client_b.user_store.clone(),
2396            lang_registry.clone(),
2397            fs.clone(),
2398            &mut cx_b.to_async(),
2399        )
2400        .await
2401        .unwrap();
2402
2403        // Open a buffer as client A
2404        let buffer_a = project_a
2405            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
2406            .await
2407            .unwrap();
2408
2409        // Start opening the same buffer as client B
2410        let buffer_b = cx_b
2411            .background()
2412            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2413
2414        // Edit the buffer as client A while client B is still opening it.
2415        cx_b.background().simulate_random_delay().await;
2416        buffer_a.update(cx_a, |buf, cx| buf.edit([(0..0, "X")], cx));
2417        cx_b.background().simulate_random_delay().await;
2418        buffer_a.update(cx_a, |buf, cx| buf.edit([(1..1, "Y")], cx));
2419
2420        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
2421        let buffer_b = buffer_b.await.unwrap();
2422        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
2423    }
2424
2425    #[gpui::test(iterations = 10)]
2426    async fn test_leaving_worktree_while_opening_buffer(
2427        cx_a: &mut TestAppContext,
2428        cx_b: &mut TestAppContext,
2429    ) {
2430        cx_a.foreground().forbid_parking();
2431        let lang_registry = Arc::new(LanguageRegistry::test());
2432        let fs = FakeFs::new(cx_a.background());
2433
2434        // Connect to a server as 2 clients.
2435        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2436        let client_a = server.create_client(cx_a, "user_a").await;
2437        let client_b = server.create_client(cx_b, "user_b").await;
2438
2439        // Share a project as client A
2440        fs.insert_tree(
2441            "/dir",
2442            json!({
2443                ".zed.toml": r#"collaborators = ["user_b"]"#,
2444                "a.txt": "a-contents",
2445            }),
2446        )
2447        .await;
2448        let project_a = cx_a.update(|cx| {
2449            Project::local(
2450                client_a.clone(),
2451                client_a.user_store.clone(),
2452                lang_registry.clone(),
2453                fs.clone(),
2454                cx,
2455            )
2456        });
2457        let (worktree_a, _) = project_a
2458            .update(cx_a, |p, cx| {
2459                p.find_or_create_local_worktree("/dir", true, cx)
2460            })
2461            .await
2462            .unwrap();
2463        worktree_a
2464            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2465            .await;
2466        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2467        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2468        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2469
2470        // Join that project as client B
2471        let project_b = Project::remote(
2472            project_id,
2473            client_b.clone(),
2474            client_b.user_store.clone(),
2475            lang_registry.clone(),
2476            fs.clone(),
2477            &mut cx_b.to_async(),
2478        )
2479        .await
2480        .unwrap();
2481
2482        // See that a guest has joined as client A.
2483        project_a
2484            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2485            .await;
2486
2487        // Begin opening a buffer as client B, but leave the project before the open completes.
2488        let buffer_b = cx_b
2489            .background()
2490            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2491        cx_b.update(|_| drop(project_b));
2492        drop(buffer_b);
2493
2494        // See that the guest has left.
2495        project_a
2496            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2497            .await;
2498    }
2499
2500    #[gpui::test(iterations = 10)]
2501    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2502        cx_a.foreground().forbid_parking();
2503        let lang_registry = Arc::new(LanguageRegistry::test());
2504        let fs = FakeFs::new(cx_a.background());
2505
2506        // Connect to a server as 2 clients.
2507        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2508        let client_a = server.create_client(cx_a, "user_a").await;
2509        let client_b = server.create_client(cx_b, "user_b").await;
2510
2511        // Share a project as client A
2512        fs.insert_tree(
2513            "/a",
2514            json!({
2515                ".zed.toml": r#"collaborators = ["user_b"]"#,
2516                "a.txt": "a-contents",
2517                "b.txt": "b-contents",
2518            }),
2519        )
2520        .await;
2521        let project_a = cx_a.update(|cx| {
2522            Project::local(
2523                client_a.clone(),
2524                client_a.user_store.clone(),
2525                lang_registry.clone(),
2526                fs.clone(),
2527                cx,
2528            )
2529        });
2530        let (worktree_a, _) = project_a
2531            .update(cx_a, |p, cx| {
2532                p.find_or_create_local_worktree("/a", true, cx)
2533            })
2534            .await
2535            .unwrap();
2536        worktree_a
2537            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2538            .await;
2539        let project_id = project_a
2540            .update(cx_a, |project, _| project.next_remote_id())
2541            .await;
2542        project_a
2543            .update(cx_a, |project, cx| project.share(cx))
2544            .await
2545            .unwrap();
2546
2547        // Join that project as client B
2548        let _project_b = Project::remote(
2549            project_id,
2550            client_b.clone(),
2551            client_b.user_store.clone(),
2552            lang_registry.clone(),
2553            fs.clone(),
2554            &mut cx_b.to_async(),
2555        )
2556        .await
2557        .unwrap();
2558
2559        // Client A sees that a guest has joined.
2560        project_a
2561            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2562            .await;
2563
2564        // Drop client B's connection and ensure client A observes client B leaving the project.
2565        client_b.disconnect(&cx_b.to_async()).unwrap();
2566        project_a
2567            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2568            .await;
2569
2570        // Rejoin the project as client B
2571        let _project_b = Project::remote(
2572            project_id,
2573            client_b.clone(),
2574            client_b.user_store.clone(),
2575            lang_registry.clone(),
2576            fs.clone(),
2577            &mut cx_b.to_async(),
2578        )
2579        .await
2580        .unwrap();
2581
2582        // Client A sees that a guest has re-joined.
2583        project_a
2584            .condition(cx_a, |p, _| p.collaborators().len() == 1)
2585            .await;
2586
2587        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
2588        client_b.wait_for_current_user(cx_b).await;
2589        server.disconnect_client(client_b.current_user_id(cx_b));
2590        cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
2591        project_a
2592            .condition(cx_a, |p, _| p.collaborators().len() == 0)
2593            .await;
2594    }
2595
2596    #[gpui::test(iterations = 10)]
2597    async fn test_collaborating_with_diagnostics(
2598        cx_a: &mut TestAppContext,
2599        cx_b: &mut TestAppContext,
2600    ) {
2601        cx_a.foreground().forbid_parking();
2602        let lang_registry = Arc::new(LanguageRegistry::test());
2603        let fs = FakeFs::new(cx_a.background());
2604
2605        // Set up a fake language server.
2606        let mut language = Language::new(
2607            LanguageConfig {
2608                name: "Rust".into(),
2609                path_suffixes: vec!["rs".to_string()],
2610                ..Default::default()
2611            },
2612            Some(tree_sitter_rust::language()),
2613        );
2614        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
2615        lang_registry.add(Arc::new(language));
2616
2617        // Connect to a server as 2 clients.
2618        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2619        let client_a = server.create_client(cx_a, "user_a").await;
2620        let client_b = server.create_client(cx_b, "user_b").await;
2621
2622        // Share a project as client A
2623        fs.insert_tree(
2624            "/a",
2625            json!({
2626                ".zed.toml": r#"collaborators = ["user_b"]"#,
2627                "a.rs": "let one = two",
2628                "other.rs": "",
2629            }),
2630        )
2631        .await;
2632        let project_a = cx_a.update(|cx| {
2633            Project::local(
2634                client_a.clone(),
2635                client_a.user_store.clone(),
2636                lang_registry.clone(),
2637                fs.clone(),
2638                cx,
2639            )
2640        });
2641        let (worktree_a, _) = project_a
2642            .update(cx_a, |p, cx| {
2643                p.find_or_create_local_worktree("/a", true, cx)
2644            })
2645            .await
2646            .unwrap();
2647        worktree_a
2648            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2649            .await;
2650        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2651        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2652        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2653
2654        // Cause the language server to start.
2655        let _ = cx_a
2656            .background()
2657            .spawn(project_a.update(cx_a, |project, cx| {
2658                project.open_buffer(
2659                    ProjectPath {
2660                        worktree_id,
2661                        path: Path::new("other.rs").into(),
2662                    },
2663                    cx,
2664                )
2665            }))
2666            .await
2667            .unwrap();
2668
2669        // Simulate a language server reporting errors for a file.
2670        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2671        fake_language_server
2672            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2673            .await;
2674        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2675            lsp::PublishDiagnosticsParams {
2676                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2677                version: None,
2678                diagnostics: vec![lsp::Diagnostic {
2679                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2680                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2681                    message: "message 1".to_string(),
2682                    ..Default::default()
2683                }],
2684            },
2685        );
2686
2687        // Wait for server to see the diagnostics update.
2688        server
2689            .condition(|store| {
2690                let worktree = store
2691                    .project(project_id)
2692                    .unwrap()
2693                    .share
2694                    .as_ref()
2695                    .unwrap()
2696                    .worktrees
2697                    .get(&worktree_id.to_proto())
2698                    .unwrap();
2699
2700                !worktree.diagnostic_summaries.is_empty()
2701            })
2702            .await;
2703
2704        // Join the worktree as client B.
2705        let project_b = Project::remote(
2706            project_id,
2707            client_b.clone(),
2708            client_b.user_store.clone(),
2709            lang_registry.clone(),
2710            fs.clone(),
2711            &mut cx_b.to_async(),
2712        )
2713        .await
2714        .unwrap();
2715
2716        project_b.read_with(cx_b, |project, cx| {
2717            assert_eq!(
2718                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2719                &[(
2720                    ProjectPath {
2721                        worktree_id,
2722                        path: Arc::from(Path::new("a.rs")),
2723                    },
2724                    DiagnosticSummary {
2725                        error_count: 1,
2726                        warning_count: 0,
2727                        ..Default::default()
2728                    },
2729                )]
2730            )
2731        });
2732
2733        // Simulate a language server reporting more errors for a file.
2734        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2735            lsp::PublishDiagnosticsParams {
2736                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2737                version: None,
2738                diagnostics: vec![
2739                    lsp::Diagnostic {
2740                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2741                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2742                        message: "message 1".to_string(),
2743                        ..Default::default()
2744                    },
2745                    lsp::Diagnostic {
2746                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2747                        range: lsp::Range::new(
2748                            lsp::Position::new(0, 10),
2749                            lsp::Position::new(0, 13),
2750                        ),
2751                        message: "message 2".to_string(),
2752                        ..Default::default()
2753                    },
2754                ],
2755            },
2756        );
2757
2758        // Client b gets the updated summaries
2759        project_b
2760            .condition(&cx_b, |project, cx| {
2761                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2762                    == &[(
2763                        ProjectPath {
2764                            worktree_id,
2765                            path: Arc::from(Path::new("a.rs")),
2766                        },
2767                        DiagnosticSummary {
2768                            error_count: 1,
2769                            warning_count: 1,
2770                            ..Default::default()
2771                        },
2772                    )]
2773            })
2774            .await;
2775
2776        // Open the file with the errors on client B. They should be present.
2777        let buffer_b = cx_b
2778            .background()
2779            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2780            .await
2781            .unwrap();
2782
2783        buffer_b.read_with(cx_b, |buffer, _| {
2784            assert_eq!(
2785                buffer
2786                    .snapshot()
2787                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2788                    .map(|entry| entry)
2789                    .collect::<Vec<_>>(),
2790                &[
2791                    DiagnosticEntry {
2792                        range: Point::new(0, 4)..Point::new(0, 7),
2793                        diagnostic: Diagnostic {
2794                            group_id: 0,
2795                            message: "message 1".to_string(),
2796                            severity: lsp::DiagnosticSeverity::ERROR,
2797                            is_primary: true,
2798                            ..Default::default()
2799                        }
2800                    },
2801                    DiagnosticEntry {
2802                        range: Point::new(0, 10)..Point::new(0, 13),
2803                        diagnostic: Diagnostic {
2804                            group_id: 1,
2805                            severity: lsp::DiagnosticSeverity::WARNING,
2806                            message: "message 2".to_string(),
2807                            is_primary: true,
2808                            ..Default::default()
2809                        }
2810                    }
2811                ]
2812            );
2813        });
2814
2815        // Simulate a language server reporting no errors for a file.
2816        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2817            lsp::PublishDiagnosticsParams {
2818                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2819                version: None,
2820                diagnostics: vec![],
2821            },
2822        );
2823        project_a
2824            .condition(cx_a, |project, cx| {
2825                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2826            })
2827            .await;
2828        project_b
2829            .condition(cx_b, |project, cx| {
2830                project.diagnostic_summaries(cx).collect::<Vec<_>>() == &[]
2831            })
2832            .await;
2833    }
2834
2835    #[gpui::test(iterations = 10)]
2836    async fn test_collaborating_with_completion(
2837        cx_a: &mut TestAppContext,
2838        cx_b: &mut TestAppContext,
2839    ) {
2840        cx_a.foreground().forbid_parking();
2841        let lang_registry = Arc::new(LanguageRegistry::test());
2842        let fs = FakeFs::new(cx_a.background());
2843
2844        // Set up a fake language server.
2845        let mut language = Language::new(
2846            LanguageConfig {
2847                name: "Rust".into(),
2848                path_suffixes: vec!["rs".to_string()],
2849                ..Default::default()
2850            },
2851            Some(tree_sitter_rust::language()),
2852        );
2853        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
2854            capabilities: lsp::ServerCapabilities {
2855                completion_provider: Some(lsp::CompletionOptions {
2856                    trigger_characters: Some(vec![".".to_string()]),
2857                    ..Default::default()
2858                }),
2859                ..Default::default()
2860            },
2861            ..Default::default()
2862        });
2863        lang_registry.add(Arc::new(language));
2864
2865        // Connect to a server as 2 clients.
2866        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2867        let client_a = server.create_client(cx_a, "user_a").await;
2868        let client_b = server.create_client(cx_b, "user_b").await;
2869
2870        // Share a project as client A
2871        fs.insert_tree(
2872            "/a",
2873            json!({
2874                ".zed.toml": r#"collaborators = ["user_b"]"#,
2875                "main.rs": "fn main() { a }",
2876                "other.rs": "",
2877            }),
2878        )
2879        .await;
2880        let project_a = cx_a.update(|cx| {
2881            Project::local(
2882                client_a.clone(),
2883                client_a.user_store.clone(),
2884                lang_registry.clone(),
2885                fs.clone(),
2886                cx,
2887            )
2888        });
2889        let (worktree_a, _) = project_a
2890            .update(cx_a, |p, cx| {
2891                p.find_or_create_local_worktree("/a", true, cx)
2892            })
2893            .await
2894            .unwrap();
2895        worktree_a
2896            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2897            .await;
2898        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2899        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2900        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2901
2902        // Join the worktree as client B.
2903        let project_b = Project::remote(
2904            project_id,
2905            client_b.clone(),
2906            client_b.user_store.clone(),
2907            lang_registry.clone(),
2908            fs.clone(),
2909            &mut cx_b.to_async(),
2910        )
2911        .await
2912        .unwrap();
2913
2914        // Open a file in an editor as the guest.
2915        let buffer_b = project_b
2916            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2917            .await
2918            .unwrap();
2919        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2920        let editor_b = cx_b.add_view(window_b, |cx| {
2921            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2922        });
2923
2924        let fake_language_server = fake_language_servers.next().await.unwrap();
2925        buffer_b
2926            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2927            .await;
2928
2929        // Type a completion trigger character as the guest.
2930        editor_b.update(cx_b, |editor, cx| {
2931            editor.select_ranges([13..13], None, cx);
2932            editor.handle_input(&Input(".".into()), cx);
2933            cx.focus(&editor_b);
2934        });
2935
2936        // Receive a completion request as the host's language server.
2937        // Return some completions from the host's language server.
2938        cx_a.foreground().start_waiting();
2939        fake_language_server
2940            .handle_request::<lsp::request::Completion, _, _>(|params, _| async move {
2941                assert_eq!(
2942                    params.text_document_position.text_document.uri,
2943                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2944                );
2945                assert_eq!(
2946                    params.text_document_position.position,
2947                    lsp::Position::new(0, 14),
2948                );
2949
2950                Ok(Some(lsp::CompletionResponse::Array(vec![
2951                    lsp::CompletionItem {
2952                        label: "first_method(…)".into(),
2953                        detail: Some("fn(&mut self, B) -> C".into()),
2954                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2955                            new_text: "first_method($1)".to_string(),
2956                            range: lsp::Range::new(
2957                                lsp::Position::new(0, 14),
2958                                lsp::Position::new(0, 14),
2959                            ),
2960                        })),
2961                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2962                        ..Default::default()
2963                    },
2964                    lsp::CompletionItem {
2965                        label: "second_method(…)".into(),
2966                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2967                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2968                            new_text: "second_method()".to_string(),
2969                            range: lsp::Range::new(
2970                                lsp::Position::new(0, 14),
2971                                lsp::Position::new(0, 14),
2972                            ),
2973                        })),
2974                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2975                        ..Default::default()
2976                    },
2977                ])))
2978            })
2979            .next()
2980            .await
2981            .unwrap();
2982        cx_a.foreground().finish_waiting();
2983
2984        // Open the buffer on the host.
2985        let buffer_a = project_a
2986            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2987            .await
2988            .unwrap();
2989        buffer_a
2990            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2991            .await;
2992
2993        // Confirm a completion on the guest.
2994        editor_b
2995            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2996            .await;
2997        editor_b.update(cx_b, |editor, cx| {
2998            editor.confirm_completion(&ConfirmCompletion { item_ix: Some(0) }, cx);
2999            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
3000        });
3001
3002        // Return a resolved completion from the host's language server.
3003        // The resolved completion has an additional text edit.
3004        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _, _>(
3005            |params, _| async move {
3006                assert_eq!(params.label, "first_method(…)");
3007                Ok(lsp::CompletionItem {
3008                    label: "first_method(…)".into(),
3009                    detail: Some("fn(&mut self, B) -> C".into()),
3010                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3011                        new_text: "first_method($1)".to_string(),
3012                        range: lsp::Range::new(
3013                            lsp::Position::new(0, 14),
3014                            lsp::Position::new(0, 14),
3015                        ),
3016                    })),
3017                    additional_text_edits: Some(vec![lsp::TextEdit {
3018                        new_text: "use d::SomeTrait;\n".to_string(),
3019                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3020                    }]),
3021                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
3022                    ..Default::default()
3023                })
3024            },
3025        );
3026
3027        // The additional edit is applied.
3028        buffer_a
3029            .condition(&cx_a, |buffer, _| {
3030                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3031            })
3032            .await;
3033        buffer_b
3034            .condition(&cx_b, |buffer, _| {
3035                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
3036            })
3037            .await;
3038    }
3039
3040    #[gpui::test(iterations = 10)]
3041    async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3042        cx_a.foreground().forbid_parking();
3043        let lang_registry = Arc::new(LanguageRegistry::test());
3044        let fs = FakeFs::new(cx_a.background());
3045
3046        // Connect to a server as 2 clients.
3047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3048        let client_a = server.create_client(cx_a, "user_a").await;
3049        let client_b = server.create_client(cx_b, "user_b").await;
3050
3051        // Share a project as client A
3052        fs.insert_tree(
3053            "/a",
3054            json!({
3055                ".zed.toml": r#"collaborators = ["user_b"]"#,
3056                "a.rs": "let one = 1;",
3057            }),
3058        )
3059        .await;
3060        let project_a = cx_a.update(|cx| {
3061            Project::local(
3062                client_a.clone(),
3063                client_a.user_store.clone(),
3064                lang_registry.clone(),
3065                fs.clone(),
3066                cx,
3067            )
3068        });
3069        let (worktree_a, _) = project_a
3070            .update(cx_a, |p, cx| {
3071                p.find_or_create_local_worktree("/a", true, cx)
3072            })
3073            .await
3074            .unwrap();
3075        worktree_a
3076            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3077            .await;
3078        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3079        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3080        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3081        let buffer_a = project_a
3082            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx))
3083            .await
3084            .unwrap();
3085
3086        // Join the worktree as client B.
3087        let project_b = Project::remote(
3088            project_id,
3089            client_b.clone(),
3090            client_b.user_store.clone(),
3091            lang_registry.clone(),
3092            fs.clone(),
3093            &mut cx_b.to_async(),
3094        )
3095        .await
3096        .unwrap();
3097
3098        let buffer_b = cx_b
3099            .background()
3100            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3101            .await
3102            .unwrap();
3103        buffer_b.update(cx_b, |buffer, cx| {
3104            buffer.edit([(4..7, "six")], cx);
3105            buffer.edit([(10..11, "6")], cx);
3106            assert_eq!(buffer.text(), "let six = 6;");
3107            assert!(buffer.is_dirty());
3108            assert!(!buffer.has_conflict());
3109        });
3110        buffer_a
3111            .condition(cx_a, |buffer, _| buffer.text() == "let six = 6;")
3112            .await;
3113
3114        fs.save(Path::new("/a/a.rs"), &Rope::from("let seven = 7;"))
3115            .await
3116            .unwrap();
3117        buffer_a
3118            .condition(cx_a, |buffer, _| buffer.has_conflict())
3119            .await;
3120        buffer_b
3121            .condition(cx_b, |buffer, _| buffer.has_conflict())
3122            .await;
3123
3124        project_b
3125            .update(cx_b, |project, cx| {
3126                project.reload_buffers(HashSet::from_iter([buffer_b.clone()]), true, cx)
3127            })
3128            .await
3129            .unwrap();
3130        buffer_a.read_with(cx_a, |buffer, _| {
3131            assert_eq!(buffer.text(), "let seven = 7;");
3132            assert!(!buffer.is_dirty());
3133            assert!(!buffer.has_conflict());
3134        });
3135        buffer_b.read_with(cx_b, |buffer, _| {
3136            assert_eq!(buffer.text(), "let seven = 7;");
3137            assert!(!buffer.is_dirty());
3138            assert!(!buffer.has_conflict());
3139        });
3140
3141        buffer_a.update(cx_a, |buffer, cx| {
3142            // Undoing on the host is a no-op when the reload was initiated by the guest.
3143            buffer.undo(cx);
3144            assert_eq!(buffer.text(), "let seven = 7;");
3145            assert!(!buffer.is_dirty());
3146            assert!(!buffer.has_conflict());
3147        });
3148        buffer_b.update(cx_b, |buffer, cx| {
3149            // Undoing on the guest rolls back the buffer to before it was reloaded but the conflict gets cleared.
3150            buffer.undo(cx);
3151            assert_eq!(buffer.text(), "let six = 6;");
3152            assert!(buffer.is_dirty());
3153            assert!(!buffer.has_conflict());
3154        });
3155    }
3156
3157    #[gpui::test(iterations = 10)]
3158    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3159        cx_a.foreground().forbid_parking();
3160        let lang_registry = Arc::new(LanguageRegistry::test());
3161        let fs = FakeFs::new(cx_a.background());
3162
3163        // Set up a fake language server.
3164        let mut language = Language::new(
3165            LanguageConfig {
3166                name: "Rust".into(),
3167                path_suffixes: vec!["rs".to_string()],
3168                ..Default::default()
3169            },
3170            Some(tree_sitter_rust::language()),
3171        );
3172        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3173        lang_registry.add(Arc::new(language));
3174
3175        // Connect to a server as 2 clients.
3176        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3177        let client_a = server.create_client(cx_a, "user_a").await;
3178        let client_b = server.create_client(cx_b, "user_b").await;
3179
3180        // Share a project as client A
3181        fs.insert_tree(
3182            "/a",
3183            json!({
3184                ".zed.toml": r#"collaborators = ["user_b"]"#,
3185                "a.rs": "let one = two",
3186            }),
3187        )
3188        .await;
3189        let project_a = cx_a.update(|cx| {
3190            Project::local(
3191                client_a.clone(),
3192                client_a.user_store.clone(),
3193                lang_registry.clone(),
3194                fs.clone(),
3195                cx,
3196            )
3197        });
3198        let (worktree_a, _) = project_a
3199            .update(cx_a, |p, cx| {
3200                p.find_or_create_local_worktree("/a", true, cx)
3201            })
3202            .await
3203            .unwrap();
3204        worktree_a
3205            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3206            .await;
3207        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3208        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3209        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3210
3211        // Join the worktree as client B.
3212        let project_b = Project::remote(
3213            project_id,
3214            client_b.clone(),
3215            client_b.user_store.clone(),
3216            lang_registry.clone(),
3217            fs.clone(),
3218            &mut cx_b.to_async(),
3219        )
3220        .await
3221        .unwrap();
3222
3223        let buffer_b = cx_b
3224            .background()
3225            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3226            .await
3227            .unwrap();
3228
3229        let fake_language_server = fake_language_servers.next().await.unwrap();
3230        fake_language_server.handle_request::<lsp::request::Formatting, _, _>(|_, _| async move {
3231            Ok(Some(vec![
3232                lsp::TextEdit {
3233                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
3234                    new_text: "h".to_string(),
3235                },
3236                lsp::TextEdit {
3237                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
3238                    new_text: "y".to_string(),
3239                },
3240            ]))
3241        });
3242
3243        project_b
3244            .update(cx_b, |project, cx| {
3245                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
3246            })
3247            .await
3248            .unwrap();
3249        assert_eq!(
3250            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
3251            "let honey = two"
3252        );
3253    }
3254
3255    #[gpui::test(iterations = 10)]
3256    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3257        cx_a.foreground().forbid_parking();
3258        let lang_registry = Arc::new(LanguageRegistry::test());
3259        let fs = FakeFs::new(cx_a.background());
3260        fs.insert_tree(
3261            "/root-1",
3262            json!({
3263                ".zed.toml": r#"collaborators = ["user_b"]"#,
3264                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
3265            }),
3266        )
3267        .await;
3268        fs.insert_tree(
3269            "/root-2",
3270            json!({
3271                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
3272            }),
3273        )
3274        .await;
3275
3276        // Set up a fake language server.
3277        let mut language = Language::new(
3278            LanguageConfig {
3279                name: "Rust".into(),
3280                path_suffixes: vec!["rs".to_string()],
3281                ..Default::default()
3282            },
3283            Some(tree_sitter_rust::language()),
3284        );
3285        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3286        lang_registry.add(Arc::new(language));
3287
3288        // Connect to a server as 2 clients.
3289        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3290        let client_a = server.create_client(cx_a, "user_a").await;
3291        let client_b = server.create_client(cx_b, "user_b").await;
3292
3293        // Share a project as client A
3294        let project_a = cx_a.update(|cx| {
3295            Project::local(
3296                client_a.clone(),
3297                client_a.user_store.clone(),
3298                lang_registry.clone(),
3299                fs.clone(),
3300                cx,
3301            )
3302        });
3303        let (worktree_a, _) = project_a
3304            .update(cx_a, |p, cx| {
3305                p.find_or_create_local_worktree("/root-1", true, cx)
3306            })
3307            .await
3308            .unwrap();
3309        worktree_a
3310            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3311            .await;
3312        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3313        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3314        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3315
3316        // Join the worktree as client B.
3317        let project_b = Project::remote(
3318            project_id,
3319            client_b.clone(),
3320            client_b.user_store.clone(),
3321            lang_registry.clone(),
3322            fs.clone(),
3323            &mut cx_b.to_async(),
3324        )
3325        .await
3326        .unwrap();
3327
3328        // Open the file on client B.
3329        let buffer_b = cx_b
3330            .background()
3331            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3332            .await
3333            .unwrap();
3334
3335        // Request the definition of a symbol as the guest.
3336        let fake_language_server = fake_language_servers.next().await.unwrap();
3337        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3338            |_, _| async move {
3339                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3340                    lsp::Location::new(
3341                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3342                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3343                    ),
3344                )))
3345            },
3346        );
3347
3348        let definitions_1 = project_b
3349            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
3350            .await
3351            .unwrap();
3352        cx_b.read(|cx| {
3353            assert_eq!(definitions_1.len(), 1);
3354            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3355            let target_buffer = definitions_1[0].buffer.read(cx);
3356            assert_eq!(
3357                target_buffer.text(),
3358                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3359            );
3360            assert_eq!(
3361                definitions_1[0].range.to_point(target_buffer),
3362                Point::new(0, 6)..Point::new(0, 9)
3363            );
3364        });
3365
3366        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
3367        // the previous call to `definition`.
3368        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3369            |_, _| async move {
3370                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3371                    lsp::Location::new(
3372                        lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
3373                        lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
3374                    ),
3375                )))
3376            },
3377        );
3378
3379        let definitions_2 = project_b
3380            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
3381            .await
3382            .unwrap();
3383        cx_b.read(|cx| {
3384            assert_eq!(definitions_2.len(), 1);
3385            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3386            let target_buffer = definitions_2[0].buffer.read(cx);
3387            assert_eq!(
3388                target_buffer.text(),
3389                "const TWO: usize = 2;\nconst THREE: usize = 3;"
3390            );
3391            assert_eq!(
3392                definitions_2[0].range.to_point(target_buffer),
3393                Point::new(1, 6)..Point::new(1, 11)
3394            );
3395        });
3396        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
3397    }
3398
3399    #[gpui::test(iterations = 10)]
3400    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3401        cx_a.foreground().forbid_parking();
3402        let lang_registry = Arc::new(LanguageRegistry::test());
3403        let fs = FakeFs::new(cx_a.background());
3404        fs.insert_tree(
3405            "/root-1",
3406            json!({
3407                ".zed.toml": r#"collaborators = ["user_b"]"#,
3408                "one.rs": "const ONE: usize = 1;",
3409                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
3410            }),
3411        )
3412        .await;
3413        fs.insert_tree(
3414            "/root-2",
3415            json!({
3416                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
3417            }),
3418        )
3419        .await;
3420
3421        // Set up a fake language server.
3422        let mut language = Language::new(
3423            LanguageConfig {
3424                name: "Rust".into(),
3425                path_suffixes: vec!["rs".to_string()],
3426                ..Default::default()
3427            },
3428            Some(tree_sitter_rust::language()),
3429        );
3430        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3431        lang_registry.add(Arc::new(language));
3432
3433        // Connect to a server as 2 clients.
3434        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3435        let client_a = server.create_client(cx_a, "user_a").await;
3436        let client_b = server.create_client(cx_b, "user_b").await;
3437
3438        // Share a project as client A
3439        let project_a = cx_a.update(|cx| {
3440            Project::local(
3441                client_a.clone(),
3442                client_a.user_store.clone(),
3443                lang_registry.clone(),
3444                fs.clone(),
3445                cx,
3446            )
3447        });
3448        let (worktree_a, _) = project_a
3449            .update(cx_a, |p, cx| {
3450                p.find_or_create_local_worktree("/root-1", true, cx)
3451            })
3452            .await
3453            .unwrap();
3454        worktree_a
3455            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3456            .await;
3457        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3458        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3459        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3460
3461        // Join the worktree as client B.
3462        let project_b = Project::remote(
3463            project_id,
3464            client_b.clone(),
3465            client_b.user_store.clone(),
3466            lang_registry.clone(),
3467            fs.clone(),
3468            &mut cx_b.to_async(),
3469        )
3470        .await
3471        .unwrap();
3472
3473        // Open the file on client B.
3474        let buffer_b = cx_b
3475            .background()
3476            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3477            .await
3478            .unwrap();
3479
3480        // Request references to a symbol as the guest.
3481        let fake_language_server = fake_language_servers.next().await.unwrap();
3482        fake_language_server.handle_request::<lsp::request::References, _, _>(
3483            |params, _| async move {
3484                assert_eq!(
3485                    params.text_document_position.text_document.uri.as_str(),
3486                    "file:///root-1/one.rs"
3487                );
3488                Ok(Some(vec![
3489                    lsp::Location {
3490                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3491                        range: lsp::Range::new(
3492                            lsp::Position::new(0, 24),
3493                            lsp::Position::new(0, 27),
3494                        ),
3495                    },
3496                    lsp::Location {
3497                        uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
3498                        range: lsp::Range::new(
3499                            lsp::Position::new(0, 35),
3500                            lsp::Position::new(0, 38),
3501                        ),
3502                    },
3503                    lsp::Location {
3504                        uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
3505                        range: lsp::Range::new(
3506                            lsp::Position::new(0, 37),
3507                            lsp::Position::new(0, 40),
3508                        ),
3509                    },
3510                ]))
3511            },
3512        );
3513
3514        let references = project_b
3515            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
3516            .await
3517            .unwrap();
3518        cx_b.read(|cx| {
3519            assert_eq!(references.len(), 3);
3520            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
3521
3522            let two_buffer = references[0].buffer.read(cx);
3523            let three_buffer = references[2].buffer.read(cx);
3524            assert_eq!(
3525                two_buffer.file().unwrap().path().as_ref(),
3526                Path::new("two.rs")
3527            );
3528            assert_eq!(references[1].buffer, references[0].buffer);
3529            assert_eq!(
3530                three_buffer.file().unwrap().full_path(cx),
3531                Path::new("three.rs")
3532            );
3533
3534            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
3535            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
3536            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
3537        });
3538    }
3539
3540    #[gpui::test(iterations = 10)]
3541    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3542        cx_a.foreground().forbid_parking();
3543        let lang_registry = Arc::new(LanguageRegistry::test());
3544        let fs = FakeFs::new(cx_a.background());
3545        fs.insert_tree(
3546            "/root-1",
3547            json!({
3548                ".zed.toml": r#"collaborators = ["user_b"]"#,
3549                "a": "hello world",
3550                "b": "goodnight moon",
3551                "c": "a world of goo",
3552                "d": "world champion of clown world",
3553            }),
3554        )
3555        .await;
3556        fs.insert_tree(
3557            "/root-2",
3558            json!({
3559                "e": "disney world is fun",
3560            }),
3561        )
3562        .await;
3563
3564        // Connect to a server as 2 clients.
3565        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3566        let client_a = server.create_client(cx_a, "user_a").await;
3567        let client_b = server.create_client(cx_b, "user_b").await;
3568
3569        // Share a project as client A
3570        let project_a = cx_a.update(|cx| {
3571            Project::local(
3572                client_a.clone(),
3573                client_a.user_store.clone(),
3574                lang_registry.clone(),
3575                fs.clone(),
3576                cx,
3577            )
3578        });
3579        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3580
3581        let (worktree_1, _) = project_a
3582            .update(cx_a, |p, cx| {
3583                p.find_or_create_local_worktree("/root-1", true, cx)
3584            })
3585            .await
3586            .unwrap();
3587        worktree_1
3588            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3589            .await;
3590        let (worktree_2, _) = project_a
3591            .update(cx_a, |p, cx| {
3592                p.find_or_create_local_worktree("/root-2", true, cx)
3593            })
3594            .await
3595            .unwrap();
3596        worktree_2
3597            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3598            .await;
3599
3600        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3601
3602        // Join the worktree as client B.
3603        let project_b = Project::remote(
3604            project_id,
3605            client_b.clone(),
3606            client_b.user_store.clone(),
3607            lang_registry.clone(),
3608            fs.clone(),
3609            &mut cx_b.to_async(),
3610        )
3611        .await
3612        .unwrap();
3613
3614        let results = project_b
3615            .update(cx_b, |project, cx| {
3616                project.search(SearchQuery::text("world", false, false), cx)
3617            })
3618            .await
3619            .unwrap();
3620
3621        let mut ranges_by_path = results
3622            .into_iter()
3623            .map(|(buffer, ranges)| {
3624                buffer.read_with(cx_b, |buffer, cx| {
3625                    let path = buffer.file().unwrap().full_path(cx);
3626                    let offset_ranges = ranges
3627                        .into_iter()
3628                        .map(|range| range.to_offset(buffer))
3629                        .collect::<Vec<_>>();
3630                    (path, offset_ranges)
3631                })
3632            })
3633            .collect::<Vec<_>>();
3634        ranges_by_path.sort_by_key(|(path, _)| path.clone());
3635
3636        assert_eq!(
3637            ranges_by_path,
3638            &[
3639                (PathBuf::from("root-1/a"), vec![6..11]),
3640                (PathBuf::from("root-1/c"), vec![2..7]),
3641                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
3642                (PathBuf::from("root-2/e"), vec![7..12]),
3643            ]
3644        );
3645    }
3646
3647    #[gpui::test(iterations = 10)]
3648    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3649        cx_a.foreground().forbid_parking();
3650        let lang_registry = Arc::new(LanguageRegistry::test());
3651        let fs = FakeFs::new(cx_a.background());
3652        fs.insert_tree(
3653            "/root-1",
3654            json!({
3655                ".zed.toml": r#"collaborators = ["user_b"]"#,
3656                "main.rs": "fn double(number: i32) -> i32 { number + number }",
3657            }),
3658        )
3659        .await;
3660
3661        // Set up a fake language server.
3662        let mut language = Language::new(
3663            LanguageConfig {
3664                name: "Rust".into(),
3665                path_suffixes: vec!["rs".to_string()],
3666                ..Default::default()
3667            },
3668            Some(tree_sitter_rust::language()),
3669        );
3670        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3671        lang_registry.add(Arc::new(language));
3672
3673        // Connect to a server as 2 clients.
3674        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3675        let client_a = server.create_client(cx_a, "user_a").await;
3676        let client_b = server.create_client(cx_b, "user_b").await;
3677
3678        // Share a project as client A
3679        let project_a = cx_a.update(|cx| {
3680            Project::local(
3681                client_a.clone(),
3682                client_a.user_store.clone(),
3683                lang_registry.clone(),
3684                fs.clone(),
3685                cx,
3686            )
3687        });
3688        let (worktree_a, _) = project_a
3689            .update(cx_a, |p, cx| {
3690                p.find_or_create_local_worktree("/root-1", true, cx)
3691            })
3692            .await
3693            .unwrap();
3694        worktree_a
3695            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3696            .await;
3697        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3698        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3699        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3700
3701        // Join the worktree as client B.
3702        let project_b = Project::remote(
3703            project_id,
3704            client_b.clone(),
3705            client_b.user_store.clone(),
3706            lang_registry.clone(),
3707            fs.clone(),
3708            &mut cx_b.to_async(),
3709        )
3710        .await
3711        .unwrap();
3712
3713        // Open the file on client B.
3714        let buffer_b = cx_b
3715            .background()
3716            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
3717            .await
3718            .unwrap();
3719
3720        // Request document highlights as the guest.
3721        let fake_language_server = fake_language_servers.next().await.unwrap();
3722        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>(
3723            |params, _| async move {
3724                assert_eq!(
3725                    params
3726                        .text_document_position_params
3727                        .text_document
3728                        .uri
3729                        .as_str(),
3730                    "file:///root-1/main.rs"
3731                );
3732                assert_eq!(
3733                    params.text_document_position_params.position,
3734                    lsp::Position::new(0, 34)
3735                );
3736                Ok(Some(vec![
3737                    lsp::DocumentHighlight {
3738                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3739                        range: lsp::Range::new(
3740                            lsp::Position::new(0, 10),
3741                            lsp::Position::new(0, 16),
3742                        ),
3743                    },
3744                    lsp::DocumentHighlight {
3745                        kind: Some(lsp::DocumentHighlightKind::READ),
3746                        range: lsp::Range::new(
3747                            lsp::Position::new(0, 32),
3748                            lsp::Position::new(0, 38),
3749                        ),
3750                    },
3751                    lsp::DocumentHighlight {
3752                        kind: Some(lsp::DocumentHighlightKind::READ),
3753                        range: lsp::Range::new(
3754                            lsp::Position::new(0, 41),
3755                            lsp::Position::new(0, 47),
3756                        ),
3757                    },
3758                ]))
3759            },
3760        );
3761
3762        let highlights = project_b
3763            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
3764            .await
3765            .unwrap();
3766        buffer_b.read_with(cx_b, |buffer, _| {
3767            let snapshot = buffer.snapshot();
3768
3769            let highlights = highlights
3770                .into_iter()
3771                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3772                .collect::<Vec<_>>();
3773            assert_eq!(
3774                highlights,
3775                &[
3776                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3777                    (lsp::DocumentHighlightKind::READ, 32..38),
3778                    (lsp::DocumentHighlightKind::READ, 41..47)
3779                ]
3780            )
3781        });
3782    }
3783
3784    #[gpui::test(iterations = 10)]
3785    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3786        cx_a.foreground().forbid_parking();
3787        let lang_registry = Arc::new(LanguageRegistry::test());
3788        let fs = FakeFs::new(cx_a.background());
3789        fs.insert_tree(
3790            "/code",
3791            json!({
3792                "crate-1": {
3793                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3794                    "one.rs": "const ONE: usize = 1;",
3795                },
3796                "crate-2": {
3797                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3798                },
3799                "private": {
3800                    "passwords.txt": "the-password",
3801                }
3802            }),
3803        )
3804        .await;
3805
3806        // Set up a fake language server.
3807        let mut language = Language::new(
3808            LanguageConfig {
3809                name: "Rust".into(),
3810                path_suffixes: vec!["rs".to_string()],
3811                ..Default::default()
3812            },
3813            Some(tree_sitter_rust::language()),
3814        );
3815        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3816        lang_registry.add(Arc::new(language));
3817
3818        // Connect to a server as 2 clients.
3819        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3820        let client_a = server.create_client(cx_a, "user_a").await;
3821        let client_b = server.create_client(cx_b, "user_b").await;
3822
3823        // Share a project as client A
3824        let project_a = cx_a.update(|cx| {
3825            Project::local(
3826                client_a.clone(),
3827                client_a.user_store.clone(),
3828                lang_registry.clone(),
3829                fs.clone(),
3830                cx,
3831            )
3832        });
3833        let (worktree_a, _) = project_a
3834            .update(cx_a, |p, cx| {
3835                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3836            })
3837            .await
3838            .unwrap();
3839        worktree_a
3840            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3841            .await;
3842        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3843        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3844        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3845
3846        // Join the worktree as client B.
3847        let project_b = Project::remote(
3848            project_id,
3849            client_b.clone(),
3850            client_b.user_store.clone(),
3851            lang_registry.clone(),
3852            fs.clone(),
3853            &mut cx_b.to_async(),
3854        )
3855        .await
3856        .unwrap();
3857
3858        // Cause the language server to start.
3859        let _buffer = cx_b
3860            .background()
3861            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3862            .await
3863            .unwrap();
3864
3865        let fake_language_server = fake_language_servers.next().await.unwrap();
3866        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _, _>(
3867            |_, _| async move {
3868                #[allow(deprecated)]
3869                Ok(Some(vec![lsp::SymbolInformation {
3870                    name: "TWO".into(),
3871                    location: lsp::Location {
3872                        uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3873                        range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3874                    },
3875                    kind: lsp::SymbolKind::CONSTANT,
3876                    tags: None,
3877                    container_name: None,
3878                    deprecated: None,
3879                }]))
3880            },
3881        );
3882
3883        // Request the definition of a symbol as the guest.
3884        let symbols = project_b
3885            .update(cx_b, |p, cx| p.symbols("two", cx))
3886            .await
3887            .unwrap();
3888        assert_eq!(symbols.len(), 1);
3889        assert_eq!(symbols[0].name, "TWO");
3890
3891        // Open one of the returned symbols.
3892        let buffer_b_2 = project_b
3893            .update(cx_b, |project, cx| {
3894                project.open_buffer_for_symbol(&symbols[0], cx)
3895            })
3896            .await
3897            .unwrap();
3898        buffer_b_2.read_with(cx_b, |buffer, _| {
3899            assert_eq!(
3900                buffer.file().unwrap().path().as_ref(),
3901                Path::new("../crate-2/two.rs")
3902            );
3903        });
3904
3905        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3906        let mut fake_symbol = symbols[0].clone();
3907        fake_symbol.path = Path::new("/code/secrets").into();
3908        let error = project_b
3909            .update(cx_b, |project, cx| {
3910                project.open_buffer_for_symbol(&fake_symbol, cx)
3911            })
3912            .await
3913            .unwrap_err();
3914        assert!(error.to_string().contains("invalid symbol signature"));
3915    }
3916
3917    #[gpui::test(iterations = 10)]
3918    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3919        cx_a: &mut TestAppContext,
3920        cx_b: &mut TestAppContext,
3921        mut rng: StdRng,
3922    ) {
3923        cx_a.foreground().forbid_parking();
3924        let lang_registry = Arc::new(LanguageRegistry::test());
3925        let fs = FakeFs::new(cx_a.background());
3926        fs.insert_tree(
3927            "/root",
3928            json!({
3929                ".zed.toml": r#"collaborators = ["user_b"]"#,
3930                "a.rs": "const ONE: usize = b::TWO;",
3931                "b.rs": "const TWO: usize = 2",
3932            }),
3933        )
3934        .await;
3935
3936        // Set up a fake language server.
3937        let mut language = Language::new(
3938            LanguageConfig {
3939                name: "Rust".into(),
3940                path_suffixes: vec!["rs".to_string()],
3941                ..Default::default()
3942            },
3943            Some(tree_sitter_rust::language()),
3944        );
3945        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
3946        lang_registry.add(Arc::new(language));
3947
3948        // Connect to a server as 2 clients.
3949        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3950        let client_a = server.create_client(cx_a, "user_a").await;
3951        let client_b = server.create_client(cx_b, "user_b").await;
3952
3953        // Share a project as client A
3954        let project_a = cx_a.update(|cx| {
3955            Project::local(
3956                client_a.clone(),
3957                client_a.user_store.clone(),
3958                lang_registry.clone(),
3959                fs.clone(),
3960                cx,
3961            )
3962        });
3963
3964        let (worktree_a, _) = project_a
3965            .update(cx_a, |p, cx| {
3966                p.find_or_create_local_worktree("/root", true, cx)
3967            })
3968            .await
3969            .unwrap();
3970        worktree_a
3971            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3972            .await;
3973        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3974        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3975        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3976
3977        // Join the worktree as client B.
3978        let project_b = Project::remote(
3979            project_id,
3980            client_b.clone(),
3981            client_b.user_store.clone(),
3982            lang_registry.clone(),
3983            fs.clone(),
3984            &mut cx_b.to_async(),
3985        )
3986        .await
3987        .unwrap();
3988
3989        let buffer_b1 = cx_b
3990            .background()
3991            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3992            .await
3993            .unwrap();
3994
3995        let fake_language_server = fake_language_servers.next().await.unwrap();
3996        fake_language_server.handle_request::<lsp::request::GotoDefinition, _, _>(
3997            |_, _| async move {
3998                Ok(Some(lsp::GotoDefinitionResponse::Scalar(
3999                    lsp::Location::new(
4000                        lsp::Url::from_file_path("/root/b.rs").unwrap(),
4001                        lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
4002                    ),
4003                )))
4004            },
4005        );
4006
4007        let definitions;
4008        let buffer_b2;
4009        if rng.gen() {
4010            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4011            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4012        } else {
4013            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
4014            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
4015        }
4016
4017        let buffer_b2 = buffer_b2.await.unwrap();
4018        let definitions = definitions.await.unwrap();
4019        assert_eq!(definitions.len(), 1);
4020        assert_eq!(definitions[0].buffer, buffer_b2);
4021    }
4022
4023    #[gpui::test(iterations = 10)]
4024    async fn test_collaborating_with_code_actions(
4025        cx_a: &mut TestAppContext,
4026        cx_b: &mut TestAppContext,
4027    ) {
4028        cx_a.foreground().forbid_parking();
4029        let lang_registry = Arc::new(LanguageRegistry::test());
4030        let fs = FakeFs::new(cx_a.background());
4031        cx_b.update(|cx| editor::init(cx));
4032
4033        // Set up a fake language server.
4034        let mut language = Language::new(
4035            LanguageConfig {
4036                name: "Rust".into(),
4037                path_suffixes: vec!["rs".to_string()],
4038                ..Default::default()
4039            },
4040            Some(tree_sitter_rust::language()),
4041        );
4042        let mut fake_language_servers = language.set_fake_lsp_adapter(Default::default());
4043        lang_registry.add(Arc::new(language));
4044
4045        // Connect to a server as 2 clients.
4046        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4047        let client_a = server.create_client(cx_a, "user_a").await;
4048        let client_b = server.create_client(cx_b, "user_b").await;
4049
4050        // Share a project as client A
4051        fs.insert_tree(
4052            "/a",
4053            json!({
4054                ".zed.toml": r#"collaborators = ["user_b"]"#,
4055                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
4056                "other.rs": "pub fn foo() -> usize { 4 }",
4057            }),
4058        )
4059        .await;
4060        let project_a = cx_a.update(|cx| {
4061            Project::local(
4062                client_a.clone(),
4063                client_a.user_store.clone(),
4064                lang_registry.clone(),
4065                fs.clone(),
4066                cx,
4067            )
4068        });
4069        let (worktree_a, _) = project_a
4070            .update(cx_a, |p, cx| {
4071                p.find_or_create_local_worktree("/a", true, cx)
4072            })
4073            .await
4074            .unwrap();
4075        worktree_a
4076            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4077            .await;
4078        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4079        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4080        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4081
4082        // Join the worktree as client B.
4083        let project_b = Project::remote(
4084            project_id,
4085            client_b.clone(),
4086            client_b.user_store.clone(),
4087            lang_registry.clone(),
4088            fs.clone(),
4089            &mut cx_b.to_async(),
4090        )
4091        .await
4092        .unwrap();
4093        let mut params = cx_b.update(WorkspaceParams::test);
4094        params.languages = lang_registry.clone();
4095        params.client = client_b.client.clone();
4096        params.user_store = client_b.user_store.clone();
4097        params.project = project_b;
4098
4099        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4100        let editor_b = workspace_b
4101            .update(cx_b, |workspace, cx| {
4102                workspace.open_path((worktree_id, "main.rs"), true, cx)
4103            })
4104            .await
4105            .unwrap()
4106            .downcast::<Editor>()
4107            .unwrap();
4108
4109        let mut fake_language_server = fake_language_servers.next().await.unwrap();
4110        fake_language_server
4111            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4112                assert_eq!(
4113                    params.text_document.uri,
4114                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4115                );
4116                assert_eq!(params.range.start, lsp::Position::new(0, 0));
4117                assert_eq!(params.range.end, lsp::Position::new(0, 0));
4118                Ok(None)
4119            })
4120            .next()
4121            .await;
4122
4123        // Move cursor to a location that contains code actions.
4124        editor_b.update(cx_b, |editor, cx| {
4125            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
4126            cx.focus(&editor_b);
4127        });
4128
4129        fake_language_server
4130            .handle_request::<lsp::request::CodeActionRequest, _, _>(|params, _| async move {
4131                assert_eq!(
4132                    params.text_document.uri,
4133                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4134                );
4135                assert_eq!(params.range.start, lsp::Position::new(1, 31));
4136                assert_eq!(params.range.end, lsp::Position::new(1, 31));
4137
4138                Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
4139                    lsp::CodeAction {
4140                        title: "Inline into all callers".to_string(),
4141                        edit: Some(lsp::WorkspaceEdit {
4142                            changes: Some(
4143                                [
4144                                    (
4145                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
4146                                        vec![lsp::TextEdit::new(
4147                                            lsp::Range::new(
4148                                                lsp::Position::new(1, 22),
4149                                                lsp::Position::new(1, 34),
4150                                            ),
4151                                            "4".to_string(),
4152                                        )],
4153                                    ),
4154                                    (
4155                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
4156                                        vec![lsp::TextEdit::new(
4157                                            lsp::Range::new(
4158                                                lsp::Position::new(0, 0),
4159                                                lsp::Position::new(0, 27),
4160                                            ),
4161                                            "".to_string(),
4162                                        )],
4163                                    ),
4164                                ]
4165                                .into_iter()
4166                                .collect(),
4167                            ),
4168                            ..Default::default()
4169                        }),
4170                        data: Some(json!({
4171                            "codeActionParams": {
4172                                "range": {
4173                                    "start": {"line": 1, "column": 31},
4174                                    "end": {"line": 1, "column": 31},
4175                                }
4176                            }
4177                        })),
4178                        ..Default::default()
4179                    },
4180                )]))
4181            })
4182            .next()
4183            .await;
4184
4185        // Toggle code actions and wait for them to display.
4186        editor_b.update(cx_b, |editor, cx| {
4187            editor.toggle_code_actions(
4188                &ToggleCodeActions {
4189                    deployed_from_indicator: false,
4190                },
4191                cx,
4192            );
4193        });
4194        editor_b
4195            .condition(&cx_b, |editor, _| editor.context_menu_visible())
4196            .await;
4197
4198        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
4199
4200        // Confirming the code action will trigger a resolve request.
4201        let confirm_action = workspace_b
4202            .update(cx_b, |workspace, cx| {
4203                Editor::confirm_code_action(workspace, &ConfirmCodeAction { item_ix: Some(0) }, cx)
4204            })
4205            .unwrap();
4206        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _, _>(
4207            |_, _| async move {
4208                Ok(lsp::CodeAction {
4209                    title: "Inline into all callers".to_string(),
4210                    edit: Some(lsp::WorkspaceEdit {
4211                        changes: Some(
4212                            [
4213                                (
4214                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
4215                                    vec![lsp::TextEdit::new(
4216                                        lsp::Range::new(
4217                                            lsp::Position::new(1, 22),
4218                                            lsp::Position::new(1, 34),
4219                                        ),
4220                                        "4".to_string(),
4221                                    )],
4222                                ),
4223                                (
4224                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
4225                                    vec![lsp::TextEdit::new(
4226                                        lsp::Range::new(
4227                                            lsp::Position::new(0, 0),
4228                                            lsp::Position::new(0, 27),
4229                                        ),
4230                                        "".to_string(),
4231                                    )],
4232                                ),
4233                            ]
4234                            .into_iter()
4235                            .collect(),
4236                        ),
4237                        ..Default::default()
4238                    }),
4239                    ..Default::default()
4240                })
4241            },
4242        );
4243
4244        // After the action is confirmed, an editor containing both modified files is opened.
4245        confirm_action.await.unwrap();
4246        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4247            workspace
4248                .active_item(cx)
4249                .unwrap()
4250                .downcast::<Editor>()
4251                .unwrap()
4252        });
4253        code_action_editor.update(cx_b, |editor, cx| {
4254            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4255            editor.undo(&Undo, cx);
4256            assert_eq!(
4257                editor.text(cx),
4258                "mod other;\nfn main() { let foo = other::foo(); }\npub fn foo() -> usize { 4 }"
4259            );
4260            editor.redo(&Redo, cx);
4261            assert_eq!(editor.text(cx), "mod other;\nfn main() { let foo = 4; }\n");
4262        });
4263    }
4264
4265    #[gpui::test(iterations = 10)]
4266    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4267        cx_a.foreground().forbid_parking();
4268        let lang_registry = Arc::new(LanguageRegistry::test());
4269        let fs = FakeFs::new(cx_a.background());
4270        cx_b.update(|cx| editor::init(cx));
4271
4272        // Set up a fake language server.
4273        let mut language = Language::new(
4274            LanguageConfig {
4275                name: "Rust".into(),
4276                path_suffixes: vec!["rs".to_string()],
4277                ..Default::default()
4278            },
4279            Some(tree_sitter_rust::language()),
4280        );
4281        let mut fake_language_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
4282            capabilities: lsp::ServerCapabilities {
4283                rename_provider: Some(lsp::OneOf::Right(lsp::RenameOptions {
4284                    prepare_provider: Some(true),
4285                    work_done_progress_options: Default::default(),
4286                })),
4287                ..Default::default()
4288            },
4289            ..Default::default()
4290        });
4291        lang_registry.add(Arc::new(language));
4292
4293        // Connect to a server as 2 clients.
4294        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4295        let client_a = server.create_client(cx_a, "user_a").await;
4296        let client_b = server.create_client(cx_b, "user_b").await;
4297
4298        // Share a project as client A
4299        fs.insert_tree(
4300            "/dir",
4301            json!({
4302                ".zed.toml": r#"collaborators = ["user_b"]"#,
4303                "one.rs": "const ONE: usize = 1;",
4304                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
4305            }),
4306        )
4307        .await;
4308        let project_a = cx_a.update(|cx| {
4309            Project::local(
4310                client_a.clone(),
4311                client_a.user_store.clone(),
4312                lang_registry.clone(),
4313                fs.clone(),
4314                cx,
4315            )
4316        });
4317        let (worktree_a, _) = project_a
4318            .update(cx_a, |p, cx| {
4319                p.find_or_create_local_worktree("/dir", true, cx)
4320            })
4321            .await
4322            .unwrap();
4323        worktree_a
4324            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4325            .await;
4326        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
4327        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
4328        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
4329
4330        // Join the worktree as client B.
4331        let project_b = Project::remote(
4332            project_id,
4333            client_b.clone(),
4334            client_b.user_store.clone(),
4335            lang_registry.clone(),
4336            fs.clone(),
4337            &mut cx_b.to_async(),
4338        )
4339        .await
4340        .unwrap();
4341        let mut params = cx_b.update(WorkspaceParams::test);
4342        params.languages = lang_registry.clone();
4343        params.client = client_b.client.clone();
4344        params.user_store = client_b.user_store.clone();
4345        params.project = project_b;
4346
4347        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
4348        let editor_b = workspace_b
4349            .update(cx_b, |workspace, cx| {
4350                workspace.open_path((worktree_id, "one.rs"), true, cx)
4351            })
4352            .await
4353            .unwrap()
4354            .downcast::<Editor>()
4355            .unwrap();
4356        let fake_language_server = fake_language_servers.next().await.unwrap();
4357
4358        // Move cursor to a location that can be renamed.
4359        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
4360            editor.select_ranges([7..7], None, cx);
4361            editor.rename(&Rename, cx).unwrap()
4362        });
4363
4364        fake_language_server
4365            .handle_request::<lsp::request::PrepareRenameRequest, _, _>(|params, _| async move {
4366                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
4367                assert_eq!(params.position, lsp::Position::new(0, 7));
4368                Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4369                    lsp::Position::new(0, 6),
4370                    lsp::Position::new(0, 9),
4371                ))))
4372            })
4373            .next()
4374            .await
4375            .unwrap();
4376        prepare_rename.await.unwrap();
4377        editor_b.update(cx_b, |editor, cx| {
4378            let rename = editor.pending_rename().unwrap();
4379            let buffer = editor.buffer().read(cx).snapshot(cx);
4380            assert_eq!(
4381                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
4382                6..9
4383            );
4384            rename.editor.update(cx, |rename_editor, cx| {
4385                rename_editor.buffer().update(cx, |rename_buffer, cx| {
4386                    rename_buffer.edit([(0..3, "THREE")], cx);
4387                });
4388            });
4389        });
4390
4391        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
4392            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
4393        });
4394        fake_language_server
4395            .handle_request::<lsp::request::Rename, _, _>(|params, _| async move {
4396                assert_eq!(
4397                    params.text_document_position.text_document.uri.as_str(),
4398                    "file:///dir/one.rs"
4399                );
4400                assert_eq!(
4401                    params.text_document_position.position,
4402                    lsp::Position::new(0, 6)
4403                );
4404                assert_eq!(params.new_name, "THREE");
4405                Ok(Some(lsp::WorkspaceEdit {
4406                    changes: Some(
4407                        [
4408                            (
4409                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
4410                                vec![lsp::TextEdit::new(
4411                                    lsp::Range::new(
4412                                        lsp::Position::new(0, 6),
4413                                        lsp::Position::new(0, 9),
4414                                    ),
4415                                    "THREE".to_string(),
4416                                )],
4417                            ),
4418                            (
4419                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
4420                                vec![
4421                                    lsp::TextEdit::new(
4422                                        lsp::Range::new(
4423                                            lsp::Position::new(0, 24),
4424                                            lsp::Position::new(0, 27),
4425                                        ),
4426                                        "THREE".to_string(),
4427                                    ),
4428                                    lsp::TextEdit::new(
4429                                        lsp::Range::new(
4430                                            lsp::Position::new(0, 35),
4431                                            lsp::Position::new(0, 38),
4432                                        ),
4433                                        "THREE".to_string(),
4434                                    ),
4435                                ],
4436                            ),
4437                        ]
4438                        .into_iter()
4439                        .collect(),
4440                    ),
4441                    ..Default::default()
4442                }))
4443            })
4444            .next()
4445            .await
4446            .unwrap();
4447        confirm_rename.await.unwrap();
4448
4449        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
4450            workspace
4451                .active_item(cx)
4452                .unwrap()
4453                .downcast::<Editor>()
4454                .unwrap()
4455        });
4456        rename_editor.update(cx_b, |editor, cx| {
4457            assert_eq!(
4458                editor.text(cx),
4459                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4460            );
4461            editor.undo(&Undo, cx);
4462            assert_eq!(
4463                editor.text(cx),
4464                "const ONE: usize = 1;\nconst TWO: usize = one::ONE + one::ONE;"
4465            );
4466            editor.redo(&Redo, cx);
4467            assert_eq!(
4468                editor.text(cx),
4469                "const THREE: usize = 1;\nconst TWO: usize = one::THREE + one::THREE;"
4470            );
4471        });
4472
4473        // Ensure temporary rename edits cannot be undone/redone.
4474        editor_b.update(cx_b, |editor, cx| {
4475            editor.undo(&Undo, cx);
4476            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4477            editor.undo(&Undo, cx);
4478            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
4479            editor.redo(&Redo, cx);
4480            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
4481        })
4482    }
4483
4484    #[gpui::test(iterations = 10)]
4485    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4486        cx_a.foreground().forbid_parking();
4487
4488        // Connect to a server as 2 clients.
4489        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4490        let client_a = server.create_client(cx_a, "user_a").await;
4491        let client_b = server.create_client(cx_b, "user_b").await;
4492
4493        // Create an org that includes these 2 users.
4494        let db = &server.app_state.db;
4495        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4496        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4497            .await
4498            .unwrap();
4499        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4500            .await
4501            .unwrap();
4502
4503        // Create a channel that includes all the users.
4504        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4505        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4506            .await
4507            .unwrap();
4508        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4509            .await
4510            .unwrap();
4511        db.create_channel_message(
4512            channel_id,
4513            client_b.current_user_id(&cx_b),
4514            "hello A, it's B.",
4515            OffsetDateTime::now_utc(),
4516            1,
4517        )
4518        .await
4519        .unwrap();
4520
4521        let channels_a = cx_a
4522            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4523        channels_a
4524            .condition(cx_a, |list, _| list.available_channels().is_some())
4525            .await;
4526        channels_a.read_with(cx_a, |list, _| {
4527            assert_eq!(
4528                list.available_channels().unwrap(),
4529                &[ChannelDetails {
4530                    id: channel_id.to_proto(),
4531                    name: "test-channel".to_string()
4532                }]
4533            )
4534        });
4535        let channel_a = channels_a.update(cx_a, |this, cx| {
4536            this.get_channel(channel_id.to_proto(), cx).unwrap()
4537        });
4538        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4539        channel_a
4540            .condition(&cx_a, |channel, _| {
4541                channel_messages(channel)
4542                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4543            })
4544            .await;
4545
4546        let channels_b = cx_b
4547            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4548        channels_b
4549            .condition(cx_b, |list, _| list.available_channels().is_some())
4550            .await;
4551        channels_b.read_with(cx_b, |list, _| {
4552            assert_eq!(
4553                list.available_channels().unwrap(),
4554                &[ChannelDetails {
4555                    id: channel_id.to_proto(),
4556                    name: "test-channel".to_string()
4557                }]
4558            )
4559        });
4560
4561        let channel_b = channels_b.update(cx_b, |this, cx| {
4562            this.get_channel(channel_id.to_proto(), cx).unwrap()
4563        });
4564        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4565        channel_b
4566            .condition(&cx_b, |channel, _| {
4567                channel_messages(channel)
4568                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4569            })
4570            .await;
4571
4572        channel_a
4573            .update(cx_a, |channel, cx| {
4574                channel
4575                    .send_message("oh, hi B.".to_string(), cx)
4576                    .unwrap()
4577                    .detach();
4578                let task = channel.send_message("sup".to_string(), cx).unwrap();
4579                assert_eq!(
4580                    channel_messages(channel),
4581                    &[
4582                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4583                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4584                        ("user_a".to_string(), "sup".to_string(), true)
4585                    ]
4586                );
4587                task
4588            })
4589            .await
4590            .unwrap();
4591
4592        channel_b
4593            .condition(&cx_b, |channel, _| {
4594                channel_messages(channel)
4595                    == [
4596                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4597                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4598                        ("user_a".to_string(), "sup".to_string(), false),
4599                    ]
4600            })
4601            .await;
4602
4603        assert_eq!(
4604            server
4605                .state()
4606                .await
4607                .channel(channel_id)
4608                .unwrap()
4609                .connection_ids
4610                .len(),
4611            2
4612        );
4613        cx_b.update(|_| drop(channel_b));
4614        server
4615            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
4616            .await;
4617
4618        cx_a.update(|_| drop(channel_a));
4619        server
4620            .condition(|state| state.channel(channel_id).is_none())
4621            .await;
4622    }
4623
4624    #[gpui::test(iterations = 10)]
4625    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
4626        cx_a.foreground().forbid_parking();
4627
4628        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4629        let client_a = server.create_client(cx_a, "user_a").await;
4630
4631        let db = &server.app_state.db;
4632        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4633        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4634        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4635            .await
4636            .unwrap();
4637        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4638            .await
4639            .unwrap();
4640
4641        let channels_a = cx_a
4642            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4643        channels_a
4644            .condition(cx_a, |list, _| list.available_channels().is_some())
4645            .await;
4646        let channel_a = channels_a.update(cx_a, |this, cx| {
4647            this.get_channel(channel_id.to_proto(), cx).unwrap()
4648        });
4649
4650        // Messages aren't allowed to be too long.
4651        channel_a
4652            .update(cx_a, |channel, cx| {
4653                let long_body = "this is long.\n".repeat(1024);
4654                channel.send_message(long_body, cx).unwrap()
4655            })
4656            .await
4657            .unwrap_err();
4658
4659        // Messages aren't allowed to be blank.
4660        channel_a.update(cx_a, |channel, cx| {
4661            channel.send_message(String::new(), cx).unwrap_err()
4662        });
4663
4664        // Leading and trailing whitespace are trimmed.
4665        channel_a
4666            .update(cx_a, |channel, cx| {
4667                channel
4668                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4669                    .unwrap()
4670            })
4671            .await
4672            .unwrap();
4673        assert_eq!(
4674            db.get_channel_messages(channel_id, 10, None)
4675                .await
4676                .unwrap()
4677                .iter()
4678                .map(|m| &m.body)
4679                .collect::<Vec<_>>(),
4680            &["surrounded by whitespace"]
4681        );
4682    }
4683
4684    #[gpui::test(iterations = 10)]
4685    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4686        cx_a.foreground().forbid_parking();
4687
4688        // Connect to a server as 2 clients.
4689        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4690        let client_a = server.create_client(cx_a, "user_a").await;
4691        let client_b = server.create_client(cx_b, "user_b").await;
4692        let mut status_b = client_b.status();
4693
4694        // Create an org that includes these 2 users.
4695        let db = &server.app_state.db;
4696        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4697        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4698            .await
4699            .unwrap();
4700        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4701            .await
4702            .unwrap();
4703
4704        // Create a channel that includes all the users.
4705        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4706        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4707            .await
4708            .unwrap();
4709        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4710            .await
4711            .unwrap();
4712        db.create_channel_message(
4713            channel_id,
4714            client_b.current_user_id(&cx_b),
4715            "hello A, it's B.",
4716            OffsetDateTime::now_utc(),
4717            2,
4718        )
4719        .await
4720        .unwrap();
4721
4722        let channels_a = cx_a
4723            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4724        channels_a
4725            .condition(cx_a, |list, _| list.available_channels().is_some())
4726            .await;
4727
4728        channels_a.read_with(cx_a, |list, _| {
4729            assert_eq!(
4730                list.available_channels().unwrap(),
4731                &[ChannelDetails {
4732                    id: channel_id.to_proto(),
4733                    name: "test-channel".to_string()
4734                }]
4735            )
4736        });
4737        let channel_a = channels_a.update(cx_a, |this, cx| {
4738            this.get_channel(channel_id.to_proto(), cx).unwrap()
4739        });
4740        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
4741        channel_a
4742            .condition(&cx_a, |channel, _| {
4743                channel_messages(channel)
4744                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4745            })
4746            .await;
4747
4748        let channels_b = cx_b
4749            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4750        channels_b
4751            .condition(cx_b, |list, _| list.available_channels().is_some())
4752            .await;
4753        channels_b.read_with(cx_b, |list, _| {
4754            assert_eq!(
4755                list.available_channels().unwrap(),
4756                &[ChannelDetails {
4757                    id: channel_id.to_proto(),
4758                    name: "test-channel".to_string()
4759                }]
4760            )
4761        });
4762
4763        let channel_b = channels_b.update(cx_b, |this, cx| {
4764            this.get_channel(channel_id.to_proto(), cx).unwrap()
4765        });
4766        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
4767        channel_b
4768            .condition(&cx_b, |channel, _| {
4769                channel_messages(channel)
4770                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4771            })
4772            .await;
4773
4774        // Disconnect client B, ensuring we can still access its cached channel data.
4775        server.forbid_connections();
4776        server.disconnect_client(client_b.current_user_id(&cx_b));
4777        cx_b.foreground().advance_clock(rpc::RECEIVE_TIMEOUT);
4778        while !matches!(
4779            status_b.next().await,
4780            Some(client::Status::ReconnectionError { .. })
4781        ) {}
4782
4783        channels_b.read_with(cx_b, |channels, _| {
4784            assert_eq!(
4785                channels.available_channels().unwrap(),
4786                [ChannelDetails {
4787                    id: channel_id.to_proto(),
4788                    name: "test-channel".to_string()
4789                }]
4790            )
4791        });
4792        channel_b.read_with(cx_b, |channel, _| {
4793            assert_eq!(
4794                channel_messages(channel),
4795                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4796            )
4797        });
4798
4799        // Send a message from client B while it is disconnected.
4800        channel_b
4801            .update(cx_b, |channel, cx| {
4802                let task = channel
4803                    .send_message("can you see this?".to_string(), cx)
4804                    .unwrap();
4805                assert_eq!(
4806                    channel_messages(channel),
4807                    &[
4808                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4809                        ("user_b".to_string(), "can you see this?".to_string(), true)
4810                    ]
4811                );
4812                task
4813            })
4814            .await
4815            .unwrap_err();
4816
4817        // Send a message from client A while B is disconnected.
4818        channel_a
4819            .update(cx_a, |channel, cx| {
4820                channel
4821                    .send_message("oh, hi B.".to_string(), cx)
4822                    .unwrap()
4823                    .detach();
4824                let task = channel.send_message("sup".to_string(), cx).unwrap();
4825                assert_eq!(
4826                    channel_messages(channel),
4827                    &[
4828                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4829                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4830                        ("user_a".to_string(), "sup".to_string(), true)
4831                    ]
4832                );
4833                task
4834            })
4835            .await
4836            .unwrap();
4837
4838        // Give client B a chance to reconnect.
4839        server.allow_connections();
4840        cx_b.foreground().advance_clock(Duration::from_secs(10));
4841
4842        // Verify that B sees the new messages upon reconnection, as well as the message client B
4843        // sent while offline.
4844        channel_b
4845            .condition(&cx_b, |channel, _| {
4846                channel_messages(channel)
4847                    == [
4848                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4849                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4850                        ("user_a".to_string(), "sup".to_string(), false),
4851                        ("user_b".to_string(), "can you see this?".to_string(), false),
4852                    ]
4853            })
4854            .await;
4855
4856        // Ensure client A and B can communicate normally after reconnection.
4857        channel_a
4858            .update(cx_a, |channel, cx| {
4859                channel.send_message("you online?".to_string(), cx).unwrap()
4860            })
4861            .await
4862            .unwrap();
4863        channel_b
4864            .condition(&cx_b, |channel, _| {
4865                channel_messages(channel)
4866                    == [
4867                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4868                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4869                        ("user_a".to_string(), "sup".to_string(), false),
4870                        ("user_b".to_string(), "can you see this?".to_string(), false),
4871                        ("user_a".to_string(), "you online?".to_string(), false),
4872                    ]
4873            })
4874            .await;
4875
4876        channel_b
4877            .update(cx_b, |channel, cx| {
4878                channel.send_message("yep".to_string(), cx).unwrap()
4879            })
4880            .await
4881            .unwrap();
4882        channel_a
4883            .condition(&cx_a, |channel, _| {
4884                channel_messages(channel)
4885                    == [
4886                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4887                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4888                        ("user_a".to_string(), "sup".to_string(), false),
4889                        ("user_b".to_string(), "can you see this?".to_string(), false),
4890                        ("user_a".to_string(), "you online?".to_string(), false),
4891                        ("user_b".to_string(), "yep".to_string(), false),
4892                    ]
4893            })
4894            .await;
4895    }
4896
4897    #[gpui::test(iterations = 10)]
4898    async fn test_contacts(
4899        deterministic: Arc<Deterministic>,
4900        cx_a: &mut TestAppContext,
4901        cx_b: &mut TestAppContext,
4902        cx_c: &mut TestAppContext,
4903    ) {
4904        cx_a.foreground().forbid_parking();
4905        let lang_registry = Arc::new(LanguageRegistry::test());
4906        let fs = FakeFs::new(cx_a.background());
4907
4908        // Connect to a server as 3 clients.
4909        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4910        let client_a = server.create_client(cx_a, "user_a").await;
4911        let client_b = server.create_client(cx_b, "user_b").await;
4912        let client_c = server.create_client(cx_c, "user_c").await;
4913        server
4914            .make_contacts(vec![
4915                (&client_a, cx_a),
4916                (&client_b, cx_b),
4917                (&client_c, cx_c),
4918            ])
4919            .await;
4920        deterministic.run_until_parked();
4921        client_a.user_store.read_with(cx_a, |store, _| {
4922            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4923        });
4924        client_b.user_store.read_with(cx_b, |store, _| {
4925            assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
4926        });
4927        client_c.user_store.read_with(cx_c, |store, _| {
4928            assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
4929        });
4930
4931        // Share a worktree as client A.
4932        fs.create_dir(Path::new("/a")).await.unwrap();
4933
4934        let project_a = cx_a.update(|cx| {
4935            Project::local(
4936                client_a.clone(),
4937                client_a.user_store.clone(),
4938                lang_registry.clone(),
4939                fs.clone(),
4940                cx,
4941            )
4942        });
4943        let (worktree_a, _) = project_a
4944            .update(cx_a, |p, cx| {
4945                p.find_or_create_local_worktree("/a", true, cx)
4946            })
4947            .await
4948            .unwrap();
4949        worktree_a
4950            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4951            .await;
4952
4953        deterministic.run_until_parked();
4954        client_a.user_store.read_with(cx_a, |store, _| {
4955            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4956        });
4957        client_b.user_store.read_with(cx_b, |store, _| {
4958            assert_eq!(
4959                contacts(store),
4960                [("user_a", vec![("a", false, vec![])]), ("user_c", vec![])]
4961            )
4962        });
4963        client_c.user_store.read_with(cx_c, |store, _| {
4964            assert_eq!(
4965                contacts(store),
4966                [("user_a", vec![("a", false, vec![])]), ("user_b", vec![])]
4967            )
4968        });
4969
4970        let project_id = project_a
4971            .update(cx_a, |project, _| project.next_remote_id())
4972            .await;
4973        project_a
4974            .update(cx_a, |project, cx| project.share(cx))
4975            .await
4976            .unwrap();
4977        deterministic.run_until_parked();
4978        client_a.user_store.read_with(cx_a, |store, _| {
4979            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
4980        });
4981        client_b.user_store.read_with(cx_b, |store, _| {
4982            assert_eq!(
4983                contacts(store),
4984                [("user_a", vec![("a", true, vec![])]), ("user_c", vec![])]
4985            )
4986        });
4987        client_c.user_store.read_with(cx_c, |store, _| {
4988            assert_eq!(
4989                contacts(store),
4990                [("user_a", vec![("a", true, vec![])]), ("user_b", vec![])]
4991            )
4992        });
4993
4994        let _project_b = Project::remote(
4995            project_id,
4996            client_b.clone(),
4997            client_b.user_store.clone(),
4998            lang_registry.clone(),
4999            fs.clone(),
5000            &mut cx_b.to_async(),
5001        )
5002        .await
5003        .unwrap();
5004        deterministic.run_until_parked();
5005        client_a.user_store.read_with(cx_a, |store, _| {
5006            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5007        });
5008        client_b.user_store.read_with(cx_b, |store, _| {
5009            assert_eq!(
5010                contacts(store),
5011                [
5012                    ("user_a", vec![("a", true, vec!["user_b"])]),
5013                    ("user_c", vec![])
5014                ]
5015            )
5016        });
5017        client_c.user_store.read_with(cx_c, |store, _| {
5018            assert_eq!(
5019                contacts(store),
5020                [
5021                    ("user_a", vec![("a", true, vec!["user_b"])]),
5022                    ("user_b", vec![])
5023                ]
5024            )
5025        });
5026
5027        project_a
5028            .condition(&cx_a, |project, _| {
5029                project.collaborators().contains_key(&client_b.peer_id)
5030            })
5031            .await;
5032
5033        cx_a.update(move |_| drop(project_a));
5034        deterministic.run_until_parked();
5035        client_a.user_store.read_with(cx_a, |store, _| {
5036            assert_eq!(contacts(store), [("user_b", vec![]), ("user_c", vec![])])
5037        });
5038        client_b.user_store.read_with(cx_b, |store, _| {
5039            assert_eq!(contacts(store), [("user_a", vec![]), ("user_c", vec![])])
5040        });
5041        client_c.user_store.read_with(cx_c, |store, _| {
5042            assert_eq!(contacts(store), [("user_a", vec![]), ("user_b", vec![])])
5043        });
5044
5045        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, bool, Vec<&str>)>)> {
5046            user_store
5047                .contacts()
5048                .iter()
5049                .map(|contact| {
5050                    let worktrees = contact
5051                        .projects
5052                        .iter()
5053                        .map(|p| {
5054                            (
5055                                p.worktree_root_names[0].as_str(),
5056                                p.is_shared,
5057                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
5058                            )
5059                        })
5060                        .collect();
5061                    (contact.user.github_login.as_str(), worktrees)
5062                })
5063                .collect()
5064        }
5065    }
5066
5067    #[gpui::test(iterations = 10)]
5068    async fn test_contact_requests(
5069        executor: Arc<Deterministic>,
5070        cx_a: &mut TestAppContext,
5071        cx_a2: &mut TestAppContext,
5072        cx_b: &mut TestAppContext,
5073        cx_b2: &mut TestAppContext,
5074        cx_c: &mut TestAppContext,
5075        cx_c2: &mut TestAppContext,
5076    ) {
5077        cx_a.foreground().forbid_parking();
5078
5079        // Connect to a server as 3 clients.
5080        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5081        let client_a = server.create_client(cx_a, "user_a").await;
5082        let client_a2 = server.create_client(cx_a2, "user_a").await;
5083        let client_b = server.create_client(cx_b, "user_b").await;
5084        let client_b2 = server.create_client(cx_b2, "user_b").await;
5085        let client_c = server.create_client(cx_c, "user_c").await;
5086        let client_c2 = server.create_client(cx_c2, "user_c").await;
5087
5088        assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap());
5089        assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap());
5090        assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap());
5091
5092        // User A and User C request that user B become their contact.
5093        client_a
5094            .user_store
5095            .read_with(cx_a, |store, _| {
5096                store.request_contact(client_b.user_id().unwrap())
5097            })
5098            .await
5099            .unwrap();
5100        client_c
5101            .user_store
5102            .read_with(cx_c, |store, _| {
5103                store.request_contact(client_b.user_id().unwrap())
5104            })
5105            .await
5106            .unwrap();
5107        executor.run_until_parked();
5108
5109        // All users see the pending request appear in all their clients.
5110        assert_eq!(
5111            client_a.summarize_contacts(&cx_a).outgoing_requests,
5112            &["user_b"]
5113        );
5114        assert_eq!(
5115            client_a2.summarize_contacts(&cx_a2).outgoing_requests,
5116            &["user_b"]
5117        );
5118        assert_eq!(
5119            client_b.summarize_contacts(&cx_b).incoming_requests,
5120            &["user_a", "user_c"]
5121        );
5122        assert_eq!(
5123            client_b2.summarize_contacts(&cx_b2).incoming_requests,
5124            &["user_a", "user_c"]
5125        );
5126        assert_eq!(
5127            client_c.summarize_contacts(&cx_c).outgoing_requests,
5128            &["user_b"]
5129        );
5130        assert_eq!(
5131            client_c2.summarize_contacts(&cx_c2).outgoing_requests,
5132            &["user_b"]
5133        );
5134
5135        // Contact requests are present upon connecting (tested here via disconnect/reconnect)
5136        disconnect_and_reconnect(&client_a, cx_a).await;
5137        disconnect_and_reconnect(&client_b, cx_b).await;
5138        disconnect_and_reconnect(&client_c, cx_c).await;
5139        executor.run_until_parked();
5140        assert_eq!(
5141            client_a.summarize_contacts(&cx_a).outgoing_requests,
5142            &["user_b"]
5143        );
5144        assert_eq!(
5145            client_b.summarize_contacts(&cx_b).incoming_requests,
5146            &["user_a", "user_c"]
5147        );
5148        assert_eq!(
5149            client_c.summarize_contacts(&cx_c).outgoing_requests,
5150            &["user_b"]
5151        );
5152
5153        // User B accepts the request from user A.
5154        client_b
5155            .user_store
5156            .read_with(cx_b, |store, _| {
5157                store.respond_to_contact_request(client_a.user_id().unwrap(), true)
5158            })
5159            .await
5160            .unwrap();
5161
5162        executor.run_until_parked();
5163
5164        // User B sees user A as their contact now in all client, and the incoming request from them is removed.
5165        let contacts_b = client_b.summarize_contacts(&cx_b);
5166        assert_eq!(contacts_b.current, &["user_a"]);
5167        assert_eq!(contacts_b.incoming_requests, &["user_c"]);
5168        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5169        assert_eq!(contacts_b2.current, &["user_a"]);
5170        assert_eq!(contacts_b2.incoming_requests, &["user_c"]);
5171
5172        // User A sees user B as their contact now in all clients, and the outgoing request to them is removed.
5173        let contacts_a = client_a.summarize_contacts(&cx_a);
5174        assert_eq!(contacts_a.current, &["user_b"]);
5175        assert!(contacts_a.outgoing_requests.is_empty());
5176        let contacts_a2 = client_a2.summarize_contacts(&cx_a2);
5177        assert_eq!(contacts_a2.current, &["user_b"]);
5178        assert!(contacts_a2.outgoing_requests.is_empty());
5179
5180        // Contacts are present upon connecting (tested here via disconnect/reconnect)
5181        disconnect_and_reconnect(&client_a, cx_a).await;
5182        disconnect_and_reconnect(&client_b, cx_b).await;
5183        disconnect_and_reconnect(&client_c, cx_c).await;
5184        executor.run_until_parked();
5185        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5186        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5187        assert_eq!(
5188            client_b.summarize_contacts(&cx_b).incoming_requests,
5189            &["user_c"]
5190        );
5191        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5192        assert_eq!(
5193            client_c.summarize_contacts(&cx_c).outgoing_requests,
5194            &["user_b"]
5195        );
5196
5197        // User B rejects the request from user C.
5198        client_b
5199            .user_store
5200            .read_with(cx_b, |store, _| {
5201                store.respond_to_contact_request(client_c.user_id().unwrap(), false)
5202            })
5203            .await
5204            .unwrap();
5205
5206        executor.run_until_parked();
5207
5208        // User B doesn't see user C as their contact, and the incoming request from them is removed.
5209        let contacts_b = client_b.summarize_contacts(&cx_b);
5210        assert_eq!(contacts_b.current, &["user_a"]);
5211        assert!(contacts_b.incoming_requests.is_empty());
5212        let contacts_b2 = client_b2.summarize_contacts(&cx_b2);
5213        assert_eq!(contacts_b2.current, &["user_a"]);
5214        assert!(contacts_b2.incoming_requests.is_empty());
5215
5216        // User C doesn't see user B as their contact, and the outgoing request to them is removed.
5217        let contacts_c = client_c.summarize_contacts(&cx_c);
5218        assert!(contacts_c.current.is_empty());
5219        assert!(contacts_c.outgoing_requests.is_empty());
5220        let contacts_c2 = client_c2.summarize_contacts(&cx_c2);
5221        assert!(contacts_c2.current.is_empty());
5222        assert!(contacts_c2.outgoing_requests.is_empty());
5223
5224        // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect)
5225        disconnect_and_reconnect(&client_a, cx_a).await;
5226        disconnect_and_reconnect(&client_b, cx_b).await;
5227        disconnect_and_reconnect(&client_c, cx_c).await;
5228        executor.run_until_parked();
5229        assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]);
5230        assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]);
5231        assert!(client_b
5232            .summarize_contacts(&cx_b)
5233            .incoming_requests
5234            .is_empty());
5235        assert!(client_c.summarize_contacts(&cx_c).current.is_empty());
5236        assert!(client_c
5237            .summarize_contacts(&cx_c)
5238            .outgoing_requests
5239            .is_empty());
5240
5241        async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) {
5242            client.disconnect(&cx.to_async()).unwrap();
5243            client.clear_contacts(cx);
5244            client
5245                .authenticate_and_connect(false, &cx.to_async())
5246                .await
5247                .unwrap();
5248        }
5249    }
5250
5251    #[gpui::test(iterations = 10)]
5252    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5253        cx_a.foreground().forbid_parking();
5254        let fs = FakeFs::new(cx_a.background());
5255
5256        // 2 clients connect to a server.
5257        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5258        let mut client_a = server.create_client(cx_a, "user_a").await;
5259        let mut client_b = server.create_client(cx_b, "user_b").await;
5260        cx_a.update(editor::init);
5261        cx_b.update(editor::init);
5262
5263        // Client A shares a project.
5264        fs.insert_tree(
5265            "/a",
5266            json!({
5267                ".zed.toml": r#"collaborators = ["user_b"]"#,
5268                "1.txt": "one",
5269                "2.txt": "two",
5270                "3.txt": "three",
5271            }),
5272        )
5273        .await;
5274        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5275        project_a
5276            .update(cx_a, |project, cx| project.share(cx))
5277            .await
5278            .unwrap();
5279
5280        // Client B joins the project.
5281        let project_b = client_b
5282            .build_remote_project(
5283                project_a
5284                    .read_with(cx_a, |project, _| project.remote_id())
5285                    .unwrap(),
5286                cx_b,
5287            )
5288            .await;
5289
5290        // Client A opens some editors.
5291        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5292        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5293        let editor_a1 = workspace_a
5294            .update(cx_a, |workspace, cx| {
5295                workspace.open_path((worktree_id, "1.txt"), true, cx)
5296            })
5297            .await
5298            .unwrap()
5299            .downcast::<Editor>()
5300            .unwrap();
5301        let editor_a2 = workspace_a
5302            .update(cx_a, |workspace, cx| {
5303                workspace.open_path((worktree_id, "2.txt"), true, cx)
5304            })
5305            .await
5306            .unwrap()
5307            .downcast::<Editor>()
5308            .unwrap();
5309
5310        // Client B opens an editor.
5311        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5312        let editor_b1 = workspace_b
5313            .update(cx_b, |workspace, cx| {
5314                workspace.open_path((worktree_id, "1.txt"), true, cx)
5315            })
5316            .await
5317            .unwrap()
5318            .downcast::<Editor>()
5319            .unwrap();
5320
5321        let client_a_id = project_b.read_with(cx_b, |project, _| {
5322            project.collaborators().values().next().unwrap().peer_id
5323        });
5324        let client_b_id = project_a.read_with(cx_a, |project, _| {
5325            project.collaborators().values().next().unwrap().peer_id
5326        });
5327
5328        // When client B starts following client A, all visible view states are replicated to client B.
5329        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
5330        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
5331        workspace_b
5332            .update(cx_b, |workspace, cx| {
5333                workspace
5334                    .toggle_follow(&ToggleFollow(client_a_id), cx)
5335                    .unwrap()
5336            })
5337            .await
5338            .unwrap();
5339        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5340            workspace
5341                .active_item(cx)
5342                .unwrap()
5343                .downcast::<Editor>()
5344                .unwrap()
5345        });
5346        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
5347        assert_eq!(
5348            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
5349            Some((worktree_id, "2.txt").into())
5350        );
5351        assert_eq!(
5352            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5353            vec![2..3]
5354        );
5355        assert_eq!(
5356            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
5357            vec![0..1]
5358        );
5359
5360        // When client A activates a different editor, client B does so as well.
5361        workspace_a.update(cx_a, |workspace, cx| {
5362            workspace.activate_item(&editor_a1, cx)
5363        });
5364        workspace_b
5365            .condition(cx_b, |workspace, cx| {
5366                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5367            })
5368            .await;
5369
5370        // When client A navigates back and forth, client B does so as well.
5371        workspace_a
5372            .update(cx_a, |workspace, cx| {
5373                workspace::Pane::go_back(workspace, None, cx)
5374            })
5375            .await;
5376        workspace_b
5377            .condition(cx_b, |workspace, cx| {
5378                workspace.active_item(cx).unwrap().id() == editor_b2.id()
5379            })
5380            .await;
5381
5382        workspace_a
5383            .update(cx_a, |workspace, cx| {
5384                workspace::Pane::go_forward(workspace, None, cx)
5385            })
5386            .await;
5387        workspace_b
5388            .condition(cx_b, |workspace, cx| {
5389                workspace.active_item(cx).unwrap().id() == editor_b1.id()
5390            })
5391            .await;
5392
5393        // Changes to client A's editor are reflected on client B.
5394        editor_a1.update(cx_a, |editor, cx| {
5395            editor.select_ranges([1..1, 2..2], None, cx);
5396        });
5397        editor_b1
5398            .condition(cx_b, |editor, cx| {
5399                editor.selected_ranges(cx) == vec![1..1, 2..2]
5400            })
5401            .await;
5402
5403        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
5404        editor_b1
5405            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
5406            .await;
5407
5408        editor_a1.update(cx_a, |editor, cx| {
5409            editor.select_ranges([3..3], None, cx);
5410            editor.set_scroll_position(vec2f(0., 100.), cx);
5411        });
5412        editor_b1
5413            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
5414            .await;
5415
5416        // After unfollowing, client B stops receiving updates from client A.
5417        workspace_b.update(cx_b, |workspace, cx| {
5418            workspace.unfollow(&workspace.active_pane().clone(), cx)
5419        });
5420        workspace_a.update(cx_a, |workspace, cx| {
5421            workspace.activate_item(&editor_a2, cx)
5422        });
5423        cx_a.foreground().run_until_parked();
5424        assert_eq!(
5425            workspace_b.read_with(cx_b, |workspace, cx| workspace
5426                .active_item(cx)
5427                .unwrap()
5428                .id()),
5429            editor_b1.id()
5430        );
5431
5432        // Client A starts following client B.
5433        workspace_a
5434            .update(cx_a, |workspace, cx| {
5435                workspace
5436                    .toggle_follow(&ToggleFollow(client_b_id), cx)
5437                    .unwrap()
5438            })
5439            .await
5440            .unwrap();
5441        assert_eq!(
5442            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5443            Some(client_b_id)
5444        );
5445        assert_eq!(
5446            workspace_a.read_with(cx_a, |workspace, cx| workspace
5447                .active_item(cx)
5448                .unwrap()
5449                .id()),
5450            editor_a1.id()
5451        );
5452
5453        // Following interrupts when client B disconnects.
5454        client_b.disconnect(&cx_b.to_async()).unwrap();
5455        cx_a.foreground().run_until_parked();
5456        assert_eq!(
5457            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
5458            None
5459        );
5460    }
5461
5462    #[gpui::test(iterations = 10)]
5463    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5464        cx_a.foreground().forbid_parking();
5465        let fs = FakeFs::new(cx_a.background());
5466
5467        // 2 clients connect to a server.
5468        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5469        let mut client_a = server.create_client(cx_a, "user_a").await;
5470        let mut client_b = server.create_client(cx_b, "user_b").await;
5471        cx_a.update(editor::init);
5472        cx_b.update(editor::init);
5473
5474        // Client A shares a project.
5475        fs.insert_tree(
5476            "/a",
5477            json!({
5478                ".zed.toml": r#"collaborators = ["user_b"]"#,
5479                "1.txt": "one",
5480                "2.txt": "two",
5481                "3.txt": "three",
5482                "4.txt": "four",
5483            }),
5484        )
5485        .await;
5486        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5487        project_a
5488            .update(cx_a, |project, cx| project.share(cx))
5489            .await
5490            .unwrap();
5491
5492        // Client B joins the project.
5493        let project_b = client_b
5494            .build_remote_project(
5495                project_a
5496                    .read_with(cx_a, |project, _| project.remote_id())
5497                    .unwrap(),
5498                cx_b,
5499            )
5500            .await;
5501
5502        // Client A opens some editors.
5503        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5504        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
5505        let _editor_a1 = workspace_a
5506            .update(cx_a, |workspace, cx| {
5507                workspace.open_path((worktree_id, "1.txt"), true, cx)
5508            })
5509            .await
5510            .unwrap()
5511            .downcast::<Editor>()
5512            .unwrap();
5513
5514        // Client B opens an editor.
5515        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5516        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5517        let _editor_b1 = workspace_b
5518            .update(cx_b, |workspace, cx| {
5519                workspace.open_path((worktree_id, "2.txt"), true, cx)
5520            })
5521            .await
5522            .unwrap()
5523            .downcast::<Editor>()
5524            .unwrap();
5525
5526        // Clients A and B follow each other in split panes
5527        workspace_a
5528            .update(cx_a, |workspace, cx| {
5529                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5530                assert_ne!(*workspace.active_pane(), pane_a1);
5531                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
5532                workspace
5533                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5534                    .unwrap()
5535            })
5536            .await
5537            .unwrap();
5538        workspace_b
5539            .update(cx_b, |workspace, cx| {
5540                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
5541                assert_ne!(*workspace.active_pane(), pane_b1);
5542                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
5543                workspace
5544                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
5545                    .unwrap()
5546            })
5547            .await
5548            .unwrap();
5549
5550        workspace_a
5551            .update(cx_a, |workspace, cx| {
5552                workspace.activate_next_pane(cx);
5553                assert_eq!(*workspace.active_pane(), pane_a1);
5554                workspace.open_path((worktree_id, "3.txt"), true, cx)
5555            })
5556            .await
5557            .unwrap();
5558        workspace_b
5559            .update(cx_b, |workspace, cx| {
5560                workspace.activate_next_pane(cx);
5561                assert_eq!(*workspace.active_pane(), pane_b1);
5562                workspace.open_path((worktree_id, "4.txt"), true, cx)
5563            })
5564            .await
5565            .unwrap();
5566        cx_a.foreground().run_until_parked();
5567
5568        // Ensure leader updates don't change the active pane of followers
5569        workspace_a.read_with(cx_a, |workspace, _| {
5570            assert_eq!(*workspace.active_pane(), pane_a1);
5571        });
5572        workspace_b.read_with(cx_b, |workspace, _| {
5573            assert_eq!(*workspace.active_pane(), pane_b1);
5574        });
5575
5576        // Ensure peers following each other doesn't cause an infinite loop.
5577        assert_eq!(
5578            workspace_a.read_with(cx_a, |workspace, cx| workspace
5579                .active_item(cx)
5580                .unwrap()
5581                .project_path(cx)),
5582            Some((worktree_id, "3.txt").into())
5583        );
5584        workspace_a.update(cx_a, |workspace, cx| {
5585            assert_eq!(
5586                workspace.active_item(cx).unwrap().project_path(cx),
5587                Some((worktree_id, "3.txt").into())
5588            );
5589            workspace.activate_next_pane(cx);
5590            assert_eq!(
5591                workspace.active_item(cx).unwrap().project_path(cx),
5592                Some((worktree_id, "4.txt").into())
5593            );
5594        });
5595        workspace_b.update(cx_b, |workspace, cx| {
5596            assert_eq!(
5597                workspace.active_item(cx).unwrap().project_path(cx),
5598                Some((worktree_id, "4.txt").into())
5599            );
5600            workspace.activate_next_pane(cx);
5601            assert_eq!(
5602                workspace.active_item(cx).unwrap().project_path(cx),
5603                Some((worktree_id, "3.txt").into())
5604            );
5605        });
5606    }
5607
5608    #[gpui::test(iterations = 10)]
5609    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
5610        cx_a.foreground().forbid_parking();
5611        let fs = FakeFs::new(cx_a.background());
5612
5613        // 2 clients connect to a server.
5614        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
5615        let mut client_a = server.create_client(cx_a, "user_a").await;
5616        let mut client_b = server.create_client(cx_b, "user_b").await;
5617        cx_a.update(editor::init);
5618        cx_b.update(editor::init);
5619
5620        // Client A shares a project.
5621        fs.insert_tree(
5622            "/a",
5623            json!({
5624                ".zed.toml": r#"collaborators = ["user_b"]"#,
5625                "1.txt": "one",
5626                "2.txt": "two",
5627                "3.txt": "three",
5628            }),
5629        )
5630        .await;
5631        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
5632        project_a
5633            .update(cx_a, |project, cx| project.share(cx))
5634            .await
5635            .unwrap();
5636
5637        // Client B joins the project.
5638        let project_b = client_b
5639            .build_remote_project(
5640                project_a
5641                    .read_with(cx_a, |project, _| project.remote_id())
5642                    .unwrap(),
5643                cx_b,
5644            )
5645            .await;
5646
5647        // Client A opens some editors.
5648        let workspace_a = client_a.build_workspace(&project_a, cx_a);
5649        let _editor_a1 = workspace_a
5650            .update(cx_a, |workspace, cx| {
5651                workspace.open_path((worktree_id, "1.txt"), true, cx)
5652            })
5653            .await
5654            .unwrap()
5655            .downcast::<Editor>()
5656            .unwrap();
5657
5658        // Client B starts following client A.
5659        let workspace_b = client_b.build_workspace(&project_b, cx_b);
5660        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
5661        let leader_id = project_b.read_with(cx_b, |project, _| {
5662            project.collaborators().values().next().unwrap().peer_id
5663        });
5664        workspace_b
5665            .update(cx_b, |workspace, cx| {
5666                workspace
5667                    .toggle_follow(&ToggleFollow(leader_id), cx)
5668                    .unwrap()
5669            })
5670            .await
5671            .unwrap();
5672        assert_eq!(
5673            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5674            Some(leader_id)
5675        );
5676        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
5677            workspace
5678                .active_item(cx)
5679                .unwrap()
5680                .downcast::<Editor>()
5681                .unwrap()
5682        });
5683
5684        // When client B moves, it automatically stops following client A.
5685        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
5686        assert_eq!(
5687            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5688            None
5689        );
5690
5691        workspace_b
5692            .update(cx_b, |workspace, cx| {
5693                workspace
5694                    .toggle_follow(&ToggleFollow(leader_id), cx)
5695                    .unwrap()
5696            })
5697            .await
5698            .unwrap();
5699        assert_eq!(
5700            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5701            Some(leader_id)
5702        );
5703
5704        // When client B edits, it automatically stops following client A.
5705        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
5706        assert_eq!(
5707            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5708            None
5709        );
5710
5711        workspace_b
5712            .update(cx_b, |workspace, cx| {
5713                workspace
5714                    .toggle_follow(&ToggleFollow(leader_id), cx)
5715                    .unwrap()
5716            })
5717            .await
5718            .unwrap();
5719        assert_eq!(
5720            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5721            Some(leader_id)
5722        );
5723
5724        // When client B scrolls, it automatically stops following client A.
5725        editor_b2.update(cx_b, |editor, cx| {
5726            editor.set_scroll_position(vec2f(0., 3.), cx)
5727        });
5728        assert_eq!(
5729            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5730            None
5731        );
5732
5733        workspace_b
5734            .update(cx_b, |workspace, cx| {
5735                workspace
5736                    .toggle_follow(&ToggleFollow(leader_id), cx)
5737                    .unwrap()
5738            })
5739            .await
5740            .unwrap();
5741        assert_eq!(
5742            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5743            Some(leader_id)
5744        );
5745
5746        // When client B activates a different pane, it continues following client A in the original pane.
5747        workspace_b.update(cx_b, |workspace, cx| {
5748            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
5749        });
5750        assert_eq!(
5751            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5752            Some(leader_id)
5753        );
5754
5755        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
5756        assert_eq!(
5757            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5758            Some(leader_id)
5759        );
5760
5761        // When client B activates a different item in the original pane, it automatically stops following client A.
5762        workspace_b
5763            .update(cx_b, |workspace, cx| {
5764                workspace.open_path((worktree_id, "2.txt"), true, cx)
5765            })
5766            .await
5767            .unwrap();
5768        assert_eq!(
5769            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
5770            None
5771        );
5772    }
5773
5774    #[gpui::test(iterations = 100)]
5775    async fn test_random_collaboration(
5776        cx: &mut TestAppContext,
5777        deterministic: Arc<Deterministic>,
5778        rng: StdRng,
5779    ) {
5780        cx.foreground().forbid_parking();
5781        let max_peers = env::var("MAX_PEERS")
5782            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
5783            .unwrap_or(5);
5784        assert!(max_peers <= 5);
5785
5786        let max_operations = env::var("OPERATIONS")
5787            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
5788            .unwrap_or(10);
5789
5790        let rng = Arc::new(Mutex::new(rng));
5791
5792        let guest_lang_registry = Arc::new(LanguageRegistry::test());
5793        let host_language_registry = Arc::new(LanguageRegistry::test());
5794
5795        let fs = FakeFs::new(cx.background());
5796        fs.insert_tree(
5797            "/_collab",
5798            json!({
5799                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4"]"#
5800            }),
5801        )
5802        .await;
5803
5804        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
5805        let mut clients = Vec::new();
5806        let mut user_ids = Vec::new();
5807        let mut op_start_signals = Vec::new();
5808        let files = Arc::new(Mutex::new(Vec::new()));
5809
5810        let mut next_entity_id = 100000;
5811        let mut host_cx = TestAppContext::new(
5812            cx.foreground_platform(),
5813            cx.platform(),
5814            deterministic.build_foreground(next_entity_id),
5815            deterministic.build_background(),
5816            cx.font_cache(),
5817            cx.leak_detector(),
5818            next_entity_id,
5819        );
5820        let host = server.create_client(&mut host_cx, "host").await;
5821        let host_project = host_cx.update(|cx| {
5822            Project::local(
5823                host.client.clone(),
5824                host.user_store.clone(),
5825                host_language_registry.clone(),
5826                fs.clone(),
5827                cx,
5828            )
5829        });
5830        let host_project_id = host_project
5831            .update(&mut host_cx, |p, _| p.next_remote_id())
5832            .await;
5833
5834        let (collab_worktree, _) = host_project
5835            .update(&mut host_cx, |project, cx| {
5836                project.find_or_create_local_worktree("/_collab", true, cx)
5837            })
5838            .await
5839            .unwrap();
5840        collab_worktree
5841            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
5842            .await;
5843        host_project
5844            .update(&mut host_cx, |project, cx| project.share(cx))
5845            .await
5846            .unwrap();
5847
5848        // Set up fake language servers.
5849        let mut language = Language::new(
5850            LanguageConfig {
5851                name: "Rust".into(),
5852                path_suffixes: vec!["rs".to_string()],
5853                ..Default::default()
5854            },
5855            None,
5856        );
5857        let _fake_servers = language.set_fake_lsp_adapter(FakeLspAdapter {
5858            name: "the-fake-language-server",
5859            capabilities: lsp::LanguageServer::full_capabilities(),
5860            initializer: Some(Box::new({
5861                let rng = rng.clone();
5862                let files = files.clone();
5863                let project = host_project.downgrade();
5864                move |fake_server: &mut FakeLanguageServer| {
5865                    fake_server.handle_request::<lsp::request::Completion, _, _>(
5866                        |_, _| async move {
5867                            Ok(Some(lsp::CompletionResponse::Array(vec![
5868                                lsp::CompletionItem {
5869                                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5870                                        range: lsp::Range::new(
5871                                            lsp::Position::new(0, 0),
5872                                            lsp::Position::new(0, 0),
5873                                        ),
5874                                        new_text: "the-new-text".to_string(),
5875                                    })),
5876                                    ..Default::default()
5877                                },
5878                            ])))
5879                        },
5880                    );
5881
5882                    fake_server.handle_request::<lsp::request::CodeActionRequest, _, _>(
5883                        |_, _| async move {
5884                            Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction(
5885                                lsp::CodeAction {
5886                                    title: "the-code-action".to_string(),
5887                                    ..Default::default()
5888                                },
5889                            )]))
5890                        },
5891                    );
5892
5893                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _, _>(
5894                        |params, _| async move {
5895                            Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5896                                params.position,
5897                                params.position,
5898                            ))))
5899                        },
5900                    );
5901
5902                    fake_server.handle_request::<lsp::request::GotoDefinition, _, _>({
5903                        let files = files.clone();
5904                        let rng = rng.clone();
5905                        move |_, _| {
5906                            let files = files.clone();
5907                            let rng = rng.clone();
5908                            async move {
5909                                let files = files.lock();
5910                                let mut rng = rng.lock();
5911                                let count = rng.gen_range::<usize, _>(1..3);
5912                                let files = (0..count)
5913                                    .map(|_| files.choose(&mut *rng).unwrap())
5914                                    .collect::<Vec<_>>();
5915                                log::info!("LSP: Returning definitions in files {:?}", &files);
5916                                Ok(Some(lsp::GotoDefinitionResponse::Array(
5917                                    files
5918                                        .into_iter()
5919                                        .map(|file| lsp::Location {
5920                                            uri: lsp::Url::from_file_path(file).unwrap(),
5921                                            range: Default::default(),
5922                                        })
5923                                        .collect(),
5924                                )))
5925                            }
5926                        }
5927                    });
5928
5929                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _, _>({
5930                        let rng = rng.clone();
5931                        let project = project.clone();
5932                        move |params, mut cx| {
5933                            let highlights = if let Some(project) = project.upgrade(&cx) {
5934                                project.update(&mut cx, |project, cx| {
5935                                    let path = params
5936                                        .text_document_position_params
5937                                        .text_document
5938                                        .uri
5939                                        .to_file_path()
5940                                        .unwrap();
5941                                    let (worktree, relative_path) =
5942                                        project.find_local_worktree(&path, cx)?;
5943                                    let project_path =
5944                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5945                                    let buffer =
5946                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5947
5948                                    let mut highlights = Vec::new();
5949                                    let highlight_count = rng.lock().gen_range(1..=5);
5950                                    let mut prev_end = 0;
5951                                    for _ in 0..highlight_count {
5952                                        let range =
5953                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5954
5955                                        highlights.push(lsp::DocumentHighlight {
5956                                            range: range_to_lsp(range.to_point_utf16(buffer)),
5957                                            kind: Some(lsp::DocumentHighlightKind::READ),
5958                                        });
5959                                        prev_end = range.end;
5960                                    }
5961                                    Some(highlights)
5962                                })
5963                            } else {
5964                                None
5965                            };
5966                            async move { Ok(highlights) }
5967                        }
5968                    });
5969                }
5970            })),
5971            ..Default::default()
5972        });
5973        host_language_registry.add(Arc::new(language));
5974
5975        let op_start_signal = futures::channel::mpsc::unbounded();
5976        user_ids.push(host.current_user_id(&host_cx));
5977        op_start_signals.push(op_start_signal.0);
5978        clients.push(host_cx.foreground().spawn(host.simulate_host(
5979            host_project,
5980            files,
5981            op_start_signal.1,
5982            rng.clone(),
5983            host_cx,
5984        )));
5985
5986        let disconnect_host_at = if rng.lock().gen_bool(0.2) {
5987            rng.lock().gen_range(0..max_operations)
5988        } else {
5989            max_operations
5990        };
5991        let mut available_guests = vec![
5992            "guest-1".to_string(),
5993            "guest-2".to_string(),
5994            "guest-3".to_string(),
5995            "guest-4".to_string(),
5996        ];
5997        let mut operations = 0;
5998        while operations < max_operations {
5999            if operations == disconnect_host_at {
6000                server.disconnect_client(user_ids[0]);
6001                cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6002                drop(op_start_signals);
6003                let mut clients = futures::future::join_all(clients).await;
6004                cx.foreground().run_until_parked();
6005
6006                let (host, mut host_cx, host_err) = clients.remove(0);
6007                if let Some(host_err) = host_err {
6008                    log::error!("host error - {}", host_err);
6009                }
6010                host.project
6011                    .as_ref()
6012                    .unwrap()
6013                    .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
6014                for (guest, mut guest_cx, guest_err) in clients {
6015                    if let Some(guest_err) = guest_err {
6016                        log::error!("{} error - {}", guest.username, guest_err);
6017                    }
6018                    // TODO
6019                    // let contacts = server
6020                    //     .store
6021                    //     .read()
6022                    //     .await
6023                    //     .contacts_for_user(guest.current_user_id(&guest_cx));
6024                    // assert!(!contacts
6025                    //     .iter()
6026                    //     .flat_map(|contact| &contact.projects)
6027                    //     .any(|project| project.id == host_project_id));
6028                    guest
6029                        .project
6030                        .as_ref()
6031                        .unwrap()
6032                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6033                    guest_cx.update(|_| drop(guest));
6034                }
6035                host_cx.update(|_| drop(host));
6036
6037                return;
6038            }
6039
6040            let distribution = rng.lock().gen_range(0..100);
6041            match distribution {
6042                0..=19 if !available_guests.is_empty() => {
6043                    let guest_ix = rng.lock().gen_range(0..available_guests.len());
6044                    let guest_username = available_guests.remove(guest_ix);
6045                    log::info!("Adding new connection for {}", guest_username);
6046                    next_entity_id += 100000;
6047                    let mut guest_cx = TestAppContext::new(
6048                        cx.foreground_platform(),
6049                        cx.platform(),
6050                        deterministic.build_foreground(next_entity_id),
6051                        deterministic.build_background(),
6052                        cx.font_cache(),
6053                        cx.leak_detector(),
6054                        next_entity_id,
6055                    );
6056                    let guest = server.create_client(&mut guest_cx, &guest_username).await;
6057                    let guest_project = Project::remote(
6058                        host_project_id,
6059                        guest.client.clone(),
6060                        guest.user_store.clone(),
6061                        guest_lang_registry.clone(),
6062                        FakeFs::new(cx.background()),
6063                        &mut guest_cx.to_async(),
6064                    )
6065                    .await
6066                    .unwrap();
6067                    let op_start_signal = futures::channel::mpsc::unbounded();
6068                    user_ids.push(guest.current_user_id(&guest_cx));
6069                    op_start_signals.push(op_start_signal.0);
6070                    clients.push(guest_cx.foreground().spawn(guest.simulate_guest(
6071                        guest_username.clone(),
6072                        guest_project,
6073                        op_start_signal.1,
6074                        rng.clone(),
6075                        guest_cx,
6076                    )));
6077
6078                    log::info!("Added connection for {}", guest_username);
6079                    operations += 1;
6080                }
6081                20..=29 if clients.len() > 1 => {
6082                    log::info!("Removing guest");
6083                    let guest_ix = rng.lock().gen_range(1..clients.len());
6084                    let removed_guest_id = user_ids.remove(guest_ix);
6085                    let guest = clients.remove(guest_ix);
6086                    op_start_signals.remove(guest_ix);
6087                    server.disconnect_client(removed_guest_id);
6088                    cx.foreground().advance_clock(RECEIVE_TIMEOUT);
6089                    let (guest, mut guest_cx, guest_err) = guest.await;
6090                    if let Some(guest_err) = guest_err {
6091                        log::error!("{} error - {}", guest.username, guest_err);
6092                    }
6093                    guest
6094                        .project
6095                        .as_ref()
6096                        .unwrap()
6097                        .read_with(&guest_cx, |project, _| assert!(project.is_read_only()));
6098                    // TODO
6099                    // for user_id in &user_ids {
6100                    //     for contact in server.store.read().await.contacts_for_user(*user_id) {
6101                    //         assert_ne!(
6102                    //             contact.user_id, removed_guest_id.0 as u64,
6103                    //             "removed guest is still a contact of another peer"
6104                    //         );
6105                    //         for project in contact.projects {
6106                    //             for project_guest_id in project.guests {
6107                    //                 assert_ne!(
6108                    //                     project_guest_id, removed_guest_id.0 as u64,
6109                    //                     "removed guest appears as still participating on a project"
6110                    //                 );
6111                    //             }
6112                    //         }
6113                    //     }
6114                    // }
6115
6116                    log::info!("{} removed", guest.username);
6117                    available_guests.push(guest.username.clone());
6118                    guest_cx.update(|_| drop(guest));
6119
6120                    operations += 1;
6121                }
6122                _ => {
6123                    while operations < max_operations && rng.lock().gen_bool(0.7) {
6124                        op_start_signals
6125                            .choose(&mut *rng.lock())
6126                            .unwrap()
6127                            .unbounded_send(())
6128                            .unwrap();
6129                        operations += 1;
6130                    }
6131
6132                    if rng.lock().gen_bool(0.8) {
6133                        cx.foreground().run_until_parked();
6134                    }
6135                }
6136            }
6137        }
6138
6139        drop(op_start_signals);
6140        let mut clients = futures::future::join_all(clients).await;
6141        cx.foreground().run_until_parked();
6142
6143        let (host_client, mut host_cx, host_err) = clients.remove(0);
6144        if let Some(host_err) = host_err {
6145            panic!("host error - {}", host_err);
6146        }
6147        let host_project = host_client.project.as_ref().unwrap();
6148        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
6149            project
6150                .worktrees(cx)
6151                .map(|worktree| {
6152                    let snapshot = worktree.read(cx).snapshot();
6153                    (snapshot.id(), snapshot)
6154                })
6155                .collect::<BTreeMap<_, _>>()
6156        });
6157
6158        host_client
6159            .project
6160            .as_ref()
6161            .unwrap()
6162            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
6163
6164        for (guest_client, mut guest_cx, guest_err) in clients.into_iter() {
6165            if let Some(guest_err) = guest_err {
6166                panic!("{} error - {}", guest_client.username, guest_err);
6167            }
6168            let worktree_snapshots =
6169                guest_client
6170                    .project
6171                    .as_ref()
6172                    .unwrap()
6173                    .read_with(&guest_cx, |project, cx| {
6174                        project
6175                            .worktrees(cx)
6176                            .map(|worktree| {
6177                                let worktree = worktree.read(cx);
6178                                (worktree.id(), worktree.snapshot())
6179                            })
6180                            .collect::<BTreeMap<_, _>>()
6181                    });
6182
6183            assert_eq!(
6184                worktree_snapshots.keys().collect::<Vec<_>>(),
6185                host_worktree_snapshots.keys().collect::<Vec<_>>(),
6186                "{} has different worktrees than the host",
6187                guest_client.username
6188            );
6189            for (id, host_snapshot) in &host_worktree_snapshots {
6190                let guest_snapshot = &worktree_snapshots[id];
6191                assert_eq!(
6192                    guest_snapshot.root_name(),
6193                    host_snapshot.root_name(),
6194                    "{} has different root name than the host for worktree {}",
6195                    guest_client.username,
6196                    id
6197                );
6198                assert_eq!(
6199                    guest_snapshot.entries(false).collect::<Vec<_>>(),
6200                    host_snapshot.entries(false).collect::<Vec<_>>(),
6201                    "{} has different snapshot than the host for worktree {}",
6202                    guest_client.username,
6203                    id
6204                );
6205                assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id());
6206            }
6207
6208            guest_client
6209                .project
6210                .as_ref()
6211                .unwrap()
6212                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
6213
6214            for guest_buffer in &guest_client.buffers {
6215                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
6216                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
6217                    project.buffer_for_id(buffer_id, cx).expect(&format!(
6218                        "host does not have buffer for guest:{}, peer:{}, id:{}",
6219                        guest_client.username, guest_client.peer_id, buffer_id
6220                    ))
6221                });
6222                let path = host_buffer
6223                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
6224
6225                assert_eq!(
6226                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
6227                    0,
6228                    "{}, buffer {}, path {:?} has deferred operations",
6229                    guest_client.username,
6230                    buffer_id,
6231                    path,
6232                );
6233                assert_eq!(
6234                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
6235                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
6236                    "{}, buffer {}, path {:?}, differs from the host's buffer",
6237                    guest_client.username,
6238                    buffer_id,
6239                    path
6240                );
6241            }
6242
6243            guest_cx.update(|_| drop(guest_client));
6244        }
6245
6246        host_cx.update(|_| drop(host_client));
6247    }
6248
6249    struct TestServer {
6250        peer: Arc<Peer>,
6251        app_state: Arc<AppState>,
6252        server: Arc<Server>,
6253        foreground: Rc<executor::Foreground>,
6254        notifications: mpsc::UnboundedReceiver<()>,
6255        connection_killers: Arc<Mutex<HashMap<UserId, Arc<AtomicBool>>>>,
6256        forbid_connections: Arc<AtomicBool>,
6257        _test_db: TestDb,
6258    }
6259
6260    impl TestServer {
6261        async fn start(
6262            foreground: Rc<executor::Foreground>,
6263            background: Arc<executor::Background>,
6264        ) -> Self {
6265            let test_db = TestDb::fake(background);
6266            let app_state = Self::build_app_state(&test_db).await;
6267            let peer = Peer::new();
6268            let notifications = mpsc::unbounded();
6269            let server = Server::new(app_state.clone(), Some(notifications.0));
6270            Self {
6271                peer,
6272                app_state,
6273                server,
6274                foreground,
6275                notifications: notifications.1,
6276                connection_killers: Default::default(),
6277                forbid_connections: Default::default(),
6278                _test_db: test_db,
6279            }
6280        }
6281
6282        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
6283            cx.update(|cx| {
6284                let settings = Settings::test(cx);
6285                cx.set_global(settings);
6286            });
6287
6288            let http = FakeHttpClient::with_404_response();
6289            let user_id =
6290                if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await {
6291                    user.id
6292                } else {
6293                    self.app_state.db.create_user(name, false).await.unwrap()
6294                };
6295            let client_name = name.to_string();
6296            let mut client = Client::new(http.clone());
6297            let server = self.server.clone();
6298            let connection_killers = self.connection_killers.clone();
6299            let forbid_connections = self.forbid_connections.clone();
6300            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
6301
6302            Arc::get_mut(&mut client)
6303                .unwrap()
6304                .override_authenticate(move |cx| {
6305                    cx.spawn(|_| async move {
6306                        let access_token = "the-token".to_string();
6307                        Ok(Credentials {
6308                            user_id: user_id.0 as u64,
6309                            access_token,
6310                        })
6311                    })
6312                })
6313                .override_establish_connection(move |credentials, cx| {
6314                    assert_eq!(credentials.user_id, user_id.0 as u64);
6315                    assert_eq!(credentials.access_token, "the-token");
6316
6317                    let server = server.clone();
6318                    let connection_killers = connection_killers.clone();
6319                    let forbid_connections = forbid_connections.clone();
6320                    let client_name = client_name.clone();
6321                    let connection_id_tx = connection_id_tx.clone();
6322                    cx.spawn(move |cx| async move {
6323                        if forbid_connections.load(SeqCst) {
6324                            Err(EstablishConnectionError::other(anyhow!(
6325                                "server is forbidding connections"
6326                            )))
6327                        } else {
6328                            let (client_conn, server_conn, killed) =
6329                                Connection::in_memory(cx.background());
6330                            connection_killers.lock().insert(user_id, killed);
6331                            cx.background()
6332                                .spawn(server.handle_connection(
6333                                    server_conn,
6334                                    client_name,
6335                                    user_id,
6336                                    Some(connection_id_tx),
6337                                    cx.background(),
6338                                ))
6339                                .detach();
6340                            Ok(client_conn)
6341                        }
6342                    })
6343                });
6344
6345            client
6346                .authenticate_and_connect(false, &cx.to_async())
6347                .await
6348                .unwrap();
6349
6350            Channel::init(&client);
6351            Project::init(&client);
6352            cx.update(|cx| {
6353                workspace::init(&client, cx);
6354            });
6355
6356            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
6357            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
6358
6359            let client = TestClient {
6360                client,
6361                peer_id,
6362                username: name.to_string(),
6363                user_store,
6364                language_registry: Arc::new(LanguageRegistry::test()),
6365                project: Default::default(),
6366                buffers: Default::default(),
6367            };
6368            client.wait_for_current_user(cx).await;
6369            client
6370        }
6371
6372        fn disconnect_client(&self, user_id: UserId) {
6373            self.connection_killers
6374                .lock()
6375                .remove(&user_id)
6376                .unwrap()
6377                .store(true, SeqCst);
6378        }
6379
6380        fn forbid_connections(&self) {
6381            self.forbid_connections.store(true, SeqCst);
6382        }
6383
6384        fn allow_connections(&self) {
6385            self.forbid_connections.store(false, SeqCst);
6386        }
6387
6388        async fn make_contacts(&self, mut clients: Vec<(&TestClient, &mut TestAppContext)>) {
6389            while let Some((client_a, cx_a)) = clients.pop() {
6390                for (client_b, cx_b) in &mut clients {
6391                    client_a
6392                        .user_store
6393                        .update(cx_a, |store, _| {
6394                            store.request_contact(client_b.user_id().unwrap())
6395                        })
6396                        .await
6397                        .unwrap();
6398                    cx_a.foreground().run_until_parked();
6399                    client_b
6400                        .user_store
6401                        .update(*cx_b, |store, _| {
6402                            store.respond_to_contact_request(client_a.user_id().unwrap(), true)
6403                        })
6404                        .await
6405                        .unwrap();
6406                }
6407            }
6408        }
6409
6410        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
6411            Arc::new(AppState {
6412                db: test_db.db().clone(),
6413                api_token: Default::default(),
6414            })
6415        }
6416
6417        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
6418            self.server.store.read().await
6419        }
6420
6421        async fn condition<F>(&mut self, mut predicate: F)
6422        where
6423            F: FnMut(&Store) -> bool,
6424        {
6425            assert!(
6426                self.foreground.parking_forbidden(),
6427                "you must call forbid_parking to use server conditions so we don't block indefinitely"
6428            );
6429            while !(predicate)(&*self.server.store.read().await) {
6430                self.foreground.start_waiting();
6431                self.notifications.next().await;
6432                self.foreground.finish_waiting();
6433            }
6434        }
6435    }
6436
6437    impl Deref for TestServer {
6438        type Target = Server;
6439
6440        fn deref(&self) -> &Self::Target {
6441            &self.server
6442        }
6443    }
6444
6445    impl Drop for TestServer {
6446        fn drop(&mut self) {
6447            self.peer.reset();
6448        }
6449    }
6450
6451    struct TestClient {
6452        client: Arc<Client>,
6453        username: String,
6454        pub peer_id: PeerId,
6455        pub user_store: ModelHandle<UserStore>,
6456        language_registry: Arc<LanguageRegistry>,
6457        project: Option<ModelHandle<Project>>,
6458        buffers: HashSet<ModelHandle<language::Buffer>>,
6459    }
6460
6461    impl Deref for TestClient {
6462        type Target = Arc<Client>;
6463
6464        fn deref(&self) -> &Self::Target {
6465            &self.client
6466        }
6467    }
6468
6469    struct ContactsSummary {
6470        pub current: Vec<String>,
6471        pub outgoing_requests: Vec<String>,
6472        pub incoming_requests: Vec<String>,
6473    }
6474
6475    impl TestClient {
6476        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
6477            UserId::from_proto(
6478                self.user_store
6479                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
6480            )
6481        }
6482
6483        async fn wait_for_current_user(&self, cx: &TestAppContext) {
6484            let mut authed_user = self
6485                .user_store
6486                .read_with(cx, |user_store, _| user_store.watch_current_user());
6487            while authed_user.next().await.unwrap().is_none() {}
6488        }
6489
6490        fn clear_contacts(&self, cx: &mut TestAppContext) {
6491            self.user_store.update(cx, |store, _| {
6492                store.clear_contacts();
6493            });
6494        }
6495
6496        fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary {
6497            self.user_store.read_with(cx, |store, _| ContactsSummary {
6498                current: store
6499                    .contacts()
6500                    .iter()
6501                    .map(|contact| contact.user.github_login.clone())
6502                    .collect(),
6503                outgoing_requests: store
6504                    .outgoing_contact_requests()
6505                    .iter()
6506                    .map(|user| user.github_login.clone())
6507                    .collect(),
6508                incoming_requests: store
6509                    .incoming_contact_requests()
6510                    .iter()
6511                    .map(|user| user.github_login.clone())
6512                    .collect(),
6513            })
6514        }
6515
6516        async fn build_local_project(
6517            &mut self,
6518            fs: Arc<FakeFs>,
6519            root_path: impl AsRef<Path>,
6520            cx: &mut TestAppContext,
6521        ) -> (ModelHandle<Project>, WorktreeId) {
6522            let project = cx.update(|cx| {
6523                Project::local(
6524                    self.client.clone(),
6525                    self.user_store.clone(),
6526                    self.language_registry.clone(),
6527                    fs,
6528                    cx,
6529                )
6530            });
6531            self.project = Some(project.clone());
6532            let (worktree, _) = project
6533                .update(cx, |p, cx| {
6534                    p.find_or_create_local_worktree(root_path, true, cx)
6535                })
6536                .await
6537                .unwrap();
6538            worktree
6539                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
6540                .await;
6541            project
6542                .update(cx, |project, _| project.next_remote_id())
6543                .await;
6544            (project, worktree.read_with(cx, |tree, _| tree.id()))
6545        }
6546
6547        async fn build_remote_project(
6548            &mut self,
6549            project_id: u64,
6550            cx: &mut TestAppContext,
6551        ) -> ModelHandle<Project> {
6552            let project = Project::remote(
6553                project_id,
6554                self.client.clone(),
6555                self.user_store.clone(),
6556                self.language_registry.clone(),
6557                FakeFs::new(cx.background()),
6558                &mut cx.to_async(),
6559            )
6560            .await
6561            .unwrap();
6562            self.project = Some(project.clone());
6563            project
6564        }
6565
6566        fn build_workspace(
6567            &self,
6568            project: &ModelHandle<Project>,
6569            cx: &mut TestAppContext,
6570        ) -> ViewHandle<Workspace> {
6571            let (window_id, _) = cx.add_window(|_| EmptyView);
6572            cx.add_view(window_id, |cx| {
6573                let fs = project.read(cx).fs().clone();
6574                Workspace::new(
6575                    &WorkspaceParams {
6576                        fs,
6577                        project: project.clone(),
6578                        user_store: self.user_store.clone(),
6579                        languages: self.language_registry.clone(),
6580                        themes: ThemeRegistry::new((), cx.font_cache().clone()),
6581                        channel_list: cx.add_model(|cx| {
6582                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
6583                        }),
6584                        client: self.client.clone(),
6585                    },
6586                    cx,
6587                )
6588            })
6589        }
6590
6591        async fn simulate_host(
6592            mut self,
6593            project: ModelHandle<Project>,
6594            files: Arc<Mutex<Vec<PathBuf>>>,
6595            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6596            rng: Arc<Mutex<StdRng>>,
6597            mut cx: TestAppContext,
6598        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6599            async fn simulate_host_internal(
6600                client: &mut TestClient,
6601                project: ModelHandle<Project>,
6602                files: Arc<Mutex<Vec<PathBuf>>>,
6603                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6604                rng: Arc<Mutex<StdRng>>,
6605                cx: &mut TestAppContext,
6606            ) -> anyhow::Result<()> {
6607                let fs = project.read_with(cx, |project, _| project.fs().clone());
6608
6609                while op_start_signal.next().await.is_some() {
6610                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
6611                    match distribution {
6612                        0..=20 if !files.lock().is_empty() => {
6613                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6614                            let mut path = path.as_path();
6615                            while let Some(parent_path) = path.parent() {
6616                                path = parent_path;
6617                                if rng.lock().gen() {
6618                                    break;
6619                                }
6620                            }
6621
6622                            log::info!("Host: find/create local worktree {:?}", path);
6623                            let find_or_create_worktree = project.update(cx, |project, cx| {
6624                                project.find_or_create_local_worktree(path, true, cx)
6625                            });
6626                            if rng.lock().gen() {
6627                                cx.background().spawn(find_or_create_worktree).detach();
6628                            } else {
6629                                find_or_create_worktree.await?;
6630                            }
6631                        }
6632                        10..=80 if !files.lock().is_empty() => {
6633                            let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6634                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
6635                                let (worktree, path) = project
6636                                    .update(cx, |project, cx| {
6637                                        project.find_or_create_local_worktree(
6638                                            file.clone(),
6639                                            true,
6640                                            cx,
6641                                        )
6642                                    })
6643                                    .await?;
6644                                let project_path =
6645                                    worktree.read_with(cx, |worktree, _| (worktree.id(), path));
6646                                log::info!(
6647                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
6648                                    file,
6649                                    project_path.0,
6650                                    project_path.1
6651                                );
6652                                let buffer = project
6653                                    .update(cx, |project, cx| project.open_buffer(project_path, cx))
6654                                    .await
6655                                    .unwrap();
6656                                client.buffers.insert(buffer.clone());
6657                                buffer
6658                            } else {
6659                                client
6660                                    .buffers
6661                                    .iter()
6662                                    .choose(&mut *rng.lock())
6663                                    .unwrap()
6664                                    .clone()
6665                            };
6666
6667                            if rng.lock().gen_bool(0.1) {
6668                                cx.update(|cx| {
6669                                    log::info!(
6670                                        "Host: dropping buffer {:?}",
6671                                        buffer.read(cx).file().unwrap().full_path(cx)
6672                                    );
6673                                    client.buffers.remove(&buffer);
6674                                    drop(buffer);
6675                                });
6676                            } else {
6677                                buffer.update(cx, |buffer, cx| {
6678                                    log::info!(
6679                                        "Host: updating buffer {:?} ({})",
6680                                        buffer.file().unwrap().full_path(cx),
6681                                        buffer.remote_id()
6682                                    );
6683
6684                                    if rng.lock().gen_bool(0.7) {
6685                                        buffer.randomly_edit(&mut *rng.lock(), 5, cx);
6686                                    } else {
6687                                        buffer.randomly_undo_redo(&mut *rng.lock(), cx);
6688                                    }
6689                                });
6690                            }
6691                        }
6692                        _ => loop {
6693                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
6694                            let mut path = PathBuf::new();
6695                            path.push("/");
6696                            for _ in 0..path_component_count {
6697                                let letter = rng.lock().gen_range(b'a'..=b'z');
6698                                path.push(std::str::from_utf8(&[letter]).unwrap());
6699                            }
6700                            path.set_extension("rs");
6701                            let parent_path = path.parent().unwrap();
6702
6703                            log::info!("Host: creating file {:?}", path,);
6704
6705                            if fs.create_dir(&parent_path).await.is_ok()
6706                                && fs.create_file(&path, Default::default()).await.is_ok()
6707                            {
6708                                files.lock().push(path);
6709                                break;
6710                            } else {
6711                                log::info!("Host: cannot create file");
6712                            }
6713                        },
6714                    }
6715
6716                    cx.background().simulate_random_delay().await;
6717                }
6718
6719                Ok(())
6720            }
6721
6722            let result = simulate_host_internal(
6723                &mut self,
6724                project.clone(),
6725                files,
6726                op_start_signal,
6727                rng,
6728                &mut cx,
6729            )
6730            .await;
6731            log::info!("Host done");
6732            self.project = Some(project);
6733            (self, cx, result.err())
6734        }
6735
6736        pub async fn simulate_guest(
6737            mut self,
6738            guest_username: String,
6739            project: ModelHandle<Project>,
6740            op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6741            rng: Arc<Mutex<StdRng>>,
6742            mut cx: TestAppContext,
6743        ) -> (Self, TestAppContext, Option<anyhow::Error>) {
6744            async fn simulate_guest_internal(
6745                client: &mut TestClient,
6746                guest_username: &str,
6747                project: ModelHandle<Project>,
6748                mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>,
6749                rng: Arc<Mutex<StdRng>>,
6750                cx: &mut TestAppContext,
6751            ) -> anyhow::Result<()> {
6752                while op_start_signal.next().await.is_some() {
6753                    let buffer = if client.buffers.is_empty() || rng.lock().gen() {
6754                        let worktree = if let Some(worktree) =
6755                            project.read_with(cx, |project, cx| {
6756                                project
6757                                    .worktrees(&cx)
6758                                    .filter(|worktree| {
6759                                        let worktree = worktree.read(cx);
6760                                        worktree.is_visible()
6761                                            && worktree.entries(false).any(|e| e.is_file())
6762                                    })
6763                                    .choose(&mut *rng.lock())
6764                            }) {
6765                            worktree
6766                        } else {
6767                            cx.background().simulate_random_delay().await;
6768                            continue;
6769                        };
6770
6771                        let (worktree_root_name, project_path) =
6772                            worktree.read_with(cx, |worktree, _| {
6773                                let entry = worktree
6774                                    .entries(false)
6775                                    .filter(|e| e.is_file())
6776                                    .choose(&mut *rng.lock())
6777                                    .unwrap();
6778                                (
6779                                    worktree.root_name().to_string(),
6780                                    (worktree.id(), entry.path.clone()),
6781                                )
6782                            });
6783                        log::info!(
6784                            "{}: opening path {:?} in worktree {} ({})",
6785                            guest_username,
6786                            project_path.1,
6787                            project_path.0,
6788                            worktree_root_name,
6789                        );
6790                        let buffer = project
6791                            .update(cx, |project, cx| {
6792                                project.open_buffer(project_path.clone(), cx)
6793                            })
6794                            .await?;
6795                        log::info!(
6796                            "{}: opened path {:?} in worktree {} ({}) with buffer id {}",
6797                            guest_username,
6798                            project_path.1,
6799                            project_path.0,
6800                            worktree_root_name,
6801                            buffer.read_with(cx, |buffer, _| buffer.remote_id())
6802                        );
6803                        client.buffers.insert(buffer.clone());
6804                        buffer
6805                    } else {
6806                        client
6807                            .buffers
6808                            .iter()
6809                            .choose(&mut *rng.lock())
6810                            .unwrap()
6811                            .clone()
6812                    };
6813
6814                    let choice = rng.lock().gen_range(0..100);
6815                    match choice {
6816                        0..=9 => {
6817                            cx.update(|cx| {
6818                                log::info!(
6819                                    "{}: dropping buffer {:?}",
6820                                    guest_username,
6821                                    buffer.read(cx).file().unwrap().full_path(cx)
6822                                );
6823                                client.buffers.remove(&buffer);
6824                                drop(buffer);
6825                            });
6826                        }
6827                        10..=19 => {
6828                            let completions = project.update(cx, |project, cx| {
6829                                log::info!(
6830                                    "{}: requesting completions for buffer {} ({:?})",
6831                                    guest_username,
6832                                    buffer.read(cx).remote_id(),
6833                                    buffer.read(cx).file().unwrap().full_path(cx)
6834                                );
6835                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6836                                project.completions(&buffer, offset, cx)
6837                            });
6838                            let completions = cx.background().spawn(async move {
6839                                completions
6840                                    .await
6841                                    .map_err(|err| anyhow!("completions request failed: {:?}", err))
6842                            });
6843                            if rng.lock().gen_bool(0.3) {
6844                                log::info!("{}: detaching completions request", guest_username);
6845                                cx.update(|cx| completions.detach_and_log_err(cx));
6846                            } else {
6847                                completions.await?;
6848                            }
6849                        }
6850                        20..=29 => {
6851                            let code_actions = project.update(cx, |project, cx| {
6852                                log::info!(
6853                                    "{}: requesting code actions for buffer {} ({:?})",
6854                                    guest_username,
6855                                    buffer.read(cx).remote_id(),
6856                                    buffer.read(cx).file().unwrap().full_path(cx)
6857                                );
6858                                let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
6859                                project.code_actions(&buffer, range, cx)
6860                            });
6861                            let code_actions = cx.background().spawn(async move {
6862                                code_actions.await.map_err(|err| {
6863                                    anyhow!("code actions request failed: {:?}", err)
6864                                })
6865                            });
6866                            if rng.lock().gen_bool(0.3) {
6867                                log::info!("{}: detaching code actions request", guest_username);
6868                                cx.update(|cx| code_actions.detach_and_log_err(cx));
6869                            } else {
6870                                code_actions.await?;
6871                            }
6872                        }
6873                        30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => {
6874                            let (requested_version, save) = buffer.update(cx, |buffer, cx| {
6875                                log::info!(
6876                                    "{}: saving buffer {} ({:?})",
6877                                    guest_username,
6878                                    buffer.remote_id(),
6879                                    buffer.file().unwrap().full_path(cx)
6880                                );
6881                                (buffer.version(), buffer.save(cx))
6882                            });
6883                            let save = cx.background().spawn(async move {
6884                                let (saved_version, _) = save
6885                                    .await
6886                                    .map_err(|err| anyhow!("save request failed: {:?}", err))?;
6887                                assert!(saved_version.observed_all(&requested_version));
6888                                Ok::<_, anyhow::Error>(())
6889                            });
6890                            if rng.lock().gen_bool(0.3) {
6891                                log::info!("{}: detaching save request", guest_username);
6892                                cx.update(|cx| save.detach_and_log_err(cx));
6893                            } else {
6894                                save.await?;
6895                            }
6896                        }
6897                        40..=44 => {
6898                            let prepare_rename = project.update(cx, |project, cx| {
6899                                log::info!(
6900                                    "{}: preparing rename for buffer {} ({:?})",
6901                                    guest_username,
6902                                    buffer.read(cx).remote_id(),
6903                                    buffer.read(cx).file().unwrap().full_path(cx)
6904                                );
6905                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6906                                project.prepare_rename(buffer, offset, cx)
6907                            });
6908                            let prepare_rename = cx.background().spawn(async move {
6909                                prepare_rename.await.map_err(|err| {
6910                                    anyhow!("prepare rename request failed: {:?}", err)
6911                                })
6912                            });
6913                            if rng.lock().gen_bool(0.3) {
6914                                log::info!("{}: detaching prepare rename request", guest_username);
6915                                cx.update(|cx| prepare_rename.detach_and_log_err(cx));
6916                            } else {
6917                                prepare_rename.await?;
6918                            }
6919                        }
6920                        45..=49 => {
6921                            let definitions = project.update(cx, |project, cx| {
6922                                log::info!(
6923                                    "{}: requesting definitions for buffer {} ({:?})",
6924                                    guest_username,
6925                                    buffer.read(cx).remote_id(),
6926                                    buffer.read(cx).file().unwrap().full_path(cx)
6927                                );
6928                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6929                                project.definition(&buffer, offset, cx)
6930                            });
6931                            let definitions = cx.background().spawn(async move {
6932                                definitions
6933                                    .await
6934                                    .map_err(|err| anyhow!("definitions request failed: {:?}", err))
6935                            });
6936                            if rng.lock().gen_bool(0.3) {
6937                                log::info!("{}: detaching definitions request", guest_username);
6938                                cx.update(|cx| definitions.detach_and_log_err(cx));
6939                            } else {
6940                                client
6941                                    .buffers
6942                                    .extend(definitions.await?.into_iter().map(|loc| loc.buffer));
6943                            }
6944                        }
6945                        50..=54 => {
6946                            let highlights = project.update(cx, |project, cx| {
6947                                log::info!(
6948                                    "{}: requesting highlights for buffer {} ({:?})",
6949                                    guest_username,
6950                                    buffer.read(cx).remote_id(),
6951                                    buffer.read(cx).file().unwrap().full_path(cx)
6952                                );
6953                                let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
6954                                project.document_highlights(&buffer, offset, cx)
6955                            });
6956                            let highlights = cx.background().spawn(async move {
6957                                highlights
6958                                    .await
6959                                    .map_err(|err| anyhow!("highlights request failed: {:?}", err))
6960                            });
6961                            if rng.lock().gen_bool(0.3) {
6962                                log::info!("{}: detaching highlights request", guest_username);
6963                                cx.update(|cx| highlights.detach_and_log_err(cx));
6964                            } else {
6965                                highlights.await?;
6966                            }
6967                        }
6968                        55..=59 => {
6969                            let search = project.update(cx, |project, cx| {
6970                                let query = rng.lock().gen_range('a'..='z');
6971                                log::info!("{}: project-wide search {:?}", guest_username, query);
6972                                project.search(SearchQuery::text(query, false, false), cx)
6973                            });
6974                            let search = cx.background().spawn(async move {
6975                                search
6976                                    .await
6977                                    .map_err(|err| anyhow!("search request failed: {:?}", err))
6978                            });
6979                            if rng.lock().gen_bool(0.3) {
6980                                log::info!("{}: detaching search request", guest_username);
6981                                cx.update(|cx| search.detach_and_log_err(cx));
6982                            } else {
6983                                client.buffers.extend(search.await?.into_keys());
6984                            }
6985                        }
6986                        60..=69 => {
6987                            let worktree = project
6988                                .read_with(cx, |project, cx| {
6989                                    project
6990                                        .worktrees(&cx)
6991                                        .filter(|worktree| {
6992                                            let worktree = worktree.read(cx);
6993                                            worktree.is_visible()
6994                                                && worktree.entries(false).any(|e| e.is_file())
6995                                                && worktree
6996                                                    .root_entry()
6997                                                    .map_or(false, |e| e.is_dir())
6998                                        })
6999                                        .choose(&mut *rng.lock())
7000                                })
7001                                .unwrap();
7002                            let (worktree_id, worktree_root_name) = worktree
7003                                .read_with(cx, |worktree, _| {
7004                                    (worktree.id(), worktree.root_name().to_string())
7005                                });
7006
7007                            let mut new_name = String::new();
7008                            for _ in 0..10 {
7009                                let letter = rng.lock().gen_range('a'..='z');
7010                                new_name.push(letter);
7011                            }
7012                            let mut new_path = PathBuf::new();
7013                            new_path.push(new_name);
7014                            new_path.set_extension("rs");
7015                            log::info!(
7016                                "{}: creating {:?} in worktree {} ({})",
7017                                guest_username,
7018                                new_path,
7019                                worktree_id,
7020                                worktree_root_name,
7021                            );
7022                            project
7023                                .update(cx, |project, cx| {
7024                                    project.create_entry((worktree_id, new_path), false, cx)
7025                                })
7026                                .unwrap()
7027                                .await?;
7028                        }
7029                        _ => {
7030                            buffer.update(cx, |buffer, cx| {
7031                                log::info!(
7032                                    "{}: updating buffer {} ({:?})",
7033                                    guest_username,
7034                                    buffer.remote_id(),
7035                                    buffer.file().unwrap().full_path(cx)
7036                                );
7037                                if rng.lock().gen_bool(0.7) {
7038                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx);
7039                                } else {
7040                                    buffer.randomly_undo_redo(&mut *rng.lock(), cx);
7041                                }
7042                            });
7043                        }
7044                    }
7045                    cx.background().simulate_random_delay().await;
7046                }
7047                Ok(())
7048            }
7049
7050            let result = simulate_guest_internal(
7051                &mut self,
7052                &guest_username,
7053                project.clone(),
7054                op_start_signal,
7055                rng,
7056                &mut cx,
7057            )
7058            .await;
7059            log::info!("{}: done", guest_username);
7060
7061            self.project = Some(project);
7062            (self, cx, result.err())
7063        }
7064    }
7065
7066    impl Drop for TestClient {
7067        fn drop(&mut self) {
7068            self.client.tear_down();
7069        }
7070    }
7071
7072    impl Executor for Arc<gpui::executor::Background> {
7073        type Sleep = gpui::executor::Timer;
7074
7075        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
7076            self.spawn(future).detach();
7077        }
7078
7079        fn sleep(&self, duration: Duration) -> Self::Sleep {
7080            self.as_ref().timer(duration)
7081        }
7082    }
7083
7084    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
7085        channel
7086            .messages()
7087            .cursor::<()>()
7088            .map(|m| {
7089                (
7090                    m.sender.github_login.clone(),
7091                    m.body.clone(),
7092                    m.is_pending(),
7093                )
7094            })
7095            .collect()
7096    }
7097
7098    struct EmptyView;
7099
7100    impl gpui::Entity for EmptyView {
7101        type Event = ();
7102    }
7103
7104    impl gpui::View for EmptyView {
7105        fn ui_name() -> &'static str {
7106            "empty view"
7107        }
7108
7109        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
7110            gpui::Element::boxed(gpui::elements::Empty)
7111        }
7112    }
7113}