rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
/// Type-erased message handler as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed, dynamically typed envelope and returns a boxed
/// future resolving to `tide::Result<()>`. It must be `Send + Sync`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  34
/// RPC server state shared by every connection.
pub struct Server {
    /// RPC peer used by the handlers to send, respond to and forward messages.
    peer: Arc<Peer>,
    /// In-memory state behind a read/write lock; the handlers access it via `state()` and
    /// `state_mut()` (both defined outside this part of the file).
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they accept; filled in by `new`.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When set, `handle_connection` sends a `()` here after each incoming message it dispatched
    /// has finished being handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
/// Abstraction over spawning futures, used by `Server::handle_connection` to run the handlers
/// of messages flagged as background without awaiting them inline.
pub trait Executor {
    /// Hands `future` to the executor; nothing is returned to the caller, so the result cannot
    /// be awaited or observed through this call.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
  46
/// Unit struct; presumably the production `Executor` implementation — its `impl` is not visible
/// in this part of the file, TODO confirm.
pub struct RealExecutor;
  48
/// Number of channel messages requested per page. `join_channel` fetches this many and reports
/// `done` when fewer are returned.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a channel message body. `send_channel_message` compares it against
/// `body.len()` (a byte count) after trimming.
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  81            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  82            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  83            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  84            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  85            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  86            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  87            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  88            .add_request_handler(
  89                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  90            )
  91            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
  92            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
  93            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
  94            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
  95            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
  96            .add_message_handler(Server::close_buffer)
  97            .add_request_handler(Server::update_buffer)
  98            .add_message_handler(Server::update_buffer_file)
  99            .add_message_handler(Server::buffer_reloaded)
 100            .add_message_handler(Server::buffer_saved)
 101            .add_request_handler(Server::save_buffer)
 102            .add_request_handler(Server::get_channels)
 103            .add_request_handler(Server::get_users)
 104            .add_request_handler(Server::join_channel)
 105            .add_message_handler(Server::leave_channel)
 106            .add_request_handler(Server::send_channel_message)
 107            .add_request_handler(Server::get_channel_messages);
 108
 109        Arc::new(server)
 110    }
 111
 112    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 113    where
 114        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 115        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 116        M: EnvelopedMessage,
 117    {
 118        let prev_handler = self.handlers.insert(
 119            TypeId::of::<M>(),
 120            Box::new(move |server, envelope| {
 121                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 122                (handler)(server, *envelope).boxed()
 123            }),
 124        );
 125        if prev_handler.is_some() {
 126            panic!("registered a handler for the same message twice");
 127        }
 128        self
 129    }
 130
 131    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 132    where
 133        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 134        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 135        M: RequestMessage,
 136    {
 137        self.add_message_handler(move |server, envelope| {
 138            let receipt = envelope.receipt();
 139            let response = (handler)(server.clone(), envelope);
 140            async move {
 141                match response.await {
 142                    Ok(response) => {
 143                        server.peer.respond(receipt, response)?;
 144                        Ok(())
 145                    }
 146                    Err(error) => {
 147                        server.peer.respond_with_error(
 148                            receipt,
 149                            proto::Error {
 150                                message: error.to_string(),
 151                            },
 152                        )?;
 153                        Err(error)
 154                    }
 155                }
 156            }
 157        })
 158    }
 159
    /// Returns a future that drives a single client connection from registration to sign-out.
    ///
    /// The future registers `connection` with the peer, optionally reports the assigned
    /// `ConnectionId` through `send_connection_id`, records the connection for `user_id` in the
    /// store, and then dispatches incoming messages to the registered handlers until either the
    /// connection's I/O future finishes or the incoming message stream ends. It finally calls
    /// `sign_out` for the connection.
    ///
    /// `addr` is only used in log output. Handlers of messages whose `is_background()` is true
    /// are spawned on `executor`; all other handlers are awaited inline, so the next message is
    /// not picked up until they complete.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // A mutable clone of the `Arc`: `sign_out` takes `&mut Arc<Self>`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Reporting the connection id is best-effort; a failed send is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` prefers the first listed arm when both are ready, so the
                // completion of the I/O future takes priority over a pending message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so the outcome is logged and the optional
                                // notification is sent whether it runs inline or detached.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream yielded `None`: no more messages will arrive.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 234
 235    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 236        self.peer.disconnect(connection_id);
 237        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 238
 239        for (project_id, project) in removed_connection.hosted_projects {
 240            if let Some(share) = project.share {
 241                broadcast(
 242                    connection_id,
 243                    share.guests.keys().copied().collect(),
 244                    |conn_id| {
 245                        self.peer
 246                            .send(conn_id, proto::UnshareProject { project_id })
 247                    },
 248                )?;
 249            }
 250        }
 251
 252        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 253            broadcast(connection_id, peer_ids, |conn_id| {
 254                self.peer.send(
 255                    conn_id,
 256                    proto::RemoveProjectCollaborator {
 257                        project_id,
 258                        peer_id: connection_id.0,
 259                    },
 260                )
 261            })?;
 262        }
 263
 264        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 265        Ok(())
 266    }
 267
 268    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 269        Ok(proto::Ack {})
 270    }
 271
 272    async fn register_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::RegisterProject>,
 275    ) -> tide::Result<proto::RegisterProjectResponse> {
 276        let project_id = {
 277            let mut state = self.state_mut();
 278            let user_id = state.user_id_for_connection(request.sender_id)?;
 279            state.register_project(request.sender_id, user_id)
 280        };
 281        Ok(proto::RegisterProjectResponse { project_id })
 282    }
 283
 284    async fn unregister_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::UnregisterProject>,
 287    ) -> tide::Result<()> {
 288        let project = self
 289            .state_mut()
 290            .unregister_project(request.payload.project_id, request.sender_id)?;
 291        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 292        Ok(())
 293    }
 294
 295    async fn share_project(
 296        mut self: Arc<Server>,
 297        request: TypedEnvelope<proto::ShareProject>,
 298    ) -> tide::Result<proto::Ack> {
 299        self.state_mut()
 300            .share_project(request.payload.project_id, request.sender_id);
 301        Ok(proto::Ack {})
 302    }
 303
 304    async fn unshare_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnshareProject>,
 307    ) -> tide::Result<()> {
 308        let project_id = request.payload.project_id;
 309        let project = self
 310            .state_mut()
 311            .unshare_project(project_id, request.sender_id)?;
 312
 313        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 314            self.peer
 315                .send(conn_id, proto::UnshareProject { project_id })
 316        })?;
 317        self.update_contacts_for_users(&project.authorized_user_ids)?;
 318        Ok(())
 319    }
 320
 321    async fn join_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::JoinProject>,
 324    ) -> tide::Result<proto::JoinProjectResponse> {
 325        let project_id = request.payload.project_id;
 326
 327        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 328        let (response, connection_ids, contact_user_ids) = self
 329            .state_mut()
 330            .join_project(request.sender_id, user_id, project_id)
 331            .and_then(|joined| {
 332                let share = joined.project.share()?;
 333                let peer_count = share.guests.len();
 334                let mut collaborators = Vec::with_capacity(peer_count);
 335                collaborators.push(proto::Collaborator {
 336                    peer_id: joined.project.host_connection_id.0,
 337                    replica_id: 0,
 338                    user_id: joined.project.host_user_id.to_proto(),
 339                });
 340                let worktrees = share
 341                    .worktrees
 342                    .iter()
 343                    .filter_map(|(id, shared_worktree)| {
 344                        let worktree = joined.project.worktrees.get(&id)?;
 345                        Some(proto::Worktree {
 346                            id: *id,
 347                            root_name: worktree.root_name.clone(),
 348                            entries: shared_worktree.entries.values().cloned().collect(),
 349                            diagnostic_summaries: shared_worktree
 350                                .diagnostic_summaries
 351                                .values()
 352                                .cloned()
 353                                .collect(),
 354                            weak: worktree.weak,
 355                        })
 356                    })
 357                    .collect();
 358                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 359                    if *peer_conn_id != request.sender_id {
 360                        collaborators.push(proto::Collaborator {
 361                            peer_id: peer_conn_id.0,
 362                            replica_id: *peer_replica_id as u32,
 363                            user_id: peer_user_id.to_proto(),
 364                        });
 365                    }
 366                }
 367                let response = proto::JoinProjectResponse {
 368                    worktrees,
 369                    replica_id: joined.replica_id as u32,
 370                    collaborators,
 371                };
 372                let connection_ids = joined.project.connection_ids();
 373                let contact_user_ids = joined.project.authorized_user_ids();
 374                Ok((response, connection_ids, contact_user_ids))
 375            })?;
 376
 377        broadcast(request.sender_id, connection_ids, |conn_id| {
 378            self.peer.send(
 379                conn_id,
 380                proto::AddProjectCollaborator {
 381                    project_id,
 382                    collaborator: Some(proto::Collaborator {
 383                        peer_id: request.sender_id.0,
 384                        replica_id: response.replica_id,
 385                        user_id: user_id.to_proto(),
 386                    }),
 387                },
 388            )
 389        })?;
 390        self.update_contacts_for_users(&contact_user_ids)?;
 391        Ok(response)
 392    }
 393
 394    async fn leave_project(
 395        mut self: Arc<Server>,
 396        request: TypedEnvelope<proto::LeaveProject>,
 397    ) -> tide::Result<()> {
 398        let sender_id = request.sender_id;
 399        let project_id = request.payload.project_id;
 400        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 401
 402        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 403            self.peer.send(
 404                conn_id,
 405                proto::RemoveProjectCollaborator {
 406                    project_id,
 407                    peer_id: sender_id.0,
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 412
 413        Ok(())
 414    }
 415
 416    async fn register_worktree(
 417        mut self: Arc<Server>,
 418        request: TypedEnvelope<proto::RegisterWorktree>,
 419    ) -> tide::Result<proto::Ack> {
 420        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 421
 422        let mut contact_user_ids = HashSet::default();
 423        contact_user_ids.insert(host_user_id);
 424        for github_login in &request.payload.authorized_logins {
 425            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 426            contact_user_ids.insert(contact_user_id);
 427        }
 428
 429        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 430        let guest_connection_ids;
 431        {
 432            let mut state = self.state_mut();
 433            guest_connection_ids = state
 434                .read_project(request.payload.project_id, request.sender_id)?
 435                .guest_connection_ids();
 436            state.register_worktree(
 437                request.payload.project_id,
 438                request.payload.worktree_id,
 439                request.sender_id,
 440                Worktree {
 441                    authorized_user_ids: contact_user_ids.clone(),
 442                    root_name: request.payload.root_name.clone(),
 443                    weak: request.payload.weak,
 444                },
 445            )?;
 446        }
 447        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 448            self.peer
 449                .forward_send(request.sender_id, connection_id, request.payload.clone())
 450        })?;
 451        self.update_contacts_for_users(&contact_user_ids)?;
 452        Ok(proto::Ack {})
 453    }
 454
 455    async fn unregister_worktree(
 456        mut self: Arc<Server>,
 457        request: TypedEnvelope<proto::UnregisterWorktree>,
 458    ) -> tide::Result<()> {
 459        let project_id = request.payload.project_id;
 460        let worktree_id = request.payload.worktree_id;
 461        let (worktree, guest_connection_ids) =
 462            self.state_mut()
 463                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 464        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 465            self.peer.send(
 466                conn_id,
 467                proto::UnregisterWorktree {
 468                    project_id,
 469                    worktree_id,
 470                },
 471            )
 472        })?;
 473        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 474        Ok(())
 475    }
 476
 477    async fn update_worktree(
 478        mut self: Arc<Server>,
 479        request: TypedEnvelope<proto::UpdateWorktree>,
 480    ) -> tide::Result<proto::Ack> {
 481        let connection_ids = self.state_mut().update_worktree(
 482            request.sender_id,
 483            request.payload.project_id,
 484            request.payload.worktree_id,
 485            &request.payload.removed_entries,
 486            &request.payload.updated_entries,
 487        )?;
 488
 489        broadcast(request.sender_id, connection_ids, |connection_id| {
 490            self.peer
 491                .forward_send(request.sender_id, connection_id, request.payload.clone())
 492        })?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_diagnostic_summary(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 500    ) -> tide::Result<()> {
 501        let summary = request
 502            .payload
 503            .summary
 504            .clone()
 505            .ok_or_else(|| anyhow!("invalid summary"))?;
 506        let receiver_ids = self.state_mut().update_diagnostic_summary(
 507            request.payload.project_id,
 508            request.payload.worktree_id,
 509            request.sender_id,
 510            summary,
 511        )?;
 512
 513        broadcast(request.sender_id, receiver_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })?;
 517        Ok(())
 518    }
 519
 520    async fn disk_based_diagnostics_updating(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 523    ) -> tide::Result<()> {
 524        let receiver_ids = self
 525            .state()
 526            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 527        broadcast(request.sender_id, receiver_ids, |connection_id| {
 528            self.peer
 529                .forward_send(request.sender_id, connection_id, request.payload.clone())
 530        })?;
 531        Ok(())
 532    }
 533
 534    async fn disk_based_diagnostics_updated(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = self
 539            .state()
 540            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 541        broadcast(request.sender_id, receiver_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        })?;
 545        Ok(())
 546    }
 547
 548    async fn forward_project_request<T>(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<T>,
 551    ) -> tide::Result<T::Response>
 552    where
 553        T: EntityMessage + RequestMessage,
 554    {
 555        let host_connection_id = self
 556            .state()
 557            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 558            .host_connection_id;
 559        Ok(self
 560            .peer
 561            .forward_request(request.sender_id, host_connection_id, request.payload)
 562            .await?)
 563    }
 564
 565    async fn close_buffer(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::CloseBuffer>,
 568    ) -> tide::Result<()> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        self.peer
 574            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 575        Ok(())
 576    }
 577
 578    async fn save_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::SaveBuffer>,
 581    ) -> tide::Result<proto::BufferSaved> {
 582        let host;
 583        let mut guests;
 584        {
 585            let state = self.state();
 586            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 587            host = project.host_connection_id;
 588            guests = project.guest_connection_ids()
 589        }
 590
 591        let response = self
 592            .peer
 593            .forward_request(request.sender_id, host, request.payload.clone())
 594            .await?;
 595
 596        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 597        broadcast(host, guests, |conn_id| {
 598            self.peer.forward_send(host, conn_id, response.clone())
 599        })?;
 600
 601        Ok(response)
 602    }
 603
 604    async fn update_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::UpdateBuffer>,
 607    ) -> tide::Result<proto::Ack> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })?;
 615        Ok(proto::Ack {})
 616    }
 617
 618    async fn update_buffer_file(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBufferFile>,
 621    ) -> tide::Result<()> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(())
 630    }
 631
 632    async fn buffer_reloaded(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::BufferReloaded>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_saved(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferSaved>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn get_channels(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::GetChannels>,
 663    ) -> tide::Result<proto::GetChannelsResponse> {
 664        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 665        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 666        Ok(proto::GetChannelsResponse {
 667            channels: channels
 668                .into_iter()
 669                .map(|chan| proto::Channel {
 670                    id: chan.id.to_proto(),
 671                    name: chan.name,
 672                })
 673                .collect(),
 674        })
 675    }
 676
 677    async fn get_users(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetUsers>,
 680    ) -> tide::Result<proto::GetUsersResponse> {
 681        let user_ids = request
 682            .payload
 683            .user_ids
 684            .into_iter()
 685            .map(UserId::from_proto)
 686            .collect();
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        Ok(proto::GetUsersResponse { users })
 700    }
 701
 702    fn update_contacts_for_users<'a>(
 703        self: &Arc<Server>,
 704        user_ids: impl IntoIterator<Item = &'a UserId>,
 705    ) -> anyhow::Result<()> {
 706        let mut result = Ok(());
 707        let state = self.state();
 708        for user_id in user_ids {
 709            let contacts = state.contacts_for_user(*user_id);
 710            for connection_id in state.connection_ids_for_user(*user_id) {
 711                if let Err(error) = self.peer.send(
 712                    connection_id,
 713                    proto::UpdateContacts {
 714                        contacts: contacts.clone(),
 715                    },
 716                ) {
 717                    result = Err(error);
 718                }
 719            }
 720        }
 721        result
 722    }
 723
 724    async fn join_channel(
 725        mut self: Arc<Self>,
 726        request: TypedEnvelope<proto::JoinChannel>,
 727    ) -> tide::Result<proto::JoinChannelResponse> {
 728        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 729        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 730        if !self
 731            .app_state
 732            .db
 733            .can_user_access_channel(user_id, channel_id)
 734            .await?
 735        {
 736            Err(anyhow!("access denied"))?;
 737        }
 738
 739        self.state_mut().join_channel(request.sender_id, channel_id);
 740        let messages = self
 741            .app_state
 742            .db
 743            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 744            .await?
 745            .into_iter()
 746            .map(|msg| proto::ChannelMessage {
 747                id: msg.id.to_proto(),
 748                body: msg.body,
 749                timestamp: msg.sent_at.unix_timestamp() as u64,
 750                sender_id: msg.sender_id.to_proto(),
 751                nonce: Some(msg.nonce.as_u128().into()),
 752            })
 753            .collect::<Vec<_>>();
 754        Ok(proto::JoinChannelResponse {
 755            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 756            messages,
 757        })
 758    }
 759
 760    async fn leave_channel(
 761        mut self: Arc<Self>,
 762        request: TypedEnvelope<proto::LeaveChannel>,
 763    ) -> tide::Result<()> {
 764        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 765        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 766        if !self
 767            .app_state
 768            .db
 769            .can_user_access_channel(user_id, channel_id)
 770            .await?
 771        {
 772            Err(anyhow!("access denied"))?;
 773        }
 774
 775        self.state_mut()
 776            .leave_channel(request.sender_id, channel_id);
 777
 778        Ok(())
 779    }
 780
 781    async fn send_channel_message(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::SendChannelMessage>,
 784    ) -> tide::Result<proto::SendChannelMessageResponse> {
 785        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 786        let user_id;
 787        let connection_ids;
 788        {
 789            let state = self.state();
 790            user_id = state.user_id_for_connection(request.sender_id)?;
 791            connection_ids = state.channel_connection_ids(channel_id)?;
 792        }
 793
 794        // Validate the message body.
 795        let body = request.payload.body.trim().to_string();
 796        if body.len() > MAX_MESSAGE_LEN {
 797            return Err(anyhow!("message is too long"))?;
 798        }
 799        if body.is_empty() {
 800            return Err(anyhow!("message can't be blank"))?;
 801        }
 802
 803        let timestamp = OffsetDateTime::now_utc();
 804        let nonce = request
 805            .payload
 806            .nonce
 807            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 808
 809        let message_id = self
 810            .app_state
 811            .db
 812            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 813            .await?
 814            .to_proto();
 815        let message = proto::ChannelMessage {
 816            sender_id: user_id.to_proto(),
 817            id: message_id,
 818            body,
 819            timestamp: timestamp.unix_timestamp() as u64,
 820            nonce: Some(nonce),
 821        };
 822        broadcast(request.sender_id, connection_ids, |conn_id| {
 823            self.peer.send(
 824                conn_id,
 825                proto::ChannelMessageSent {
 826                    channel_id: channel_id.to_proto(),
 827                    message: Some(message.clone()),
 828                },
 829            )
 830        })?;
 831        Ok(proto::SendChannelMessageResponse {
 832            message: Some(message),
 833        })
 834    }
 835
 836    async fn get_channel_messages(
 837        self: Arc<Self>,
 838        request: TypedEnvelope<proto::GetChannelMessages>,
 839    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 840        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 841        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 842        if !self
 843            .app_state
 844            .db
 845            .can_user_access_channel(user_id, channel_id)
 846            .await?
 847        {
 848            Err(anyhow!("access denied"))?;
 849        }
 850
 851        let messages = self
 852            .app_state
 853            .db
 854            .get_channel_messages(
 855                channel_id,
 856                MESSAGE_COUNT_PER_PAGE,
 857                Some(MessageId::from_proto(request.payload.before_message_id)),
 858            )
 859            .await?
 860            .into_iter()
 861            .map(|msg| proto::ChannelMessage {
 862                id: msg.id.to_proto(),
 863                body: msg.body,
 864                timestamp: msg.sent_at.unix_timestamp() as u64,
 865                sender_id: msg.sender_id.to_proto(),
 866                nonce: Some(msg.nonce.as_u128().into()),
 867            })
 868            .collect::<Vec<_>>();
 869
 870        Ok(proto::GetChannelMessagesResponse {
 871            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 872            messages,
 873        })
 874    }
 875
 876    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 877        self.store.read()
 878    }
 879
 880    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 881        self.store.write()
 882    }
 883}
 884
 885impl Executor for RealExecutor {
 886    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 887        task::spawn(future);
 888    }
 889}
 890
 891fn broadcast<F>(
 892    sender_id: ConnectionId,
 893    receiver_ids: Vec<ConnectionId>,
 894    mut f: F,
 895) -> anyhow::Result<()>
 896where
 897    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 898{
 899    let mut result = Ok(());
 900    for receiver_id in receiver_ids {
 901        if receiver_id != sender_id {
 902            if let Err(error) = f(receiver_id) {
 903                if result.is_ok() {
 904                    result = Err(error);
 905                }
 906            }
 907        }
 908    }
 909    result
 910}
 911
 912pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 913    let server = Server::new(app.state().clone(), rpc.clone(), None);
 914    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 915        let server = server.clone();
 916        async move {
 917            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 918
 919            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 920            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 921            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 922            let client_protocol_version: Option<u32> = request
 923                .header("X-Zed-Protocol-Version")
 924                .and_then(|v| v.as_str().parse().ok());
 925
 926            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 927                return Ok(Response::new(StatusCode::UpgradeRequired));
 928            }
 929
 930            let header = match request.header("Sec-Websocket-Key") {
 931                Some(h) => h.as_str(),
 932                None => return Err(anyhow!("expected sec-websocket-key"))?,
 933            };
 934
 935            let user_id = process_auth_header(&request).await?;
 936
 937            let mut response = Response::new(StatusCode::SwitchingProtocols);
 938            response.insert_header(UPGRADE, "websocket");
 939            response.insert_header(CONNECTION, "Upgrade");
 940            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 941            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 942            response.insert_header("Sec-Websocket-Version", "13");
 943
 944            let http_res: &mut tide::http::Response = response.as_mut();
 945            let upgrade_receiver = http_res.recv_upgrade().await;
 946            let addr = request.remote().unwrap_or("unknown").to_string();
 947            task::spawn(async move {
 948                if let Some(stream) = upgrade_receiver.await {
 949                    server
 950                        .handle_connection(
 951                            Connection::new(
 952                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 953                            ),
 954                            addr,
 955                            user_id,
 956                            None,
 957                            RealExecutor,
 958                        )
 959                        .await;
 960                }
 961            });
 962
 963            Ok(response)
 964        }
 965    });
 966}
 967
 968fn header_contains_ignore_case<T>(
 969    request: &tide::Request<T>,
 970    header_name: HeaderName,
 971    value: &str,
 972) -> bool {
 973    request
 974        .header(header_name)
 975        .map(|h| {
 976            h.as_str()
 977                .split(',')
 978                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 979        })
 980        .unwrap_or(false)
 981}
 982
 983#[cfg(test)]
 984mod tests {
 985    use super::*;
 986    use crate::{
 987        auth,
 988        db::{tests::TestDb, UserId},
 989        github, AppState, Config,
 990    };
 991    use ::rpc::Peer;
 992    use collections::BTreeMap;
 993    use gpui::{executor, ModelHandle, TestAppContext};
 994    use parking_lot::Mutex;
 995    use postage::{sink::Sink, watch};
 996    use rand::prelude::*;
 997    use rpc::PeerId;
 998    use serde_json::json;
 999    use sqlx::types::time::OffsetDateTime;
1000    use std::{
1001        cell::Cell,
1002        env,
1003        ops::Deref,
1004        path::{Path, PathBuf},
1005        rc::Rc,
1006        sync::{
1007            atomic::{AtomicBool, Ordering::SeqCst},
1008            Arc,
1009        },
1010        time::Duration,
1011    };
1012    use zed::{
1013        client::{
1014            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015            EstablishConnectionError, UserStore,
1016        },
1017        editor::{
1018            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1019            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1020        },
1021        fs::{FakeFs, Fs as _},
1022        language::{
1023            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1024            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1025        },
1026        lsp,
1027        project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1028        workspace::{Settings, Workspace, WorkspaceParams},
1029    };
1030
1031    #[cfg(test)]
1032    #[ctor::ctor]
1033    fn init_logger() {
1034        if std::env::var("RUST_LOG").is_ok() {
1035            env_logger::init();
1036        }
1037    }
1038
1039    #[gpui::test(iterations = 10)]
1040    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1041        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1042        let lang_registry = Arc::new(LanguageRegistry::new());
1043        let fs = FakeFs::new(cx_a.background());
1044        cx_a.foreground().forbid_parking();
1045
1046        // Connect to a server as 2 clients.
1047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1048        let client_a = server.create_client(cx_a, "user_a").await;
1049        let client_b = server.create_client(cx_b, "user_b").await;
1050
1051        // Share a project as client A
1052        fs.insert_tree(
1053            "/a",
1054            json!({
1055                ".zed.toml": r#"collaborators = ["user_b"]"#,
1056                "a.txt": "a-contents",
1057                "b.txt": "b-contents",
1058            }),
1059        )
1060        .await;
1061        let project_a = cx_a.update(|cx| {
1062            Project::local(
1063                client_a.clone(),
1064                client_a.user_store.clone(),
1065                lang_registry.clone(),
1066                fs.clone(),
1067                cx,
1068            )
1069        });
1070        let (worktree_a, _) = project_a
1071            .update(cx_a, |p, cx| {
1072                p.find_or_create_local_worktree("/a", false, cx)
1073            })
1074            .await
1075            .unwrap();
1076        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1077        worktree_a
1078            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1079            .await;
1080        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1081        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1082
1083        // Join that project as client B
1084        let project_b = Project::remote(
1085            project_id,
1086            client_b.clone(),
1087            client_b.user_store.clone(),
1088            lang_registry.clone(),
1089            fs.clone(),
1090            &mut cx_b.to_async(),
1091        )
1092        .await
1093        .unwrap();
1094
1095        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1096            assert_eq!(
1097                project
1098                    .collaborators()
1099                    .get(&client_a.peer_id)
1100                    .unwrap()
1101                    .user
1102                    .github_login,
1103                "user_a"
1104            );
1105            project.replica_id()
1106        });
1107        project_a
1108            .condition(&cx_a, |tree, _| {
1109                tree.collaborators()
1110                    .get(&client_b.peer_id)
1111                    .map_or(false, |collaborator| {
1112                        collaborator.replica_id == replica_id_b
1113                            && collaborator.user.github_login == "user_b"
1114                    })
1115            })
1116            .await;
1117
1118        // Open the same file as client B and client A.
1119        let buffer_b = project_b
1120            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1121            .await
1122            .unwrap();
1123        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1124        buffer_b.read_with(cx_b, |buf, cx| {
1125            assert_eq!(buf.read(cx).text(), "b-contents")
1126        });
1127        project_a.read_with(cx_a, |project, cx| {
1128            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1129        });
1130        let buffer_a = project_a
1131            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1132            .await
1133            .unwrap();
1134
1135        let editor_b = cx_b.add_view(window_b, |cx| {
1136            Editor::for_buffer(
1137                buffer_b,
1138                None,
1139                watch::channel_with(Settings::test(cx)).1,
1140                cx,
1141            )
1142        });
1143
1144        // TODO
1145        // // Create a selection set as client B and see that selection set as client A.
1146        // buffer_a
1147        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1148        //     .await;
1149
1150        // Edit the buffer as client B and see that edit as client A.
1151        editor_b.update(cx_b, |editor, cx| {
1152            editor.handle_input(&Input("ok, ".into()), cx)
1153        });
1154        buffer_a
1155            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1156            .await;
1157
1158        // TODO
1159        // // Remove the selection set as client B, see those selections disappear as client A.
1160        cx_b.update(move |_| drop(editor_b));
1161        // buffer_a
1162        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1163        //     .await;
1164
1165        // Dropping the client B's project removes client B from client A's collaborators.
1166        cx_b.update(move |_| drop(project_b));
1167        project_a
1168            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1169            .await;
1170    }
1171
1172    #[gpui::test(iterations = 10)]
1173    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1174        let lang_registry = Arc::new(LanguageRegistry::new());
1175        let fs = FakeFs::new(cx_a.background());
1176        cx_a.foreground().forbid_parking();
1177
1178        // Connect to a server as 2 clients.
1179        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1180        let client_a = server.create_client(cx_a, "user_a").await;
1181        let client_b = server.create_client(cx_b, "user_b").await;
1182
1183        // Share a project as client A
1184        fs.insert_tree(
1185            "/a",
1186            json!({
1187                ".zed.toml": r#"collaborators = ["user_b"]"#,
1188                "a.txt": "a-contents",
1189                "b.txt": "b-contents",
1190            }),
1191        )
1192        .await;
1193        let project_a = cx_a.update(|cx| {
1194            Project::local(
1195                client_a.clone(),
1196                client_a.user_store.clone(),
1197                lang_registry.clone(),
1198                fs.clone(),
1199                cx,
1200            )
1201        });
1202        let (worktree_a, _) = project_a
1203            .update(cx_a, |p, cx| {
1204                p.find_or_create_local_worktree("/a", false, cx)
1205            })
1206            .await
1207            .unwrap();
1208        worktree_a
1209            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1210            .await;
1211        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1212        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1213        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1214        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1215
1216        // Join that project as client B
1217        let project_b = Project::remote(
1218            project_id,
1219            client_b.clone(),
1220            client_b.user_store.clone(),
1221            lang_registry.clone(),
1222            fs.clone(),
1223            &mut cx_b.to_async(),
1224        )
1225        .await
1226        .unwrap();
1227        project_b
1228            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1229            .await
1230            .unwrap();
1231
1232        // Unshare the project as client A
1233        project_a
1234            .update(cx_a, |project, cx| project.unshare(cx))
1235            .await
1236            .unwrap();
1237        project_b
1238            .condition(cx_b, |project, _| project.is_read_only())
1239            .await;
1240        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1241        cx_b.update(|_| {
1242            drop(project_b);
1243        });
1244
1245        // Share the project again and ensure guests can still join.
1246        project_a
1247            .update(cx_a, |project, cx| project.share(cx))
1248            .await
1249            .unwrap();
1250        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1251
1252        let project_b2 = Project::remote(
1253            project_id,
1254            client_b.clone(),
1255            client_b.user_store.clone(),
1256            lang_registry.clone(),
1257            fs.clone(),
1258            &mut cx_b.to_async(),
1259        )
1260        .await
1261        .unwrap();
1262        project_b2
1263            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1264            .await
1265            .unwrap();
1266    }
1267
1268    #[gpui::test(iterations = 10)]
1269    async fn test_propagate_saves_and_fs_changes(
1270        cx_a: &mut TestAppContext,
1271        cx_b: &mut TestAppContext,
1272        cx_c: &mut TestAppContext,
1273    ) {
1274        let lang_registry = Arc::new(LanguageRegistry::new());
1275        let fs = FakeFs::new(cx_a.background());
1276        cx_a.foreground().forbid_parking();
1277
1278        // Connect to a server as 3 clients.
1279        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1280        let client_a = server.create_client(cx_a, "user_a").await;
1281        let client_b = server.create_client(cx_b, "user_b").await;
1282        let client_c = server.create_client(cx_c, "user_c").await;
1283
1284        // Share a worktree as client A.
1285        fs.insert_tree(
1286            "/a",
1287            json!({
1288                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1289                "file1": "",
1290                "file2": ""
1291            }),
1292        )
1293        .await;
1294        let project_a = cx_a.update(|cx| {
1295            Project::local(
1296                client_a.clone(),
1297                client_a.user_store.clone(),
1298                lang_registry.clone(),
1299                fs.clone(),
1300                cx,
1301            )
1302        });
1303        let (worktree_a, _) = project_a
1304            .update(cx_a, |p, cx| {
1305                p.find_or_create_local_worktree("/a", false, cx)
1306            })
1307            .await
1308            .unwrap();
1309        worktree_a
1310            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1311            .await;
1312        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1313        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1314        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1315
1316        // Join that worktree as clients B and C.
1317        let project_b = Project::remote(
1318            project_id,
1319            client_b.clone(),
1320            client_b.user_store.clone(),
1321            lang_registry.clone(),
1322            fs.clone(),
1323            &mut cx_b.to_async(),
1324        )
1325        .await
1326        .unwrap();
1327        let project_c = Project::remote(
1328            project_id,
1329            client_c.clone(),
1330            client_c.user_store.clone(),
1331            lang_registry.clone(),
1332            fs.clone(),
1333            &mut cx_c.to_async(),
1334        )
1335        .await
1336        .unwrap();
1337        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1338        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1339
1340        // Open and edit a buffer as both guests B and C.
1341        let buffer_b = project_b
1342            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1343            .await
1344            .unwrap();
1345        let buffer_c = project_c
1346            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1347            .await
1348            .unwrap();
1349        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1350        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1351
1352        // Open and edit that buffer as the host.
1353        let buffer_a = project_a
1354            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1355            .await
1356            .unwrap();
1357
1358        buffer_a
1359            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1360            .await;
1361        buffer_a.update(cx_a, |buf, cx| {
1362            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1363        });
1364
1365        // Wait for edits to propagate
1366        buffer_a
1367            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1368            .await;
1369        buffer_b
1370            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1371            .await;
1372        buffer_c
1373            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1374            .await;
1375
1376        // Edit the buffer as the host and concurrently save as guest B.
1377        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1378        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1379        save_b.await.unwrap();
1380        assert_eq!(
1381            fs.load("/a/file1".as_ref()).await.unwrap(),
1382            "hi-a, i-am-c, i-am-b, i-am-a"
1383        );
1384        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1385        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1386        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1387
1388        // Make changes on host's file system, see those changes on guest worktrees.
1389        fs.rename(
1390            "/a/file1".as_ref(),
1391            "/a/file1-renamed".as_ref(),
1392            Default::default(),
1393        )
1394        .await
1395        .unwrap();
1396
1397        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1398            .await
1399            .unwrap();
1400        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1401
1402        worktree_a
1403            .condition(&cx_a, |tree, _| {
1404                tree.paths()
1405                    .map(|p| p.to_string_lossy())
1406                    .collect::<Vec<_>>()
1407                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1408            })
1409            .await;
1410        worktree_b
1411            .condition(&cx_b, |tree, _| {
1412                tree.paths()
1413                    .map(|p| p.to_string_lossy())
1414                    .collect::<Vec<_>>()
1415                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1416            })
1417            .await;
1418        worktree_c
1419            .condition(&cx_c, |tree, _| {
1420                tree.paths()
1421                    .map(|p| p.to_string_lossy())
1422                    .collect::<Vec<_>>()
1423                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1424            })
1425            .await;
1426
1427        // Ensure buffer files are updated as well.
1428        buffer_a
1429            .condition(&cx_a, |buf, _| {
1430                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1431            })
1432            .await;
1433        buffer_b
1434            .condition(&cx_b, |buf, _| {
1435                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1436            })
1437            .await;
1438        buffer_c
1439            .condition(&cx_c, |buf, _| {
1440                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1441            })
1442            .await;
1443    }
1444
1445    #[gpui::test(iterations = 10)]
1446    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1447        cx_a.foreground().forbid_parking();
1448        let lang_registry = Arc::new(LanguageRegistry::new());
1449        let fs = FakeFs::new(cx_a.background());
1450
1451        // Connect to a server as 2 clients.
1452        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1453        let client_a = server.create_client(cx_a, "user_a").await;
1454        let client_b = server.create_client(cx_b, "user_b").await;
1455
1456        // Share a project as client A
1457        fs.insert_tree(
1458            "/dir",
1459            json!({
1460                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1461                "a.txt": "a-contents",
1462            }),
1463        )
1464        .await;
1465
1466        let project_a = cx_a.update(|cx| {
1467            Project::local(
1468                client_a.clone(),
1469                client_a.user_store.clone(),
1470                lang_registry.clone(),
1471                fs.clone(),
1472                cx,
1473            )
1474        });
1475        let (worktree_a, _) = project_a
1476            .update(cx_a, |p, cx| {
1477                p.find_or_create_local_worktree("/dir", false, cx)
1478            })
1479            .await
1480            .unwrap();
1481        worktree_a
1482            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1483            .await;
1484        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1485        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1486        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1487
1488        // Join that project as client B
1489        let project_b = Project::remote(
1490            project_id,
1491            client_b.clone(),
1492            client_b.user_store.clone(),
1493            lang_registry.clone(),
1494            fs.clone(),
1495            &mut cx_b.to_async(),
1496        )
1497        .await
1498        .unwrap();
1499
1500        // Open a buffer as client B
1501        let buffer_b = project_b
1502            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1503            .await
1504            .unwrap();
1505
1506        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1507        buffer_b.read_with(cx_b, |buf, _| {
1508            assert!(buf.is_dirty());
1509            assert!(!buf.has_conflict());
1510        });
1511
1512        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1513        buffer_b
1514            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1515            .await;
1516        buffer_b.read_with(cx_b, |buf, _| {
1517            assert!(!buf.has_conflict());
1518        });
1519
1520        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1521        buffer_b.read_with(cx_b, |buf, _| {
1522            assert!(buf.is_dirty());
1523            assert!(!buf.has_conflict());
1524        });
1525    }
1526
1527    #[gpui::test(iterations = 10)]
1528    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1529        cx_a.foreground().forbid_parking();
1530        let lang_registry = Arc::new(LanguageRegistry::new());
1531        let fs = FakeFs::new(cx_a.background());
1532
1533        // Connect to a server as 2 clients.
1534        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1535        let client_a = server.create_client(cx_a, "user_a").await;
1536        let client_b = server.create_client(cx_b, "user_b").await;
1537
1538        // Share a project as client A
1539        fs.insert_tree(
1540            "/dir",
1541            json!({
1542                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1543                "a.txt": "a-contents",
1544            }),
1545        )
1546        .await;
1547
1548        let project_a = cx_a.update(|cx| {
1549            Project::local(
1550                client_a.clone(),
1551                client_a.user_store.clone(),
1552                lang_registry.clone(),
1553                fs.clone(),
1554                cx,
1555            )
1556        });
1557        let (worktree_a, _) = project_a
1558            .update(cx_a, |p, cx| {
1559                p.find_or_create_local_worktree("/dir", false, cx)
1560            })
1561            .await
1562            .unwrap();
1563        worktree_a
1564            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1565            .await;
1566        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1567        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1568        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1569
1570        // Join that project as client B
1571        let project_b = Project::remote(
1572            project_id,
1573            client_b.clone(),
1574            client_b.user_store.clone(),
1575            lang_registry.clone(),
1576            fs.clone(),
1577            &mut cx_b.to_async(),
1578        )
1579        .await
1580        .unwrap();
1581        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1582
1583        // Open a buffer as client B
1584        let buffer_b = project_b
1585            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1586            .await
1587            .unwrap();
1588        buffer_b.read_with(cx_b, |buf, _| {
1589            assert!(!buf.is_dirty());
1590            assert!(!buf.has_conflict());
1591        });
1592
1593        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1594            .await
1595            .unwrap();
1596        buffer_b
1597            .condition(&cx_b, |buf, _| {
1598                buf.text() == "new contents" && !buf.is_dirty()
1599            })
1600            .await;
1601        buffer_b.read_with(cx_b, |buf, _| {
1602            assert!(!buf.has_conflict());
1603        });
1604    }
1605
1606    #[gpui::test(iterations = 10)]
1607    async fn test_editing_while_guest_opens_buffer(
1608        cx_a: &mut TestAppContext,
1609        cx_b: &mut TestAppContext,
1610    ) {
1611        cx_a.foreground().forbid_parking();
1612        let lang_registry = Arc::new(LanguageRegistry::new());
1613        let fs = FakeFs::new(cx_a.background());
1614
1615        // Connect to a server as 2 clients.
1616        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1617        let client_a = server.create_client(cx_a, "user_a").await;
1618        let client_b = server.create_client(cx_b, "user_b").await;
1619
1620        // Share a project as client A
1621        fs.insert_tree(
1622            "/dir",
1623            json!({
1624                ".zed.toml": r#"collaborators = ["user_b"]"#,
1625                "a.txt": "a-contents",
1626            }),
1627        )
1628        .await;
1629        let project_a = cx_a.update(|cx| {
1630            Project::local(
1631                client_a.clone(),
1632                client_a.user_store.clone(),
1633                lang_registry.clone(),
1634                fs.clone(),
1635                cx,
1636            )
1637        });
1638        let (worktree_a, _) = project_a
1639            .update(cx_a, |p, cx| {
1640                p.find_or_create_local_worktree("/dir", false, cx)
1641            })
1642            .await
1643            .unwrap();
1644        worktree_a
1645            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1646            .await;
1647        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1648        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1649        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1650
1651        // Join that project as client B
1652        let project_b = Project::remote(
1653            project_id,
1654            client_b.clone(),
1655            client_b.user_store.clone(),
1656            lang_registry.clone(),
1657            fs.clone(),
1658            &mut cx_b.to_async(),
1659        )
1660        .await
1661        .unwrap();
1662
1663        // Open a buffer as client A
1664        let buffer_a = project_a
1665            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1666            .await
1667            .unwrap();
1668
1669        // Start opening the same buffer as client B
1670        let buffer_b = cx_b
1671            .background()
1672            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1673
1674        // Edit the buffer as client A while client B is still opening it.
1675        cx_b.background().simulate_random_delay().await;
1676        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1677        cx_b.background().simulate_random_delay().await;
1678        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1679
1680        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1681        let buffer_b = buffer_b.await.unwrap();
1682        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1683    }
1684
1685    #[gpui::test(iterations = 10)]
1686    async fn test_leaving_worktree_while_opening_buffer(
1687        cx_a: &mut TestAppContext,
1688        cx_b: &mut TestAppContext,
1689    ) {
1690        cx_a.foreground().forbid_parking();
1691        let lang_registry = Arc::new(LanguageRegistry::new());
1692        let fs = FakeFs::new(cx_a.background());
1693
1694        // Connect to a server as 2 clients.
1695        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1696        let client_a = server.create_client(cx_a, "user_a").await;
1697        let client_b = server.create_client(cx_b, "user_b").await;
1698
1699        // Share a project as client A
1700        fs.insert_tree(
1701            "/dir",
1702            json!({
1703                ".zed.toml": r#"collaborators = ["user_b"]"#,
1704                "a.txt": "a-contents",
1705            }),
1706        )
1707        .await;
1708        let project_a = cx_a.update(|cx| {
1709            Project::local(
1710                client_a.clone(),
1711                client_a.user_store.clone(),
1712                lang_registry.clone(),
1713                fs.clone(),
1714                cx,
1715            )
1716        });
1717        let (worktree_a, _) = project_a
1718            .update(cx_a, |p, cx| {
1719                p.find_or_create_local_worktree("/dir", false, cx)
1720            })
1721            .await
1722            .unwrap();
1723        worktree_a
1724            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1725            .await;
1726        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1727        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1729
1730        // Join that project as client B
1731        let project_b = Project::remote(
1732            project_id,
1733            client_b.clone(),
1734            client_b.user_store.clone(),
1735            lang_registry.clone(),
1736            fs.clone(),
1737            &mut cx_b.to_async(),
1738        )
1739        .await
1740        .unwrap();
1741
1742        // See that a guest has joined as client A.
1743        project_a
1744            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1745            .await;
1746
1747        // Begin opening a buffer as client B, but leave the project before the open completes.
1748        let buffer_b = cx_b
1749            .background()
1750            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1751        cx_b.update(|_| drop(project_b));
1752        drop(buffer_b);
1753
1754        // See that the guest has left.
1755        project_a
1756            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1757            .await;
1758    }
1759
1760    #[gpui::test(iterations = 10)]
1761    async fn test_peer_disconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1762        cx_a.foreground().forbid_parking();
1763        let lang_registry = Arc::new(LanguageRegistry::new());
1764        let fs = FakeFs::new(cx_a.background());
1765
1766        // Connect to a server as 2 clients.
1767        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1768        let client_a = server.create_client(cx_a, "user_a").await;
1769        let client_b = server.create_client(cx_b, "user_b").await;
1770
1771        // Share a project as client A
1772        fs.insert_tree(
1773            "/a",
1774            json!({
1775                ".zed.toml": r#"collaborators = ["user_b"]"#,
1776                "a.txt": "a-contents",
1777                "b.txt": "b-contents",
1778            }),
1779        )
1780        .await;
1781        let project_a = cx_a.update(|cx| {
1782            Project::local(
1783                client_a.clone(),
1784                client_a.user_store.clone(),
1785                lang_registry.clone(),
1786                fs.clone(),
1787                cx,
1788            )
1789        });
1790        let (worktree_a, _) = project_a
1791            .update(cx_a, |p, cx| {
1792                p.find_or_create_local_worktree("/a", false, cx)
1793            })
1794            .await
1795            .unwrap();
1796        worktree_a
1797            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1798            .await;
1799        let project_id = project_a
1800            .update(cx_a, |project, _| project.next_remote_id())
1801            .await;
1802        project_a
1803            .update(cx_a, |project, cx| project.share(cx))
1804            .await
1805            .unwrap();
1806
1807        // Join that project as client B
1808        let _project_b = Project::remote(
1809            project_id,
1810            client_b.clone(),
1811            client_b.user_store.clone(),
1812            lang_registry.clone(),
1813            fs.clone(),
1814            &mut cx_b.to_async(),
1815        )
1816        .await
1817        .unwrap();
1818
1819        // See that a guest has joined as client A.
1820        project_a
1821            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1822            .await;
1823
1824        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1825        client_b.disconnect(&cx_b.to_async()).unwrap();
1826        project_a
1827            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1828            .await;
1829    }
1830
1831    #[gpui::test(iterations = 10)]
1832    async fn test_collaborating_with_diagnostics(
1833        cx_a: &mut TestAppContext,
1834        cx_b: &mut TestAppContext,
1835    ) {
1836        cx_a.foreground().forbid_parking();
1837        let mut lang_registry = Arc::new(LanguageRegistry::new());
1838        let fs = FakeFs::new(cx_a.background());
1839
1840        // Set up a fake language server.
1841        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1842        Arc::get_mut(&mut lang_registry)
1843            .unwrap()
1844            .add(Arc::new(Language::new(
1845                LanguageConfig {
1846                    name: "Rust".into(),
1847                    path_suffixes: vec!["rs".to_string()],
1848                    language_server: Some(language_server_config),
1849                    ..Default::default()
1850                },
1851                Some(tree_sitter_rust::language()),
1852            )));
1853
1854        // Connect to a server as 2 clients.
1855        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1856        let client_a = server.create_client(cx_a, "user_a").await;
1857        let client_b = server.create_client(cx_b, "user_b").await;
1858
1859        // Share a project as client A
1860        fs.insert_tree(
1861            "/a",
1862            json!({
1863                ".zed.toml": r#"collaborators = ["user_b"]"#,
1864                "a.rs": "let one = two",
1865                "other.rs": "",
1866            }),
1867        )
1868        .await;
1869        let project_a = cx_a.update(|cx| {
1870            Project::local(
1871                client_a.clone(),
1872                client_a.user_store.clone(),
1873                lang_registry.clone(),
1874                fs.clone(),
1875                cx,
1876            )
1877        });
1878        let (worktree_a, _) = project_a
1879            .update(cx_a, |p, cx| {
1880                p.find_or_create_local_worktree("/a", false, cx)
1881            })
1882            .await
1883            .unwrap();
1884        worktree_a
1885            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1886            .await;
1887        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1888        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1889        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1890
1891        // Cause the language server to start.
1892        let _ = cx_a
1893            .background()
1894            .spawn(project_a.update(cx_a, |project, cx| {
1895                project.open_buffer(
1896                    ProjectPath {
1897                        worktree_id,
1898                        path: Path::new("other.rs").into(),
1899                    },
1900                    cx,
1901                )
1902            }))
1903            .await
1904            .unwrap();
1905
1906        // Simulate a language server reporting errors for a file.
1907        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1908        fake_language_server
1909            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1910                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1911                version: None,
1912                diagnostics: vec![lsp::Diagnostic {
1913                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1914                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1915                    message: "message 1".to_string(),
1916                    ..Default::default()
1917                }],
1918            })
1919            .await;
1920
1921        // Wait for server to see the diagnostics update.
1922        server
1923            .condition(|store| {
1924                let worktree = store
1925                    .project(project_id)
1926                    .unwrap()
1927                    .share
1928                    .as_ref()
1929                    .unwrap()
1930                    .worktrees
1931                    .get(&worktree_id.to_proto())
1932                    .unwrap();
1933
1934                !worktree.diagnostic_summaries.is_empty()
1935            })
1936            .await;
1937
1938        // Join the worktree as client B.
1939        let project_b = Project::remote(
1940            project_id,
1941            client_b.clone(),
1942            client_b.user_store.clone(),
1943            lang_registry.clone(),
1944            fs.clone(),
1945            &mut cx_b.to_async(),
1946        )
1947        .await
1948        .unwrap();
1949
1950        project_b.read_with(cx_b, |project, cx| {
1951            assert_eq!(
1952                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1953                &[(
1954                    ProjectPath {
1955                        worktree_id,
1956                        path: Arc::from(Path::new("a.rs")),
1957                    },
1958                    DiagnosticSummary {
1959                        error_count: 1,
1960                        warning_count: 0,
1961                        ..Default::default()
1962                    },
1963                )]
1964            )
1965        });
1966
1967        // Simulate a language server reporting more errors for a file.
1968        fake_language_server
1969            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1970                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1971                version: None,
1972                diagnostics: vec![
1973                    lsp::Diagnostic {
1974                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1975                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1976                        message: "message 1".to_string(),
1977                        ..Default::default()
1978                    },
1979                    lsp::Diagnostic {
1980                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1981                        range: lsp::Range::new(
1982                            lsp::Position::new(0, 10),
1983                            lsp::Position::new(0, 13),
1984                        ),
1985                        message: "message 2".to_string(),
1986                        ..Default::default()
1987                    },
1988                ],
1989            })
1990            .await;
1991
1992        // Client b gets the updated summaries
1993        project_b
1994            .condition(&cx_b, |project, cx| {
1995                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1996                    == &[(
1997                        ProjectPath {
1998                            worktree_id,
1999                            path: Arc::from(Path::new("a.rs")),
2000                        },
2001                        DiagnosticSummary {
2002                            error_count: 1,
2003                            warning_count: 1,
2004                            ..Default::default()
2005                        },
2006                    )]
2007            })
2008            .await;
2009
2010        // Open the file with the errors on client B. They should be present.
2011        let buffer_b = cx_b
2012            .background()
2013            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2014            .await
2015            .unwrap();
2016
2017        buffer_b.read_with(cx_b, |buffer, _| {
2018            assert_eq!(
2019                buffer
2020                    .snapshot()
2021                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2022                    .map(|entry| entry)
2023                    .collect::<Vec<_>>(),
2024                &[
2025                    DiagnosticEntry {
2026                        range: Point::new(0, 4)..Point::new(0, 7),
2027                        diagnostic: Diagnostic {
2028                            group_id: 0,
2029                            message: "message 1".to_string(),
2030                            severity: lsp::DiagnosticSeverity::ERROR,
2031                            is_primary: true,
2032                            ..Default::default()
2033                        }
2034                    },
2035                    DiagnosticEntry {
2036                        range: Point::new(0, 10)..Point::new(0, 13),
2037                        diagnostic: Diagnostic {
2038                            group_id: 1,
2039                            severity: lsp::DiagnosticSeverity::WARNING,
2040                            message: "message 2".to_string(),
2041                            is_primary: true,
2042                            ..Default::default()
2043                        }
2044                    }
2045                ]
2046            );
2047        });
2048    }
2049
2050    #[gpui::test(iterations = 10)]
2051    async fn test_collaborating_with_completion(
2052        cx_a: &mut TestAppContext,
2053        cx_b: &mut TestAppContext,
2054    ) {
2055        cx_a.foreground().forbid_parking();
2056        let mut lang_registry = Arc::new(LanguageRegistry::new());
2057        let fs = FakeFs::new(cx_a.background());
2058
2059        // Set up a fake language server.
2060        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2061        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2062            completion_provider: Some(lsp::CompletionOptions {
2063                trigger_characters: Some(vec![".".to_string()]),
2064                ..Default::default()
2065            }),
2066            ..Default::default()
2067        });
2068        Arc::get_mut(&mut lang_registry)
2069            .unwrap()
2070            .add(Arc::new(Language::new(
2071                LanguageConfig {
2072                    name: "Rust".into(),
2073                    path_suffixes: vec!["rs".to_string()],
2074                    language_server: Some(language_server_config),
2075                    ..Default::default()
2076                },
2077                Some(tree_sitter_rust::language()),
2078            )));
2079
2080        // Connect to a server as 2 clients.
2081        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2082        let client_a = server.create_client(cx_a, "user_a").await;
2083        let client_b = server.create_client(cx_b, "user_b").await;
2084
2085        // Share a project as client A
2086        fs.insert_tree(
2087            "/a",
2088            json!({
2089                ".zed.toml": r#"collaborators = ["user_b"]"#,
2090                "main.rs": "fn main() { a }",
2091                "other.rs": "",
2092            }),
2093        )
2094        .await;
2095        let project_a = cx_a.update(|cx| {
2096            Project::local(
2097                client_a.clone(),
2098                client_a.user_store.clone(),
2099                lang_registry.clone(),
2100                fs.clone(),
2101                cx,
2102            )
2103        });
2104        let (worktree_a, _) = project_a
2105            .update(cx_a, |p, cx| {
2106                p.find_or_create_local_worktree("/a", false, cx)
2107            })
2108            .await
2109            .unwrap();
2110        worktree_a
2111            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2112            .await;
2113        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2114        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2115        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2116
2117        // Join the worktree as client B.
2118        let project_b = Project::remote(
2119            project_id,
2120            client_b.clone(),
2121            client_b.user_store.clone(),
2122            lang_registry.clone(),
2123            fs.clone(),
2124            &mut cx_b.to_async(),
2125        )
2126        .await
2127        .unwrap();
2128
2129        // Open a file in an editor as the guest.
2130        let buffer_b = project_b
2131            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2132            .await
2133            .unwrap();
2134        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2135        let editor_b = cx_b.add_view(window_b, |cx| {
2136            Editor::for_buffer(
2137                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2138                Some(project_b.clone()),
2139                watch::channel_with(Settings::test(cx)).1,
2140                cx,
2141            )
2142        });
2143
2144        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2145        buffer_b
2146            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2147            .await;
2148
2149        // Type a completion trigger character as the guest.
2150        editor_b.update(cx_b, |editor, cx| {
2151            editor.select_ranges([13..13], None, cx);
2152            editor.handle_input(&Input(".".into()), cx);
2153            cx.focus(&editor_b);
2154        });
2155
2156        // Receive a completion request as the host's language server.
2157        // Return some completions from the host's language server.
2158        cx_a.foreground().start_waiting();
2159        fake_language_server
2160            .handle_request::<lsp::request::Completion, _>(|params, _| {
2161                assert_eq!(
2162                    params.text_document_position.text_document.uri,
2163                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2164                );
2165                assert_eq!(
2166                    params.text_document_position.position,
2167                    lsp::Position::new(0, 14),
2168                );
2169
2170                Some(lsp::CompletionResponse::Array(vec![
2171                    lsp::CompletionItem {
2172                        label: "first_method(…)".into(),
2173                        detail: Some("fn(&mut self, B) -> C".into()),
2174                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2175                            new_text: "first_method($1)".to_string(),
2176                            range: lsp::Range::new(
2177                                lsp::Position::new(0, 14),
2178                                lsp::Position::new(0, 14),
2179                            ),
2180                        })),
2181                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2182                        ..Default::default()
2183                    },
2184                    lsp::CompletionItem {
2185                        label: "second_method(…)".into(),
2186                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2187                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2188                            new_text: "second_method()".to_string(),
2189                            range: lsp::Range::new(
2190                                lsp::Position::new(0, 14),
2191                                lsp::Position::new(0, 14),
2192                            ),
2193                        })),
2194                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2195                        ..Default::default()
2196                    },
2197                ]))
2198            })
2199            .next()
2200            .await
2201            .unwrap();
2202        cx_a.foreground().finish_waiting();
2203
2204        // Open the buffer on the host.
2205        let buffer_a = project_a
2206            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2207            .await
2208            .unwrap();
2209        buffer_a
2210            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2211            .await;
2212
2213        // Confirm a completion on the guest.
2214        editor_b
2215            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2216            .await;
2217        editor_b.update(cx_b, |editor, cx| {
2218            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2219            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2220        });
2221
2222        // Return a resolved completion from the host's language server.
2223        // The resolved completion has an additional text edit.
2224        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2225            |params, _| {
2226                assert_eq!(params.label, "first_method(…)");
2227                lsp::CompletionItem {
2228                    label: "first_method(…)".into(),
2229                    detail: Some("fn(&mut self, B) -> C".into()),
2230                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231                        new_text: "first_method($1)".to_string(),
2232                        range: lsp::Range::new(
2233                            lsp::Position::new(0, 14),
2234                            lsp::Position::new(0, 14),
2235                        ),
2236                    })),
2237                    additional_text_edits: Some(vec![lsp::TextEdit {
2238                        new_text: "use d::SomeTrait;\n".to_string(),
2239                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2240                    }]),
2241                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2242                    ..Default::default()
2243                }
2244            },
2245        );
2246
2247        // The additional edit is applied.
2248        buffer_a
2249            .condition(&cx_a, |buffer, _| {
2250                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2251            })
2252            .await;
2253        buffer_b
2254            .condition(&cx_b, |buffer, _| {
2255                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2256            })
2257            .await;
2258    }
2259
2260    #[gpui::test(iterations = 10)]
2261    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2262        cx_a.foreground().forbid_parking();
2263        let mut lang_registry = Arc::new(LanguageRegistry::new());
2264        let fs = FakeFs::new(cx_a.background());
2265
2266        // Set up a fake language server.
2267        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2268        Arc::get_mut(&mut lang_registry)
2269            .unwrap()
2270            .add(Arc::new(Language::new(
2271                LanguageConfig {
2272                    name: "Rust".into(),
2273                    path_suffixes: vec!["rs".to_string()],
2274                    language_server: Some(language_server_config),
2275                    ..Default::default()
2276                },
2277                Some(tree_sitter_rust::language()),
2278            )));
2279
2280        // Connect to a server as 2 clients.
2281        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2282        let client_a = server.create_client(cx_a, "user_a").await;
2283        let client_b = server.create_client(cx_b, "user_b").await;
2284
2285        // Share a project as client A
2286        fs.insert_tree(
2287            "/a",
2288            json!({
2289                ".zed.toml": r#"collaborators = ["user_b"]"#,
2290                "a.rs": "let one = two",
2291            }),
2292        )
2293        .await;
2294        let project_a = cx_a.update(|cx| {
2295            Project::local(
2296                client_a.clone(),
2297                client_a.user_store.clone(),
2298                lang_registry.clone(),
2299                fs.clone(),
2300                cx,
2301            )
2302        });
2303        let (worktree_a, _) = project_a
2304            .update(cx_a, |p, cx| {
2305                p.find_or_create_local_worktree("/a", false, cx)
2306            })
2307            .await
2308            .unwrap();
2309        worktree_a
2310            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2311            .await;
2312        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2313        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2314        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2315
2316        // Join the worktree as client B.
2317        let project_b = Project::remote(
2318            project_id,
2319            client_b.clone(),
2320            client_b.user_store.clone(),
2321            lang_registry.clone(),
2322            fs.clone(),
2323            &mut cx_b.to_async(),
2324        )
2325        .await
2326        .unwrap();
2327
2328        let buffer_b = cx_b
2329            .background()
2330            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2331            .await
2332            .unwrap();
2333
2334        let format = project_b.update(cx_b, |project, cx| {
2335            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2336        });
2337
2338        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2339        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2340            Some(vec![
2341                lsp::TextEdit {
2342                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2343                    new_text: "h".to_string(),
2344                },
2345                lsp::TextEdit {
2346                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2347                    new_text: "y".to_string(),
2348                },
2349            ])
2350        });
2351
2352        format.await.unwrap();
2353        assert_eq!(
2354            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2355            "let honey = two"
2356        );
2357    }
2358
2359    #[gpui::test(iterations = 10)]
2360    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2361        cx_a.foreground().forbid_parking();
2362        let mut lang_registry = Arc::new(LanguageRegistry::new());
2363        let fs = FakeFs::new(cx_a.background());
2364        fs.insert_tree(
2365            "/root-1",
2366            json!({
2367                ".zed.toml": r#"collaborators = ["user_b"]"#,
2368                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2369            }),
2370        )
2371        .await;
2372        fs.insert_tree(
2373            "/root-2",
2374            json!({
2375                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2376            }),
2377        )
2378        .await;
2379
2380        // Set up a fake language server.
2381        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2382        Arc::get_mut(&mut lang_registry)
2383            .unwrap()
2384            .add(Arc::new(Language::new(
2385                LanguageConfig {
2386                    name: "Rust".into(),
2387                    path_suffixes: vec!["rs".to_string()],
2388                    language_server: Some(language_server_config),
2389                    ..Default::default()
2390                },
2391                Some(tree_sitter_rust::language()),
2392            )));
2393
2394        // Connect to a server as 2 clients.
2395        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2396        let client_a = server.create_client(cx_a, "user_a").await;
2397        let client_b = server.create_client(cx_b, "user_b").await;
2398
2399        // Share a project as client A
2400        let project_a = cx_a.update(|cx| {
2401            Project::local(
2402                client_a.clone(),
2403                client_a.user_store.clone(),
2404                lang_registry.clone(),
2405                fs.clone(),
2406                cx,
2407            )
2408        });
2409        let (worktree_a, _) = project_a
2410            .update(cx_a, |p, cx| {
2411                p.find_or_create_local_worktree("/root-1", false, cx)
2412            })
2413            .await
2414            .unwrap();
2415        worktree_a
2416            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2417            .await;
2418        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2419        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2420        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2421
2422        // Join the worktree as client B.
2423        let project_b = Project::remote(
2424            project_id,
2425            client_b.clone(),
2426            client_b.user_store.clone(),
2427            lang_registry.clone(),
2428            fs.clone(),
2429            &mut cx_b.to_async(),
2430        )
2431        .await
2432        .unwrap();
2433
2434        // Open the file on client B.
2435        let buffer_b = cx_b
2436            .background()
2437            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2438            .await
2439            .unwrap();
2440
2441        // Request the definition of a symbol as the guest.
2442        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2443
2444        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2445        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2446            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2447                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2448                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2449            )))
2450        });
2451
2452        let definitions_1 = definitions_1.await.unwrap();
2453        cx_b.read(|cx| {
2454            assert_eq!(definitions_1.len(), 1);
2455            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2456            let target_buffer = definitions_1[0].buffer.read(cx);
2457            assert_eq!(
2458                target_buffer.text(),
2459                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2460            );
2461            assert_eq!(
2462                definitions_1[0].range.to_point(target_buffer),
2463                Point::new(0, 6)..Point::new(0, 9)
2464            );
2465        });
2466
2467        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2468        // the previous call to `definition`.
2469        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2470        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2471            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2472                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2473                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2474            )))
2475        });
2476
2477        let definitions_2 = definitions_2.await.unwrap();
2478        cx_b.read(|cx| {
2479            assert_eq!(definitions_2.len(), 1);
2480            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2481            let target_buffer = definitions_2[0].buffer.read(cx);
2482            assert_eq!(
2483                target_buffer.text(),
2484                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2485            );
2486            assert_eq!(
2487                definitions_2[0].range.to_point(target_buffer),
2488                Point::new(1, 6)..Point::new(1, 11)
2489            );
2490        });
2491        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2492    }
2493
2494    #[gpui::test(iterations = 10)]
2495    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2496        cx_a.foreground().forbid_parking();
2497        let mut lang_registry = Arc::new(LanguageRegistry::new());
2498        let fs = FakeFs::new(cx_a.background());
2499        fs.insert_tree(
2500            "/root-1",
2501            json!({
2502                ".zed.toml": r#"collaborators = ["user_b"]"#,
2503                "one.rs": "const ONE: usize = 1;",
2504                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2505            }),
2506        )
2507        .await;
2508        fs.insert_tree(
2509            "/root-2",
2510            json!({
2511                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2512            }),
2513        )
2514        .await;
2515
2516        // Set up a fake language server.
2517        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2518        Arc::get_mut(&mut lang_registry)
2519            .unwrap()
2520            .add(Arc::new(Language::new(
2521                LanguageConfig {
2522                    name: "Rust".into(),
2523                    path_suffixes: vec!["rs".to_string()],
2524                    language_server: Some(language_server_config),
2525                    ..Default::default()
2526                },
2527                Some(tree_sitter_rust::language()),
2528            )));
2529
2530        // Connect to a server as 2 clients.
2531        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2532        let client_a = server.create_client(cx_a, "user_a").await;
2533        let client_b = server.create_client(cx_b, "user_b").await;
2534
2535        // Share a project as client A
2536        let project_a = cx_a.update(|cx| {
2537            Project::local(
2538                client_a.clone(),
2539                client_a.user_store.clone(),
2540                lang_registry.clone(),
2541                fs.clone(),
2542                cx,
2543            )
2544        });
2545        let (worktree_a, _) = project_a
2546            .update(cx_a, |p, cx| {
2547                p.find_or_create_local_worktree("/root-1", false, cx)
2548            })
2549            .await
2550            .unwrap();
2551        worktree_a
2552            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2553            .await;
2554        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2555        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2556        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2557
2558        // Join the worktree as client B.
2559        let project_b = Project::remote(
2560            project_id,
2561            client_b.clone(),
2562            client_b.user_store.clone(),
2563            lang_registry.clone(),
2564            fs.clone(),
2565            &mut cx_b.to_async(),
2566        )
2567        .await
2568        .unwrap();
2569
2570        // Open the file on client B.
2571        let buffer_b = cx_b
2572            .background()
2573            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2574            .await
2575            .unwrap();
2576
2577        // Request references to a symbol as the guest.
2578        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2579
2580        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2581        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2582            assert_eq!(
2583                params.text_document_position.text_document.uri.as_str(),
2584                "file:///root-1/one.rs"
2585            );
2586            Some(vec![
2587                lsp::Location {
2588                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2589                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2590                },
2591                lsp::Location {
2592                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2593                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2594                },
2595                lsp::Location {
2596                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2597                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2598                },
2599            ])
2600        });
2601
2602        let references = references.await.unwrap();
2603        cx_b.read(|cx| {
2604            assert_eq!(references.len(), 3);
2605            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2606
2607            let two_buffer = references[0].buffer.read(cx);
2608            let three_buffer = references[2].buffer.read(cx);
2609            assert_eq!(
2610                two_buffer.file().unwrap().path().as_ref(),
2611                Path::new("two.rs")
2612            );
2613            assert_eq!(references[1].buffer, references[0].buffer);
2614            assert_eq!(
2615                three_buffer.file().unwrap().full_path(cx),
2616                Path::new("three.rs")
2617            );
2618
2619            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2620            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2621            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2622        });
2623    }
2624
2625    #[gpui::test(iterations = 10)]
2626    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2627        cx_a.foreground().forbid_parking();
2628        let lang_registry = Arc::new(LanguageRegistry::new());
2629        let fs = FakeFs::new(cx_a.background());
2630        fs.insert_tree(
2631            "/root-1",
2632            json!({
2633                ".zed.toml": r#"collaborators = ["user_b"]"#,
2634                "a": "hello world",
2635                "b": "goodnight moon",
2636                "c": "a world of goo",
2637                "d": "world champion of clown world",
2638            }),
2639        )
2640        .await;
2641        fs.insert_tree(
2642            "/root-2",
2643            json!({
2644                "e": "disney world is fun",
2645            }),
2646        )
2647        .await;
2648
2649        // Connect to a server as 2 clients.
2650        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2651        let client_a = server.create_client(cx_a, "user_a").await;
2652        let client_b = server.create_client(cx_b, "user_b").await;
2653
2654        // Share a project as client A
2655        let project_a = cx_a.update(|cx| {
2656            Project::local(
2657                client_a.clone(),
2658                client_a.user_store.clone(),
2659                lang_registry.clone(),
2660                fs.clone(),
2661                cx,
2662            )
2663        });
2664        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2665
2666        let (worktree_1, _) = project_a
2667            .update(cx_a, |p, cx| {
2668                p.find_or_create_local_worktree("/root-1", false, cx)
2669            })
2670            .await
2671            .unwrap();
2672        worktree_1
2673            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2674            .await;
2675        let (worktree_2, _) = project_a
2676            .update(cx_a, |p, cx| {
2677                p.find_or_create_local_worktree("/root-2", false, cx)
2678            })
2679            .await
2680            .unwrap();
2681        worktree_2
2682            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2683            .await;
2684
2685        eprintln!("sharing");
2686
2687        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2688
2689        // Join the worktree as client B.
2690        let project_b = Project::remote(
2691            project_id,
2692            client_b.clone(),
2693            client_b.user_store.clone(),
2694            lang_registry.clone(),
2695            fs.clone(),
2696            &mut cx_b.to_async(),
2697        )
2698        .await
2699        .unwrap();
2700
2701        let results = project_b
2702            .update(cx_b, |project, cx| {
2703                project.search(SearchQuery::text("world", false, false), cx)
2704            })
2705            .await
2706            .unwrap();
2707
2708        let mut ranges_by_path = results
2709            .into_iter()
2710            .map(|(buffer, ranges)| {
2711                buffer.read_with(cx_b, |buffer, cx| {
2712                    let path = buffer.file().unwrap().full_path(cx);
2713                    let offset_ranges = ranges
2714                        .into_iter()
2715                        .map(|range| range.to_offset(buffer))
2716                        .collect::<Vec<_>>();
2717                    (path, offset_ranges)
2718                })
2719            })
2720            .collect::<Vec<_>>();
2721        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2722
2723        assert_eq!(
2724            ranges_by_path,
2725            &[
2726                (PathBuf::from("root-1/a"), vec![6..11]),
2727                (PathBuf::from("root-1/c"), vec![2..7]),
2728                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2729                (PathBuf::from("root-2/e"), vec![7..12]),
2730            ]
2731        );
2732    }
2733
2734    #[gpui::test(iterations = 10)]
2735    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2736        cx_a.foreground().forbid_parking();
2737        let lang_registry = Arc::new(LanguageRegistry::new());
2738        let fs = FakeFs::new(cx_a.background());
2739        fs.insert_tree(
2740            "/root-1",
2741            json!({
2742                ".zed.toml": r#"collaborators = ["user_b"]"#,
2743                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2744            }),
2745        )
2746        .await;
2747
2748        // Set up a fake language server.
2749        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2750        lang_registry.add(Arc::new(Language::new(
2751            LanguageConfig {
2752                name: "Rust".into(),
2753                path_suffixes: vec!["rs".to_string()],
2754                language_server: Some(language_server_config),
2755                ..Default::default()
2756            },
2757            Some(tree_sitter_rust::language()),
2758        )));
2759
2760        // Connect to a server as 2 clients.
2761        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2762        let client_a = server.create_client(cx_a, "user_a").await;
2763        let client_b = server.create_client(cx_b, "user_b").await;
2764
2765        // Share a project as client A
2766        let project_a = cx_a.update(|cx| {
2767            Project::local(
2768                client_a.clone(),
2769                client_a.user_store.clone(),
2770                lang_registry.clone(),
2771                fs.clone(),
2772                cx,
2773            )
2774        });
2775        let (worktree_a, _) = project_a
2776            .update(cx_a, |p, cx| {
2777                p.find_or_create_local_worktree("/root-1", false, cx)
2778            })
2779            .await
2780            .unwrap();
2781        worktree_a
2782            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2783            .await;
2784        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2785        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2786        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2787
2788        // Join the worktree as client B.
2789        let project_b = Project::remote(
2790            project_id,
2791            client_b.clone(),
2792            client_b.user_store.clone(),
2793            lang_registry.clone(),
2794            fs.clone(),
2795            &mut cx_b.to_async(),
2796        )
2797        .await
2798        .unwrap();
2799
2800        // Open the file on client B.
2801        let buffer_b = cx_b
2802            .background()
2803            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2804            .await
2805            .unwrap();
2806
2807        // Request document highlights as the guest.
2808        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2809
2810        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2811        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2812            |params, _| {
2813                assert_eq!(
2814                    params
2815                        .text_document_position_params
2816                        .text_document
2817                        .uri
2818                        .as_str(),
2819                    "file:///root-1/main.rs"
2820                );
2821                assert_eq!(
2822                    params.text_document_position_params.position,
2823                    lsp::Position::new(0, 34)
2824                );
2825                Some(vec![
2826                    lsp::DocumentHighlight {
2827                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2828                        range: lsp::Range::new(
2829                            lsp::Position::new(0, 10),
2830                            lsp::Position::new(0, 16),
2831                        ),
2832                    },
2833                    lsp::DocumentHighlight {
2834                        kind: Some(lsp::DocumentHighlightKind::READ),
2835                        range: lsp::Range::new(
2836                            lsp::Position::new(0, 32),
2837                            lsp::Position::new(0, 38),
2838                        ),
2839                    },
2840                    lsp::DocumentHighlight {
2841                        kind: Some(lsp::DocumentHighlightKind::READ),
2842                        range: lsp::Range::new(
2843                            lsp::Position::new(0, 41),
2844                            lsp::Position::new(0, 47),
2845                        ),
2846                    },
2847                ])
2848            },
2849        );
2850
2851        let highlights = highlights.await.unwrap();
2852        buffer_b.read_with(cx_b, |buffer, _| {
2853            let snapshot = buffer.snapshot();
2854
2855            let highlights = highlights
2856                .into_iter()
2857                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2858                .collect::<Vec<_>>();
2859            assert_eq!(
2860                highlights,
2861                &[
2862                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2863                    (lsp::DocumentHighlightKind::READ, 32..38),
2864                    (lsp::DocumentHighlightKind::READ, 41..47)
2865                ]
2866            )
2867        });
2868    }
2869
2870    #[gpui::test(iterations = 10)]
2871    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2872        cx_a.foreground().forbid_parking();
2873        let mut lang_registry = Arc::new(LanguageRegistry::new());
2874        let fs = FakeFs::new(cx_a.background());
2875        fs.insert_tree(
2876            "/code",
2877            json!({
2878                "crate-1": {
2879                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2880                    "one.rs": "const ONE: usize = 1;",
2881                },
2882                "crate-2": {
2883                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2884                },
2885                "private": {
2886                    "passwords.txt": "the-password",
2887                }
2888            }),
2889        )
2890        .await;
2891
2892        // Set up a fake language server.
2893        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2894        Arc::get_mut(&mut lang_registry)
2895            .unwrap()
2896            .add(Arc::new(Language::new(
2897                LanguageConfig {
2898                    name: "Rust".into(),
2899                    path_suffixes: vec!["rs".to_string()],
2900                    language_server: Some(language_server_config),
2901                    ..Default::default()
2902                },
2903                Some(tree_sitter_rust::language()),
2904            )));
2905
2906        // Connect to a server as 2 clients.
2907        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2908        let client_a = server.create_client(cx_a, "user_a").await;
2909        let client_b = server.create_client(cx_b, "user_b").await;
2910
2911        // Share a project as client A
2912        let project_a = cx_a.update(|cx| {
2913            Project::local(
2914                client_a.clone(),
2915                client_a.user_store.clone(),
2916                lang_registry.clone(),
2917                fs.clone(),
2918                cx,
2919            )
2920        });
2921        let (worktree_a, _) = project_a
2922            .update(cx_a, |p, cx| {
2923                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2924            })
2925            .await
2926            .unwrap();
2927        worktree_a
2928            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2929            .await;
2930        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2931        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2932        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2933
2934        // Join the worktree as client B.
2935        let project_b = Project::remote(
2936            project_id,
2937            client_b.clone(),
2938            client_b.user_store.clone(),
2939            lang_registry.clone(),
2940            fs.clone(),
2941            &mut cx_b.to_async(),
2942        )
2943        .await
2944        .unwrap();
2945
2946        // Cause the language server to start.
2947        let _buffer = cx_b
2948            .background()
2949            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2950            .await
2951            .unwrap();
2952
2953        // Request the definition of a symbol as the guest.
2954        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2955        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2956        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2957            #[allow(deprecated)]
2958            Some(vec![lsp::SymbolInformation {
2959                name: "TWO".into(),
2960                location: lsp::Location {
2961                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2962                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2963                },
2964                kind: lsp::SymbolKind::CONSTANT,
2965                tags: None,
2966                container_name: None,
2967                deprecated: None,
2968            }])
2969        });
2970
2971        let symbols = symbols.await.unwrap();
2972        assert_eq!(symbols.len(), 1);
2973        assert_eq!(symbols[0].name, "TWO");
2974
2975        // Open one of the returned symbols.
2976        let buffer_b_2 = project_b
2977            .update(cx_b, |project, cx| {
2978                project.open_buffer_for_symbol(&symbols[0], cx)
2979            })
2980            .await
2981            .unwrap();
2982        buffer_b_2.read_with(cx_b, |buffer, _| {
2983            assert_eq!(
2984                buffer.file().unwrap().path().as_ref(),
2985                Path::new("../crate-2/two.rs")
2986            );
2987        });
2988
2989        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2990        let mut fake_symbol = symbols[0].clone();
2991        fake_symbol.path = Path::new("/code/secrets").into();
2992        let error = project_b
2993            .update(cx_b, |project, cx| {
2994                project.open_buffer_for_symbol(&fake_symbol, cx)
2995            })
2996            .await
2997            .unwrap_err();
2998        assert!(error.to_string().contains("invalid symbol signature"));
2999    }
3000
3001    #[gpui::test(iterations = 10)]
3002    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3003        cx_a: &mut TestAppContext,
3004        cx_b: &mut TestAppContext,
3005        mut rng: StdRng,
3006    ) {
3007        cx_a.foreground().forbid_parking();
3008        let mut lang_registry = Arc::new(LanguageRegistry::new());
3009        let fs = FakeFs::new(cx_a.background());
3010        fs.insert_tree(
3011            "/root",
3012            json!({
3013                ".zed.toml": r#"collaborators = ["user_b"]"#,
3014                "a.rs": "const ONE: usize = b::TWO;",
3015                "b.rs": "const TWO: usize = 2",
3016            }),
3017        )
3018        .await;
3019
3020        // Set up a fake language server.
3021        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3022
3023        Arc::get_mut(&mut lang_registry)
3024            .unwrap()
3025            .add(Arc::new(Language::new(
3026                LanguageConfig {
3027                    name: "Rust".into(),
3028                    path_suffixes: vec!["rs".to_string()],
3029                    language_server: Some(language_server_config),
3030                    ..Default::default()
3031                },
3032                Some(tree_sitter_rust::language()),
3033            )));
3034
3035        // Connect to a server as 2 clients.
3036        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3037        let client_a = server.create_client(cx_a, "user_a").await;
3038        let client_b = server.create_client(cx_b, "user_b").await;
3039
3040        // Share a project as client A
3041        let project_a = cx_a.update(|cx| {
3042            Project::local(
3043                client_a.clone(),
3044                client_a.user_store.clone(),
3045                lang_registry.clone(),
3046                fs.clone(),
3047                cx,
3048            )
3049        });
3050
3051        let (worktree_a, _) = project_a
3052            .update(cx_a, |p, cx| {
3053                p.find_or_create_local_worktree("/root", false, cx)
3054            })
3055            .await
3056            .unwrap();
3057        worktree_a
3058            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3059            .await;
3060        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3061        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3062        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3063
3064        // Join the worktree as client B.
3065        let project_b = Project::remote(
3066            project_id,
3067            client_b.clone(),
3068            client_b.user_store.clone(),
3069            lang_registry.clone(),
3070            fs.clone(),
3071            &mut cx_b.to_async(),
3072        )
3073        .await
3074        .unwrap();
3075
3076        let buffer_b1 = cx_b
3077            .background()
3078            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3079            .await
3080            .unwrap();
3081
3082        let definitions;
3083        let buffer_b2;
3084        if rng.gen() {
3085            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3086            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3087        } else {
3088            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3089            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3090        }
3091
3092        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3093        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3094            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3095                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3096                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3097            )))
3098        });
3099
3100        let buffer_b2 = buffer_b2.await.unwrap();
3101        let definitions = definitions.await.unwrap();
3102        assert_eq!(definitions.len(), 1);
3103        assert_eq!(definitions[0].buffer, buffer_b2);
3104    }
3105
3106    #[gpui::test(iterations = 10)]
3107    async fn test_collaborating_with_code_actions(
3108        cx_a: &mut TestAppContext,
3109        cx_b: &mut TestAppContext,
3110    ) {
3111        cx_a.foreground().forbid_parking();
3112        let mut lang_registry = Arc::new(LanguageRegistry::new());
3113        let fs = FakeFs::new(cx_a.background());
3114        let mut path_openers_b = Vec::new();
3115        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3116
3117        // Set up a fake language server.
3118        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3119        Arc::get_mut(&mut lang_registry)
3120            .unwrap()
3121            .add(Arc::new(Language::new(
3122                LanguageConfig {
3123                    name: "Rust".into(),
3124                    path_suffixes: vec!["rs".to_string()],
3125                    language_server: Some(language_server_config),
3126                    ..Default::default()
3127                },
3128                Some(tree_sitter_rust::language()),
3129            )));
3130
3131        // Connect to a server as 2 clients.
3132        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3133        let client_a = server.create_client(cx_a, "user_a").await;
3134        let client_b = server.create_client(cx_b, "user_b").await;
3135
3136        // Share a project as client A
3137        fs.insert_tree(
3138            "/a",
3139            json!({
3140                ".zed.toml": r#"collaborators = ["user_b"]"#,
3141                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3142                "other.rs": "pub fn foo() -> usize { 4 }",
3143            }),
3144        )
3145        .await;
3146        let project_a = cx_a.update(|cx| {
3147            Project::local(
3148                client_a.clone(),
3149                client_a.user_store.clone(),
3150                lang_registry.clone(),
3151                fs.clone(),
3152                cx,
3153            )
3154        });
3155        let (worktree_a, _) = project_a
3156            .update(cx_a, |p, cx| {
3157                p.find_or_create_local_worktree("/a", false, cx)
3158            })
3159            .await
3160            .unwrap();
3161        worktree_a
3162            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3163            .await;
3164        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3165        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3166        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3167
3168        // Join the worktree as client B.
3169        let project_b = Project::remote(
3170            project_id,
3171            client_b.clone(),
3172            client_b.user_store.clone(),
3173            lang_registry.clone(),
3174            fs.clone(),
3175            &mut cx_b.to_async(),
3176        )
3177        .await
3178        .unwrap();
3179        let mut params = cx_b.update(WorkspaceParams::test);
3180        params.languages = lang_registry.clone();
3181        params.client = client_b.client.clone();
3182        params.user_store = client_b.user_store.clone();
3183        params.project = project_b;
3184        params.path_openers = path_openers_b.into();
3185
3186        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3187        let editor_b = workspace_b
3188            .update(cx_b, |workspace, cx| {
3189                workspace.open_path((worktree_id, "main.rs").into(), cx)
3190            })
3191            .await
3192            .unwrap()
3193            .downcast::<Editor>()
3194            .unwrap();
3195
3196        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3197        fake_language_server
3198            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3199                assert_eq!(
3200                    params.text_document.uri,
3201                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3202                );
3203                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3204                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3205                None
3206            })
3207            .next()
3208            .await;
3209
3210        // Move cursor to a location that contains code actions.
3211        editor_b.update(cx_b, |editor, cx| {
3212            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3213            cx.focus(&editor_b);
3214        });
3215
3216        fake_language_server
3217            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3218                assert_eq!(
3219                    params.text_document.uri,
3220                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3221                );
3222                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3223                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3224
3225                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3226                    lsp::CodeAction {
3227                        title: "Inline into all callers".to_string(),
3228                        edit: Some(lsp::WorkspaceEdit {
3229                            changes: Some(
3230                                [
3231                                    (
3232                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3233                                        vec![lsp::TextEdit::new(
3234                                            lsp::Range::new(
3235                                                lsp::Position::new(1, 22),
3236                                                lsp::Position::new(1, 34),
3237                                            ),
3238                                            "4".to_string(),
3239                                        )],
3240                                    ),
3241                                    (
3242                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3243                                        vec![lsp::TextEdit::new(
3244                                            lsp::Range::new(
3245                                                lsp::Position::new(0, 0),
3246                                                lsp::Position::new(0, 27),
3247                                            ),
3248                                            "".to_string(),
3249                                        )],
3250                                    ),
3251                                ]
3252                                .into_iter()
3253                                .collect(),
3254                            ),
3255                            ..Default::default()
3256                        }),
3257                        data: Some(json!({
3258                            "codeActionParams": {
3259                                "range": {
3260                                    "start": {"line": 1, "column": 31},
3261                                    "end": {"line": 1, "column": 31},
3262                                }
3263                            }
3264                        })),
3265                        ..Default::default()
3266                    },
3267                )])
3268            })
3269            .next()
3270            .await;
3271
3272        // Toggle code actions and wait for them to display.
3273        editor_b.update(cx_b, |editor, cx| {
3274            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3275        });
3276        editor_b
3277            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3278            .await;
3279
3280        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3281
3282        // Confirming the code action will trigger a resolve request.
3283        let confirm_action = workspace_b
3284            .update(cx_b, |workspace, cx| {
3285                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3286            })
3287            .unwrap();
3288        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3289            lsp::CodeAction {
3290                title: "Inline into all callers".to_string(),
3291                edit: Some(lsp::WorkspaceEdit {
3292                    changes: Some(
3293                        [
3294                            (
3295                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3296                                vec![lsp::TextEdit::new(
3297                                    lsp::Range::new(
3298                                        lsp::Position::new(1, 22),
3299                                        lsp::Position::new(1, 34),
3300                                    ),
3301                                    "4".to_string(),
3302                                )],
3303                            ),
3304                            (
3305                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3306                                vec![lsp::TextEdit::new(
3307                                    lsp::Range::new(
3308                                        lsp::Position::new(0, 0),
3309                                        lsp::Position::new(0, 27),
3310                                    ),
3311                                    "".to_string(),
3312                                )],
3313                            ),
3314                        ]
3315                        .into_iter()
3316                        .collect(),
3317                    ),
3318                    ..Default::default()
3319                }),
3320                ..Default::default()
3321            }
3322        });
3323
3324        // After the action is confirmed, an editor containing both modified files is opened.
3325        confirm_action.await.unwrap();
3326        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3327            workspace
3328                .active_item(cx)
3329                .unwrap()
3330                .downcast::<Editor>()
3331                .unwrap()
3332        });
3333        code_action_editor.update(cx_b, |editor, cx| {
3334            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3335            editor.undo(&Undo, cx);
3336            assert_eq!(
3337                editor.text(cx),
3338                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3339            );
3340            editor.redo(&Redo, cx);
3341            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3342        });
3343    }
3344
3345    #[gpui::test(iterations = 10)]
3346    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3347        cx_a.foreground().forbid_parking();
3348        let mut lang_registry = Arc::new(LanguageRegistry::new());
3349        let fs = FakeFs::new(cx_a.background());
3350        let mut path_openers_b = Vec::new();
3351        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3352
3353        // Set up a fake language server.
3354        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3355        Arc::get_mut(&mut lang_registry)
3356            .unwrap()
3357            .add(Arc::new(Language::new(
3358                LanguageConfig {
3359                    name: "Rust".into(),
3360                    path_suffixes: vec!["rs".to_string()],
3361                    language_server: Some(language_server_config),
3362                    ..Default::default()
3363                },
3364                Some(tree_sitter_rust::language()),
3365            )));
3366
3367        // Connect to a server as 2 clients.
3368        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3369        let client_a = server.create_client(cx_a, "user_a").await;
3370        let client_b = server.create_client(cx_b, "user_b").await;
3371
3372        // Share a project as client A
3373        fs.insert_tree(
3374            "/dir",
3375            json!({
3376                ".zed.toml": r#"collaborators = ["user_b"]"#,
3377                "one.rs": "const ONE: usize = 1;",
3378                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3379            }),
3380        )
3381        .await;
3382        let project_a = cx_a.update(|cx| {
3383            Project::local(
3384                client_a.clone(),
3385                client_a.user_store.clone(),
3386                lang_registry.clone(),
3387                fs.clone(),
3388                cx,
3389            )
3390        });
3391        let (worktree_a, _) = project_a
3392            .update(cx_a, |p, cx| {
3393                p.find_or_create_local_worktree("/dir", false, cx)
3394            })
3395            .await
3396            .unwrap();
3397        worktree_a
3398            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3399            .await;
3400        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3401        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3402        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3403
3404        // Join the worktree as client B.
3405        let project_b = Project::remote(
3406            project_id,
3407            client_b.clone(),
3408            client_b.user_store.clone(),
3409            lang_registry.clone(),
3410            fs.clone(),
3411            &mut cx_b.to_async(),
3412        )
3413        .await
3414        .unwrap();
3415        let mut params = cx_b.update(WorkspaceParams::test);
3416        params.languages = lang_registry.clone();
3417        params.client = client_b.client.clone();
3418        params.user_store = client_b.user_store.clone();
3419        params.project = project_b;
3420        params.path_openers = path_openers_b.into();
3421
3422        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3423        let editor_b = workspace_b
3424            .update(cx_b, |workspace, cx| {
3425                workspace.open_path((worktree_id, "one.rs").into(), cx)
3426            })
3427            .await
3428            .unwrap()
3429            .downcast::<Editor>()
3430            .unwrap();
3431        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3432
3433        // Move cursor to a location that can be renamed.
3434        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3435            editor.select_ranges([7..7], None, cx);
3436            editor.rename(&Rename, cx).unwrap()
3437        });
3438
3439        fake_language_server
3440            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3441                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3442                assert_eq!(params.position, lsp::Position::new(0, 7));
3443                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3444                    lsp::Position::new(0, 6),
3445                    lsp::Position::new(0, 9),
3446                )))
3447            })
3448            .next()
3449            .await
3450            .unwrap();
3451        prepare_rename.await.unwrap();
3452        editor_b.update(cx_b, |editor, cx| {
3453            let rename = editor.pending_rename().unwrap();
3454            let buffer = editor.buffer().read(cx).snapshot(cx);
3455            assert_eq!(
3456                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3457                6..9
3458            );
3459            rename.editor.update(cx, |rename_editor, cx| {
3460                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3461                    rename_buffer.edit([0..3], "THREE", cx);
3462                });
3463            });
3464        });
3465
3466        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3467            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3468        });
3469        fake_language_server
3470            .handle_request::<lsp::request::Rename, _>(|params, _| {
3471                assert_eq!(
3472                    params.text_document_position.text_document.uri.as_str(),
3473                    "file:///dir/one.rs"
3474                );
3475                assert_eq!(
3476                    params.text_document_position.position,
3477                    lsp::Position::new(0, 6)
3478                );
3479                assert_eq!(params.new_name, "THREE");
3480                Some(lsp::WorkspaceEdit {
3481                    changes: Some(
3482                        [
3483                            (
3484                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3485                                vec![lsp::TextEdit::new(
3486                                    lsp::Range::new(
3487                                        lsp::Position::new(0, 6),
3488                                        lsp::Position::new(0, 9),
3489                                    ),
3490                                    "THREE".to_string(),
3491                                )],
3492                            ),
3493                            (
3494                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3495                                vec![
3496                                    lsp::TextEdit::new(
3497                                        lsp::Range::new(
3498                                            lsp::Position::new(0, 24),
3499                                            lsp::Position::new(0, 27),
3500                                        ),
3501                                        "THREE".to_string(),
3502                                    ),
3503                                    lsp::TextEdit::new(
3504                                        lsp::Range::new(
3505                                            lsp::Position::new(0, 35),
3506                                            lsp::Position::new(0, 38),
3507                                        ),
3508                                        "THREE".to_string(),
3509                                    ),
3510                                ],
3511                            ),
3512                        ]
3513                        .into_iter()
3514                        .collect(),
3515                    ),
3516                    ..Default::default()
3517                })
3518            })
3519            .next()
3520            .await
3521            .unwrap();
3522        confirm_rename.await.unwrap();
3523
3524        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3525            workspace
3526                .active_item(cx)
3527                .unwrap()
3528                .downcast::<Editor>()
3529                .unwrap()
3530        });
3531        rename_editor.update(cx_b, |editor, cx| {
3532            assert_eq!(
3533                editor.text(cx),
3534                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3535            );
3536            editor.undo(&Undo, cx);
3537            assert_eq!(
3538                editor.text(cx),
3539                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3540            );
3541            editor.redo(&Redo, cx);
3542            assert_eq!(
3543                editor.text(cx),
3544                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3545            );
3546        });
3547
3548        // Ensure temporary rename edits cannot be undone/redone.
3549        editor_b.update(cx_b, |editor, cx| {
3550            editor.undo(&Undo, cx);
3551            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3552            editor.undo(&Undo, cx);
3553            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3554            editor.redo(&Redo, cx);
3555            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3556        })
3557    }
3558
3559    #[gpui::test(iterations = 10)]
3560    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3561        cx_a.foreground().forbid_parking();
3562
3563        // Connect to a server as 2 clients.
3564        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3565        let client_a = server.create_client(cx_a, "user_a").await;
3566        let client_b = server.create_client(cx_b, "user_b").await;
3567
3568        // Create an org that includes these 2 users.
3569        let db = &server.app_state.db;
3570        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3571        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3572            .await
3573            .unwrap();
3574        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3575            .await
3576            .unwrap();
3577
3578        // Create a channel that includes all the users.
3579        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3580        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3581            .await
3582            .unwrap();
3583        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3584            .await
3585            .unwrap();
3586        db.create_channel_message(
3587            channel_id,
3588            client_b.current_user_id(&cx_b),
3589            "hello A, it's B.",
3590            OffsetDateTime::now_utc(),
3591            1,
3592        )
3593        .await
3594        .unwrap();
3595
3596        let channels_a = cx_a
3597            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3598        channels_a
3599            .condition(cx_a, |list, _| list.available_channels().is_some())
3600            .await;
3601        channels_a.read_with(cx_a, |list, _| {
3602            assert_eq!(
3603                list.available_channels().unwrap(),
3604                &[ChannelDetails {
3605                    id: channel_id.to_proto(),
3606                    name: "test-channel".to_string()
3607                }]
3608            )
3609        });
3610        let channel_a = channels_a.update(cx_a, |this, cx| {
3611            this.get_channel(channel_id.to_proto(), cx).unwrap()
3612        });
3613        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3614        channel_a
3615            .condition(&cx_a, |channel, _| {
3616                channel_messages(channel)
3617                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3618            })
3619            .await;
3620
3621        let channels_b = cx_b
3622            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3623        channels_b
3624            .condition(cx_b, |list, _| list.available_channels().is_some())
3625            .await;
3626        channels_b.read_with(cx_b, |list, _| {
3627            assert_eq!(
3628                list.available_channels().unwrap(),
3629                &[ChannelDetails {
3630                    id: channel_id.to_proto(),
3631                    name: "test-channel".to_string()
3632                }]
3633            )
3634        });
3635
3636        let channel_b = channels_b.update(cx_b, |this, cx| {
3637            this.get_channel(channel_id.to_proto(), cx).unwrap()
3638        });
3639        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3640        channel_b
3641            .condition(&cx_b, |channel, _| {
3642                channel_messages(channel)
3643                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3644            })
3645            .await;
3646
3647        channel_a
3648            .update(cx_a, |channel, cx| {
3649                channel
3650                    .send_message("oh, hi B.".to_string(), cx)
3651                    .unwrap()
3652                    .detach();
3653                let task = channel.send_message("sup".to_string(), cx).unwrap();
3654                assert_eq!(
3655                    channel_messages(channel),
3656                    &[
3657                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3658                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3659                        ("user_a".to_string(), "sup".to_string(), true)
3660                    ]
3661                );
3662                task
3663            })
3664            .await
3665            .unwrap();
3666
3667        channel_b
3668            .condition(&cx_b, |channel, _| {
3669                channel_messages(channel)
3670                    == [
3671                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3672                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3673                        ("user_a".to_string(), "sup".to_string(), false),
3674                    ]
3675            })
3676            .await;
3677
3678        assert_eq!(
3679            server
3680                .state()
3681                .await
3682                .channel(channel_id)
3683                .unwrap()
3684                .connection_ids
3685                .len(),
3686            2
3687        );
3688        cx_b.update(|_| drop(channel_b));
3689        server
3690            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3691            .await;
3692
3693        cx_a.update(|_| drop(channel_a));
3694        server
3695            .condition(|state| state.channel(channel_id).is_none())
3696            .await;
3697    }
3698
3699    #[gpui::test(iterations = 10)]
3700    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3701        cx_a.foreground().forbid_parking();
3702
3703        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3704        let client_a = server.create_client(cx_a, "user_a").await;
3705
3706        let db = &server.app_state.db;
3707        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3708        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3709        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3710            .await
3711            .unwrap();
3712        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3713            .await
3714            .unwrap();
3715
3716        let channels_a = cx_a
3717            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3718        channels_a
3719            .condition(cx_a, |list, _| list.available_channels().is_some())
3720            .await;
3721        let channel_a = channels_a.update(cx_a, |this, cx| {
3722            this.get_channel(channel_id.to_proto(), cx).unwrap()
3723        });
3724
3725        // Messages aren't allowed to be too long.
3726        channel_a
3727            .update(cx_a, |channel, cx| {
3728                let long_body = "this is long.\n".repeat(1024);
3729                channel.send_message(long_body, cx).unwrap()
3730            })
3731            .await
3732            .unwrap_err();
3733
3734        // Messages aren't allowed to be blank.
3735        channel_a.update(cx_a, |channel, cx| {
3736            channel.send_message(String::new(), cx).unwrap_err()
3737        });
3738
3739        // Leading and trailing whitespace are trimmed.
3740        channel_a
3741            .update(cx_a, |channel, cx| {
3742                channel
3743                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3744                    .unwrap()
3745            })
3746            .await
3747            .unwrap();
3748        assert_eq!(
3749            db.get_channel_messages(channel_id, 10, None)
3750                .await
3751                .unwrap()
3752                .iter()
3753                .map(|m| &m.body)
3754                .collect::<Vec<_>>(),
3755            &["surrounded by whitespace"]
3756        );
3757    }
3758
3759    #[gpui::test(iterations = 10)]
3760    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3761        cx_a.foreground().forbid_parking();
3762
3763        // Connect to a server as 2 clients.
3764        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3765        let client_a = server.create_client(cx_a, "user_a").await;
3766        let client_b = server.create_client(cx_b, "user_b").await;
3767        let mut status_b = client_b.status();
3768
3769        // Create an org that includes these 2 users.
3770        let db = &server.app_state.db;
3771        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3772        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3773            .await
3774            .unwrap();
3775        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3776            .await
3777            .unwrap();
3778
3779        // Create a channel that includes all the users.
3780        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3781        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3782            .await
3783            .unwrap();
3784        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3785            .await
3786            .unwrap();
3787        db.create_channel_message(
3788            channel_id,
3789            client_b.current_user_id(&cx_b),
3790            "hello A, it's B.",
3791            OffsetDateTime::now_utc(),
3792            2,
3793        )
3794        .await
3795        .unwrap();
3796
3797        let channels_a = cx_a
3798            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3799        channels_a
3800            .condition(cx_a, |list, _| list.available_channels().is_some())
3801            .await;
3802
3803        channels_a.read_with(cx_a, |list, _| {
3804            assert_eq!(
3805                list.available_channels().unwrap(),
3806                &[ChannelDetails {
3807                    id: channel_id.to_proto(),
3808                    name: "test-channel".to_string()
3809                }]
3810            )
3811        });
3812        let channel_a = channels_a.update(cx_a, |this, cx| {
3813            this.get_channel(channel_id.to_proto(), cx).unwrap()
3814        });
3815        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3816        channel_a
3817            .condition(&cx_a, |channel, _| {
3818                channel_messages(channel)
3819                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3820            })
3821            .await;
3822
3823        let channels_b = cx_b
3824            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3825        channels_b
3826            .condition(cx_b, |list, _| list.available_channels().is_some())
3827            .await;
3828        channels_b.read_with(cx_b, |list, _| {
3829            assert_eq!(
3830                list.available_channels().unwrap(),
3831                &[ChannelDetails {
3832                    id: channel_id.to_proto(),
3833                    name: "test-channel".to_string()
3834                }]
3835            )
3836        });
3837
3838        let channel_b = channels_b.update(cx_b, |this, cx| {
3839            this.get_channel(channel_id.to_proto(), cx).unwrap()
3840        });
3841        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3842        channel_b
3843            .condition(&cx_b, |channel, _| {
3844                channel_messages(channel)
3845                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3846            })
3847            .await;
3848
3849        // Disconnect client B, ensuring we can still access its cached channel data.
3850        server.forbid_connections();
3851        server.disconnect_client(client_b.current_user_id(&cx_b));
3852        while !matches!(
3853            status_b.next().await,
3854            Some(client::Status::ReconnectionError { .. })
3855        ) {}
3856
3857        channels_b.read_with(cx_b, |channels, _| {
3858            assert_eq!(
3859                channels.available_channels().unwrap(),
3860                [ChannelDetails {
3861                    id: channel_id.to_proto(),
3862                    name: "test-channel".to_string()
3863                }]
3864            )
3865        });
3866        channel_b.read_with(cx_b, |channel, _| {
3867            assert_eq!(
3868                channel_messages(channel),
3869                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3870            )
3871        });
3872
3873        // Send a message from client B while it is disconnected.
3874        channel_b
3875            .update(cx_b, |channel, cx| {
3876                let task = channel
3877                    .send_message("can you see this?".to_string(), cx)
3878                    .unwrap();
3879                assert_eq!(
3880                    channel_messages(channel),
3881                    &[
3882                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3883                        ("user_b".to_string(), "can you see this?".to_string(), true)
3884                    ]
3885                );
3886                task
3887            })
3888            .await
3889            .unwrap_err();
3890
3891        // Send a message from client A while B is disconnected.
3892        channel_a
3893            .update(cx_a, |channel, cx| {
3894                channel
3895                    .send_message("oh, hi B.".to_string(), cx)
3896                    .unwrap()
3897                    .detach();
3898                let task = channel.send_message("sup".to_string(), cx).unwrap();
3899                assert_eq!(
3900                    channel_messages(channel),
3901                    &[
3902                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3903                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3904                        ("user_a".to_string(), "sup".to_string(), true)
3905                    ]
3906                );
3907                task
3908            })
3909            .await
3910            .unwrap();
3911
3912        // Give client B a chance to reconnect.
3913        server.allow_connections();
3914        cx_b.foreground().advance_clock(Duration::from_secs(10));
3915
3916        // Verify that B sees the new messages upon reconnection, as well as the message client B
3917        // sent while offline.
3918        channel_b
3919            .condition(&cx_b, |channel, _| {
3920                channel_messages(channel)
3921                    == [
3922                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3923                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3924                        ("user_a".to_string(), "sup".to_string(), false),
3925                        ("user_b".to_string(), "can you see this?".to_string(), false),
3926                    ]
3927            })
3928            .await;
3929
3930        // Ensure client A and B can communicate normally after reconnection.
3931        channel_a
3932            .update(cx_a, |channel, cx| {
3933                channel.send_message("you online?".to_string(), cx).unwrap()
3934            })
3935            .await
3936            .unwrap();
3937        channel_b
3938            .condition(&cx_b, |channel, _| {
3939                channel_messages(channel)
3940                    == [
3941                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3942                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3943                        ("user_a".to_string(), "sup".to_string(), false),
3944                        ("user_b".to_string(), "can you see this?".to_string(), false),
3945                        ("user_a".to_string(), "you online?".to_string(), false),
3946                    ]
3947            })
3948            .await;
3949
3950        channel_b
3951            .update(cx_b, |channel, cx| {
3952                channel.send_message("yep".to_string(), cx).unwrap()
3953            })
3954            .await
3955            .unwrap();
3956        channel_a
3957            .condition(&cx_a, |channel, _| {
3958                channel_messages(channel)
3959                    == [
3960                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3961                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3962                        ("user_a".to_string(), "sup".to_string(), false),
3963                        ("user_b".to_string(), "can you see this?".to_string(), false),
3964                        ("user_a".to_string(), "you online?".to_string(), false),
3965                        ("user_b".to_string(), "yep".to_string(), false),
3966                    ]
3967            })
3968            .await;
3969    }
3970
3971    #[gpui::test(iterations = 10)]
3972    async fn test_contacts(
3973        cx_a: &mut TestAppContext,
3974        cx_b: &mut TestAppContext,
3975        cx_c: &mut TestAppContext,
3976    ) {
3977        cx_a.foreground().forbid_parking();
3978        let lang_registry = Arc::new(LanguageRegistry::new());
3979        let fs = FakeFs::new(cx_a.background());
3980
3981        // Connect to a server as 3 clients.
3982        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3983        let client_a = server.create_client(cx_a, "user_a").await;
3984        let client_b = server.create_client(cx_b, "user_b").await;
3985        let client_c = server.create_client(cx_c, "user_c").await;
3986
3987        // Share a worktree as client A.
3988        fs.insert_tree(
3989            "/a",
3990            json!({
3991                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3992            }),
3993        )
3994        .await;
3995
3996        let project_a = cx_a.update(|cx| {
3997            Project::local(
3998                client_a.clone(),
3999                client_a.user_store.clone(),
4000                lang_registry.clone(),
4001                fs.clone(),
4002                cx,
4003            )
4004        });
4005        let (worktree_a, _) = project_a
4006            .update(cx_a, |p, cx| {
4007                p.find_or_create_local_worktree("/a", false, cx)
4008            })
4009            .await
4010            .unwrap();
4011        worktree_a
4012            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4013            .await;
4014
4015        client_a
4016            .user_store
4017            .condition(&cx_a, |user_store, _| {
4018                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4019            })
4020            .await;
4021        client_b
4022            .user_store
4023            .condition(&cx_b, |user_store, _| {
4024                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4025            })
4026            .await;
4027        client_c
4028            .user_store
4029            .condition(&cx_c, |user_store, _| {
4030                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4031            })
4032            .await;
4033
4034        let project_id = project_a
4035            .update(cx_a, |project, _| project.next_remote_id())
4036            .await;
4037        project_a
4038            .update(cx_a, |project, cx| project.share(cx))
4039            .await
4040            .unwrap();
4041
4042        let _project_b = Project::remote(
4043            project_id,
4044            client_b.clone(),
4045            client_b.user_store.clone(),
4046            lang_registry.clone(),
4047            fs.clone(),
4048            &mut cx_b.to_async(),
4049        )
4050        .await
4051        .unwrap();
4052
4053        client_a
4054            .user_store
4055            .condition(&cx_a, |user_store, _| {
4056                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4057            })
4058            .await;
4059        client_b
4060            .user_store
4061            .condition(&cx_b, |user_store, _| {
4062                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4063            })
4064            .await;
4065        client_c
4066            .user_store
4067            .condition(&cx_c, |user_store, _| {
4068                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4069            })
4070            .await;
4071
4072        project_a
4073            .condition(&cx_a, |project, _| {
4074                project.collaborators().contains_key(&client_b.peer_id)
4075            })
4076            .await;
4077
4078        cx_a.update(move |_| drop(project_a));
4079        client_a
4080            .user_store
4081            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4082            .await;
4083        client_b
4084            .user_store
4085            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4086            .await;
4087        client_c
4088            .user_store
4089            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4090            .await;
4091
4092        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4093            user_store
4094                .contacts()
4095                .iter()
4096                .map(|contact| {
4097                    let worktrees = contact
4098                        .projects
4099                        .iter()
4100                        .map(|p| {
4101                            (
4102                                p.worktree_root_names[0].as_str(),
4103                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4104                            )
4105                        })
4106                        .collect();
4107                    (contact.user.github_login.as_str(), worktrees)
4108                })
4109                .collect()
4110        }
4111    }
4112
4113    #[gpui::test(iterations = 100)]
4114    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4115        cx.foreground().forbid_parking();
4116        let max_peers = env::var("MAX_PEERS")
4117            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4118            .unwrap_or(5);
4119        let max_operations = env::var("OPERATIONS")
4120            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4121            .unwrap_or(10);
4122
4123        let rng = Arc::new(Mutex::new(rng));
4124
4125        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4126        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4127
4128        let fs = FakeFs::new(cx.background());
4129        fs.insert_tree(
4130            "/_collab",
4131            json!({
4132                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4133            }),
4134        )
4135        .await;
4136
4137        let operations = Rc::new(Cell::new(0));
4138        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4139        let mut clients = Vec::new();
4140
4141        let mut next_entity_id = 100000;
4142        let mut host_cx = TestAppContext::new(
4143            cx.foreground_platform(),
4144            cx.platform(),
4145            cx.foreground(),
4146            cx.background(),
4147            cx.font_cache(),
4148            cx.leak_detector(),
4149            next_entity_id,
4150        );
4151        let host = server.create_client(&mut host_cx, "host").await;
4152        let host_project = host_cx.update(|cx| {
4153            Project::local(
4154                host.client.clone(),
4155                host.user_store.clone(),
4156                Arc::new(LanguageRegistry::new()),
4157                fs.clone(),
4158                cx,
4159            )
4160        });
4161        let host_project_id = host_project
4162            .update(&mut host_cx, |p, _| p.next_remote_id())
4163            .await;
4164
4165        let (collab_worktree, _) = host_project
4166            .update(&mut host_cx, |project, cx| {
4167                project.find_or_create_local_worktree("/_collab", false, cx)
4168            })
4169            .await
4170            .unwrap();
4171        collab_worktree
4172            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4173            .await;
4174        host_project
4175            .update(&mut host_cx, |project, cx| project.share(cx))
4176            .await
4177            .unwrap();
4178
4179        clients.push(cx.foreground().spawn(host.simulate_host(
4180            host_project,
4181            language_server_config,
4182            operations.clone(),
4183            max_operations,
4184            rng.clone(),
4185            host_cx,
4186        )));
4187
4188        while operations.get() < max_operations {
4189            cx.background().simulate_random_delay().await;
4190            if clients.len() >= max_peers {
4191                break;
4192            } else if rng.lock().gen_bool(0.05) {
4193                operations.set(operations.get() + 1);
4194
4195                let guest_id = clients.len();
4196                log::info!("Adding guest {}", guest_id);
4197                next_entity_id += 100000;
4198                let mut guest_cx = TestAppContext::new(
4199                    cx.foreground_platform(),
4200                    cx.platform(),
4201                    cx.foreground(),
4202                    cx.background(),
4203                    cx.font_cache(),
4204                    cx.leak_detector(),
4205                    next_entity_id,
4206                );
4207                let guest = server
4208                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4209                    .await;
4210                let guest_project = Project::remote(
4211                    host_project_id,
4212                    guest.client.clone(),
4213                    guest.user_store.clone(),
4214                    guest_lang_registry.clone(),
4215                    FakeFs::new(cx.background()),
4216                    &mut guest_cx.to_async(),
4217                )
4218                .await
4219                .unwrap();
4220                clients.push(cx.foreground().spawn(guest.simulate_guest(
4221                    guest_id,
4222                    guest_project,
4223                    operations.clone(),
4224                    max_operations,
4225                    rng.clone(),
4226                    guest_cx,
4227                )));
4228
4229                log::info!("Guest {} added", guest_id);
4230            }
4231        }
4232
4233        let mut clients = futures::future::join_all(clients).await;
4234        cx.foreground().run_until_parked();
4235
4236        let (host_client, host_cx) = clients.remove(0);
4237        let host_project = host_client.project.as_ref().unwrap();
4238        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4239            project
4240                .worktrees(cx)
4241                .map(|worktree| {
4242                    let snapshot = worktree.read(cx).snapshot();
4243                    (snapshot.id(), snapshot)
4244                })
4245                .collect::<BTreeMap<_, _>>()
4246        });
4247
4248        for (guest_client, guest_cx) in clients.iter() {
4249            let guest_id = guest_client.client.id();
4250            let worktree_snapshots =
4251                guest_client
4252                    .project
4253                    .as_ref()
4254                    .unwrap()
4255                    .read_with(guest_cx, |project, cx| {
4256                        project
4257                            .worktrees(cx)
4258                            .map(|worktree| {
4259                                let worktree = worktree.read(cx);
4260                                (worktree.id(), worktree.snapshot())
4261                            })
4262                            .collect::<BTreeMap<_, _>>()
4263                    });
4264
4265            assert_eq!(
4266                worktree_snapshots.keys().collect::<Vec<_>>(),
4267                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4268                "guest {} has different worktrees than the host",
4269                guest_id
4270            );
4271            for (id, host_snapshot) in &host_worktree_snapshots {
4272                let guest_snapshot = &worktree_snapshots[id];
4273                assert_eq!(
4274                    guest_snapshot.root_name(),
4275                    host_snapshot.root_name(),
4276                    "guest {} has different root name than the host for worktree {}",
4277                    guest_id,
4278                    id
4279                );
4280                assert_eq!(
4281                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4282                    host_snapshot.entries(false).collect::<Vec<_>>(),
4283                    "guest {} has different snapshot than the host for worktree {}",
4284                    guest_id,
4285                    id
4286                );
4287            }
4288
4289            guest_client
4290                .project
4291                .as_ref()
4292                .unwrap()
4293                .read_with(guest_cx, |project, cx| {
4294                    assert!(
4295                        !project.has_deferred_operations(cx),
4296                        "guest {} has deferred operations",
4297                        guest_id,
4298                    );
4299                });
4300
4301            for guest_buffer in &guest_client.buffers {
4302                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4303                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4304                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4305                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4306                        guest_id, guest_client.peer_id, buffer_id
4307                    ))
4308                });
4309                assert_eq!(
4310                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4311                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4312                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4313                    guest_id,
4314                    buffer_id,
4315                    host_buffer
4316                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4317                );
4318            }
4319        }
4320    }
4321
4322    struct TestServer {
4323        peer: Arc<Peer>,
4324        app_state: Arc<AppState>,
4325        server: Arc<Server>,
4326        foreground: Rc<executor::Foreground>,
4327        notifications: mpsc::UnboundedReceiver<()>,
4328        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4329        forbid_connections: Arc<AtomicBool>,
4330        _test_db: TestDb,
4331    }
4332
4333    impl TestServer {
4334        async fn start(
4335            foreground: Rc<executor::Foreground>,
4336            background: Arc<executor::Background>,
4337        ) -> Self {
4338            let test_db = TestDb::fake(background);
4339            let app_state = Self::build_app_state(&test_db).await;
4340            let peer = Peer::new();
4341            let notifications = mpsc::unbounded();
4342            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4343            Self {
4344                peer,
4345                app_state,
4346                server,
4347                foreground,
4348                notifications: notifications.1,
4349                connection_killers: Default::default(),
4350                forbid_connections: Default::default(),
4351                _test_db: test_db,
4352            }
4353        }
4354
4355        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4356            let http = FakeHttpClient::with_404_response();
4357            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4358            let client_name = name.to_string();
4359            let mut client = Client::new(http.clone());
4360            let server = self.server.clone();
4361            let connection_killers = self.connection_killers.clone();
4362            let forbid_connections = self.forbid_connections.clone();
4363            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4364
4365            Arc::get_mut(&mut client)
4366                .unwrap()
4367                .override_authenticate(move |cx| {
4368                    cx.spawn(|_| async move {
4369                        let access_token = "the-token".to_string();
4370                        Ok(Credentials {
4371                            user_id: user_id.0 as u64,
4372                            access_token,
4373                        })
4374                    })
4375                })
4376                .override_establish_connection(move |credentials, cx| {
4377                    assert_eq!(credentials.user_id, user_id.0 as u64);
4378                    assert_eq!(credentials.access_token, "the-token");
4379
4380                    let server = server.clone();
4381                    let connection_killers = connection_killers.clone();
4382                    let forbid_connections = forbid_connections.clone();
4383                    let client_name = client_name.clone();
4384                    let connection_id_tx = connection_id_tx.clone();
4385                    cx.spawn(move |cx| async move {
4386                        if forbid_connections.load(SeqCst) {
4387                            Err(EstablishConnectionError::other(anyhow!(
4388                                "server is forbidding connections"
4389                            )))
4390                        } else {
4391                            let (client_conn, server_conn, kill_conn) =
4392                                Connection::in_memory(cx.background());
4393                            connection_killers.lock().insert(user_id, kill_conn);
4394                            cx.background()
4395                                .spawn(server.handle_connection(
4396                                    server_conn,
4397                                    client_name,
4398                                    user_id,
4399                                    Some(connection_id_tx),
4400                                    cx.background(),
4401                                ))
4402                                .detach();
4403                            Ok(client_conn)
4404                        }
4405                    })
4406                });
4407
4408            client
4409                .authenticate_and_connect(&cx.to_async())
4410                .await
4411                .unwrap();
4412
4413            Channel::init(&client);
4414            Project::init(&client);
4415
4416            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4417            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4418            let mut authed_user =
4419                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4420            while authed_user.next().await.unwrap().is_none() {}
4421
4422            TestClient {
4423                client,
4424                peer_id,
4425                user_store,
4426                project: Default::default(),
4427                buffers: Default::default(),
4428            }
4429        }
4430
4431        fn disconnect_client(&self, user_id: UserId) {
4432            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4433                let _ = kill_conn.try_send(Some(()));
4434            }
4435        }
4436
4437        fn forbid_connections(&self) {
4438            self.forbid_connections.store(true, SeqCst);
4439        }
4440
4441        fn allow_connections(&self) {
4442            self.forbid_connections.store(false, SeqCst);
4443        }
4444
4445        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4446            let mut config = Config::default();
4447            config.session_secret = "a".repeat(32);
4448            config.database_url = test_db.url.clone();
4449            let github_client = github::AppClient::test();
4450            Arc::new(AppState {
4451                db: test_db.db().clone(),
4452                handlebars: Default::default(),
4453                auth_client: auth::build_client("", ""),
4454                repo_client: github::RepoClient::test(&github_client),
4455                github_client,
4456                config,
4457            })
4458        }
4459
4460        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4461            self.server.store.read()
4462        }
4463
4464        async fn condition<F>(&mut self, mut predicate: F)
4465        where
4466            F: FnMut(&Store) -> bool,
4467        {
4468            async_std::future::timeout(Duration::from_millis(500), async {
4469                while !(predicate)(&*self.server.store.read()) {
4470                    self.foreground.start_waiting();
4471                    self.notifications.next().await;
4472                    self.foreground.finish_waiting();
4473                }
4474            })
4475            .await
4476            .expect("condition timed out");
4477        }
4478    }
4479
4480    impl Drop for TestServer {
4481        fn drop(&mut self) {
4482            self.peer.reset();
4483        }
4484    }
4485
4486    struct TestClient {
4487        client: Arc<Client>,
4488        pub peer_id: PeerId,
4489        pub user_store: ModelHandle<UserStore>,
4490        project: Option<ModelHandle<Project>>,
4491        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4492    }
4493
4494    impl Deref for TestClient {
4495        type Target = Arc<Client>;
4496
4497        fn deref(&self) -> &Self::Target {
4498            &self.client
4499        }
4500    }
4501
4502    impl TestClient {
4503        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4504            UserId::from_proto(
4505                self.user_store
4506                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4507            )
4508        }
4509
4510        fn simulate_host(
4511            mut self,
4512            project: ModelHandle<Project>,
4513            mut language_server_config: LanguageServerConfig,
4514            operations: Rc<Cell<usize>>,
4515            max_operations: usize,
4516            rng: Arc<Mutex<StdRng>>,
4517            mut cx: TestAppContext,
4518        ) -> impl Future<Output = (Self, TestAppContext)> {
4519            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4520
4521            // Set up a fake language server.
4522            language_server_config.set_fake_initializer({
4523                let rng = rng.clone();
4524                let files = files.clone();
4525                let project = project.downgrade();
4526                move |fake_server| {
4527                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4528                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4529                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4530                                range: lsp::Range::new(
4531                                    lsp::Position::new(0, 0),
4532                                    lsp::Position::new(0, 0),
4533                                ),
4534                                new_text: "the-new-text".to_string(),
4535                            })),
4536                            ..Default::default()
4537                        }]))
4538                    });
4539
4540                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4541                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4542                            lsp::CodeAction {
4543                                title: "the-code-action".to_string(),
4544                                ..Default::default()
4545                            },
4546                        )])
4547                    });
4548
4549                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4550                        |params, _| {
4551                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4552                                params.position,
4553                                params.position,
4554                            )))
4555                        },
4556                    );
4557
4558                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4559                        let files = files.clone();
4560                        let rng = rng.clone();
4561                        move |_, _| {
4562                            let files = files.lock();
4563                            let mut rng = rng.lock();
4564                            let count = rng.gen_range::<usize, _>(1..3);
4565                            let files = (0..count)
4566                                .map(|_| files.choose(&mut *rng).unwrap())
4567                                .collect::<Vec<_>>();
4568                            log::info!("LSP: Returning definitions in files {:?}", &files);
4569                            Some(lsp::GotoDefinitionResponse::Array(
4570                                files
4571                                    .into_iter()
4572                                    .map(|file| lsp::Location {
4573                                        uri: lsp::Url::from_file_path(file).unwrap(),
4574                                        range: Default::default(),
4575                                    })
4576                                    .collect(),
4577                            ))
4578                        }
4579                    });
4580
4581                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4582                        let rng = rng.clone();
4583                        let project = project.clone();
4584                        move |params, mut cx| {
4585                            if let Some(project) = project.upgrade(&cx) {
4586                                project.update(&mut cx, |project, cx| {
4587                                    let path = params
4588                                        .text_document_position_params
4589                                        .text_document
4590                                        .uri
4591                                        .to_file_path()
4592                                        .unwrap();
4593                                    let (worktree, relative_path) =
4594                                        project.find_local_worktree(&path, cx)?;
4595                                    let project_path =
4596                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4597                                    let buffer =
4598                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4599
4600                                    let mut highlights = Vec::new();
4601                                    let highlight_count = rng.lock().gen_range(1..=5);
4602                                    let mut prev_end = 0;
4603                                    for _ in 0..highlight_count {
4604                                        let range =
4605                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4606                                        let start = buffer
4607                                            .offset_to_point_utf16(range.start)
4608                                            .to_lsp_position();
4609                                        let end = buffer
4610                                            .offset_to_point_utf16(range.end)
4611                                            .to_lsp_position();
4612                                        highlights.push(lsp::DocumentHighlight {
4613                                            range: lsp::Range::new(start, end),
4614                                            kind: Some(lsp::DocumentHighlightKind::READ),
4615                                        });
4616                                        prev_end = range.end;
4617                                    }
4618                                    Some(highlights)
4619                                })
4620                            } else {
4621                                None
4622                            }
4623                        }
4624                    });
4625                }
4626            });
4627
4628            project.update(&mut cx, |project, _| {
4629                project.languages().add(Arc::new(Language::new(
4630                    LanguageConfig {
4631                        name: "Rust".into(),
4632                        path_suffixes: vec!["rs".to_string()],
4633                        language_server: Some(language_server_config),
4634                        ..Default::default()
4635                    },
4636                    None,
4637                )));
4638            });
4639
4640            async move {
4641                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4642                while operations.get() < max_operations {
4643                    operations.set(operations.get() + 1);
4644
4645                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4646                    match distribution {
4647                        0..=20 if !files.lock().is_empty() => {
4648                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4649                            let mut path = path.as_path();
4650                            while let Some(parent_path) = path.parent() {
4651                                path = parent_path;
4652                                if rng.lock().gen() {
4653                                    break;
4654                                }
4655                            }
4656
4657                            log::info!("Host: find/create local worktree {:?}", path);
4658                            project
4659                                .update(&mut cx, |project, cx| {
4660                                    project.find_or_create_local_worktree(path, false, cx)
4661                                })
4662                                .await
4663                                .unwrap();
4664                        }
4665                        10..=80 if !files.lock().is_empty() => {
4666                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4667                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4668                                let (worktree, path) = project
4669                                    .update(&mut cx, |project, cx| {
4670                                        project.find_or_create_local_worktree(
4671                                            file.clone(),
4672                                            false,
4673                                            cx,
4674                                        )
4675                                    })
4676                                    .await
4677                                    .unwrap();
4678                                let project_path =
4679                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4680                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4681                                let buffer = project
4682                                    .update(&mut cx, |project, cx| {
4683                                        project.open_buffer(project_path, cx)
4684                                    })
4685                                    .await
4686                                    .unwrap();
4687                                self.buffers.insert(buffer.clone());
4688                                buffer
4689                            } else {
4690                                self.buffers
4691                                    .iter()
4692                                    .choose(&mut *rng.lock())
4693                                    .unwrap()
4694                                    .clone()
4695                            };
4696
4697                            if rng.lock().gen_bool(0.1) {
4698                                cx.update(|cx| {
4699                                    log::info!(
4700                                        "Host: dropping buffer {:?}",
4701                                        buffer.read(cx).file().unwrap().full_path(cx)
4702                                    );
4703                                    self.buffers.remove(&buffer);
4704                                    drop(buffer);
4705                                });
4706                            } else {
4707                                buffer.update(&mut cx, |buffer, cx| {
4708                                    log::info!(
4709                                        "Host: updating buffer {:?} ({})",
4710                                        buffer.file().unwrap().full_path(cx),
4711                                        buffer.remote_id()
4712                                    );
4713                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4714                                });
4715                            }
4716                        }
4717                        _ => loop {
4718                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4719                            let mut path = PathBuf::new();
4720                            path.push("/");
4721                            for _ in 0..path_component_count {
4722                                let letter = rng.lock().gen_range(b'a'..=b'z');
4723                                path.push(std::str::from_utf8(&[letter]).unwrap());
4724                            }
4725                            path.set_extension("rs");
4726                            let parent_path = path.parent().unwrap();
4727
4728                            log::info!("Host: creating file {:?}", path,);
4729
4730                            if fs.create_dir(&parent_path).await.is_ok()
4731                                && fs.create_file(&path, Default::default()).await.is_ok()
4732                            {
4733                                files.lock().push(path);
4734                                break;
4735                            } else {
4736                                log::info!("Host: cannot create file");
4737                            }
4738                        },
4739                    }
4740
4741                    cx.background().simulate_random_delay().await;
4742                }
4743
4744                log::info!("Host done");
4745
4746                self.project = Some(project);
4747                (self, cx)
4748            }
4749        }
4750
4751        pub async fn simulate_guest(
4752            mut self,
4753            guest_id: usize,
4754            project: ModelHandle<Project>,
4755            operations: Rc<Cell<usize>>,
4756            max_operations: usize,
4757            rng: Arc<Mutex<StdRng>>,
4758            mut cx: TestAppContext,
4759        ) -> (Self, TestAppContext) {
4760            while operations.get() < max_operations {
4761                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4762                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4763                        project
4764                            .worktrees(&cx)
4765                            .filter(|worktree| {
4766                                let worktree = worktree.read(cx);
4767                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4768                            })
4769                            .choose(&mut *rng.lock())
4770                    }) {
4771                        worktree
4772                    } else {
4773                        cx.background().simulate_random_delay().await;
4774                        continue;
4775                    };
4776
4777                    operations.set(operations.get() + 1);
4778                    let (worktree_root_name, project_path) =
4779                        worktree.read_with(&cx, |worktree, _| {
4780                            let entry = worktree
4781                                .entries(false)
4782                                .filter(|e| e.is_file())
4783                                .choose(&mut *rng.lock())
4784                                .unwrap();
4785                            (
4786                                worktree.root_name().to_string(),
4787                                (worktree.id(), entry.path.clone()),
4788                            )
4789                        });
4790                    log::info!(
4791                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4792                        guest_id,
4793                        project_path.0,
4794                        worktree_root_name,
4795                        project_path.1
4796                    );
4797                    let buffer = project
4798                        .update(&mut cx, |project, cx| {
4799                            project.open_buffer(project_path.clone(), cx)
4800                        })
4801                        .await
4802                        .unwrap();
4803                    log::info!(
4804                        "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
4805                        guest_id,
4806                        project_path.0,
4807                        worktree_root_name,
4808                        project_path.1,
4809                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4810                    );
4811                    self.buffers.insert(buffer.clone());
4812                    buffer
4813                } else {
4814                    operations.set(operations.get() + 1);
4815
4816                    self.buffers
4817                        .iter()
4818                        .choose(&mut *rng.lock())
4819                        .unwrap()
4820                        .clone()
4821                };
4822
4823                let choice = rng.lock().gen_range(0..100);
4824                match choice {
4825                    0..=9 => {
4826                        cx.update(|cx| {
4827                            log::info!(
4828                                "Guest {}: dropping buffer {:?}",
4829                                guest_id,
4830                                buffer.read(cx).file().unwrap().full_path(cx)
4831                            );
4832                            self.buffers.remove(&buffer);
4833                            drop(buffer);
4834                        });
4835                    }
4836                    10..=19 => {
4837                        let completions = project.update(&mut cx, |project, cx| {
4838                            log::info!(
4839                                "Guest {}: requesting completions for buffer {:?}",
4840                                guest_id,
4841                                buffer.read(cx).file().unwrap().full_path(cx)
4842                            );
4843                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4844                            project.completions(&buffer, offset, cx)
4845                        });
4846                        let completions = cx.background().spawn(async move {
4847                            completions.await.expect("completions request failed");
4848                        });
4849                        if rng.lock().gen_bool(0.3) {
4850                            log::info!("Guest {}: detaching completions request", guest_id);
4851                            completions.detach();
4852                        } else {
4853                            completions.await;
4854                        }
4855                    }
4856                    20..=29 => {
4857                        let code_actions = project.update(&mut cx, |project, cx| {
4858                            log::info!(
4859                                "Guest {}: requesting code actions for buffer {:?}",
4860                                guest_id,
4861                                buffer.read(cx).file().unwrap().full_path(cx)
4862                            );
4863                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4864                            project.code_actions(&buffer, range, cx)
4865                        });
4866                        let code_actions = cx.background().spawn(async move {
4867                            code_actions.await.expect("code actions request failed");
4868                        });
4869                        if rng.lock().gen_bool(0.3) {
4870                            log::info!("Guest {}: detaching code actions request", guest_id);
4871                            code_actions.detach();
4872                        } else {
4873                            code_actions.await;
4874                        }
4875                    }
4876                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4877                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4878                            log::info!(
4879                                "Guest {}: saving buffer {:?}",
4880                                guest_id,
4881                                buffer.file().unwrap().full_path(cx)
4882                            );
4883                            (buffer.version(), buffer.save(cx))
4884                        });
4885                        let save = cx.spawn(|cx| async move {
4886                            let (saved_version, _) = save.await.expect("save request failed");
4887                            buffer.read_with(&cx, |buffer, _| {
4888                                assert!(buffer.version().observed_all(&saved_version));
4889                                assert!(saved_version.observed_all(&requested_version));
4890                            });
4891                        });
4892                        if rng.lock().gen_bool(0.3) {
4893                            log::info!("Guest {}: detaching save request", guest_id);
4894                            save.detach();
4895                        } else {
4896                            save.await;
4897                        }
4898                    }
4899                    40..=44 => {
4900                        let prepare_rename = project.update(&mut cx, |project, cx| {
4901                            log::info!(
4902                                "Guest {}: preparing rename for buffer {:?}",
4903                                guest_id,
4904                                buffer.read(cx).file().unwrap().full_path(cx)
4905                            );
4906                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4907                            project.prepare_rename(buffer, offset, cx)
4908                        });
4909                        let prepare_rename = cx.background().spawn(async move {
4910                            prepare_rename.await.expect("prepare rename request failed");
4911                        });
4912                        if rng.lock().gen_bool(0.3) {
4913                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4914                            prepare_rename.detach();
4915                        } else {
4916                            prepare_rename.await;
4917                        }
4918                    }
4919                    45..=49 => {
4920                        let definitions = project.update(&mut cx, |project, cx| {
4921                            log::info!(
4922                                "Guest {}: requesting definitions for buffer {:?}",
4923                                guest_id,
4924                                buffer.read(cx).file().unwrap().full_path(cx)
4925                            );
4926                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4927                            project.definition(&buffer, offset, cx)
4928                        });
4929                        let definitions = cx.background().spawn(async move {
4930                            definitions.await.expect("definitions request failed")
4931                        });
4932                        if rng.lock().gen_bool(0.3) {
4933                            log::info!("Guest {}: detaching definitions request", guest_id);
4934                            definitions.detach();
4935                        } else {
4936                            self.buffers
4937                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
4938                        }
4939                    }
4940                    50..=54 => {
4941                        let highlights = project.update(&mut cx, |project, cx| {
4942                            log::info!(
4943                                "Guest {}: requesting highlights for buffer {:?}",
4944                                guest_id,
4945                                buffer.read(cx).file().unwrap().full_path(cx)
4946                            );
4947                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4948                            project.document_highlights(&buffer, offset, cx)
4949                        });
4950                        let highlights = cx.background().spawn(async move {
4951                            highlights.await.expect("highlights request failed");
4952                        });
4953                        if rng.lock().gen_bool(0.3) {
4954                            log::info!("Guest {}: detaching highlights request", guest_id);
4955                            highlights.detach();
4956                        } else {
4957                            highlights.await;
4958                        }
4959                    }
4960                    55..=59 => {
4961                        let search = project.update(&mut cx, |project, cx| {
4962                            let query = rng.lock().gen_range('a'..='z');
4963                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
4964                            project.search(SearchQuery::text(query, false, false), cx)
4965                        });
4966                        let search = cx
4967                            .background()
4968                            .spawn(async move { search.await.expect("search request failed") });
4969                        if rng.lock().gen_bool(0.3) {
4970                            log::info!("Guest {}: detaching search request", guest_id);
4971                            search.detach();
4972                        } else {
4973                            self.buffers.extend(search.await.into_keys());
4974                        }
4975                    }
4976                    _ => {
4977                        buffer.update(&mut cx, |buffer, cx| {
4978                            log::info!(
4979                                "Guest {}: updating buffer {:?}",
4980                                guest_id,
4981                                buffer.file().unwrap().full_path(cx)
4982                            );
4983                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4984                        });
4985                    }
4986                }
4987                cx.background().simulate_random_delay().await;
4988            }
4989
4990            log::info!("Guest {} done", guest_id);
4991
4992            self.project = Some(project);
4993            (self, cx)
4994        }
4995    }
4996
4997    impl Drop for TestClient {
4998        fn drop(&mut self) {
4999            self.client.tear_down();
5000        }
5001    }
5002
5003    impl Executor for Arc<gpui::executor::Background> {
5004        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5005            self.spawn(future).detach();
5006        }
5007    }
5008
5009    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5010        channel
5011            .messages()
5012            .cursor::<()>()
5013            .map(|m| {
5014                (
5015                    m.sender.github_login.clone(),
5016                    m.body.clone(),
5017                    m.is_pending(),
5018                )
5019            })
5020            .collect()
5021    }
5022
5023    struct EmptyView;
5024
5025    impl gpui::Entity for EmptyView {
5026        type Event = ();
5027    }
5028
5029    impl gpui::View for EmptyView {
5030        fn ui_name() -> &'static str {
5031            "empty view"
5032        }
5033
5034        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5035            gpui::Element::boxed(gpui::elements::Empty)
5036        }
5037    }
5038}