rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  81            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  82            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  83            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  84            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  85            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  86            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  87            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  88            .add_request_handler(
  89                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  90            )
  91            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
  92            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
  93            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
  94            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
  95            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
  96            .add_message_handler(Server::close_buffer)
  97            .add_request_handler(Server::update_buffer)
  98            .add_message_handler(Server::update_buffer_file)
  99            .add_message_handler(Server::buffer_reloaded)
 100            .add_message_handler(Server::buffer_saved)
 101            .add_request_handler(Server::save_buffer)
 102            .add_request_handler(Server::get_channels)
 103            .add_request_handler(Server::get_users)
 104            .add_request_handler(Server::join_channel)
 105            .add_message_handler(Server::leave_channel)
 106            .add_request_handler(Server::send_channel_message)
 107            .add_request_handler(Server::get_channel_messages);
 108
 109        Arc::new(server)
 110    }
 111
 112    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 113    where
 114        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 115        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 116        M: EnvelopedMessage,
 117    {
 118        let prev_handler = self.handlers.insert(
 119            TypeId::of::<M>(),
 120            Box::new(move |server, envelope| {
 121                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 122                (handler)(server, *envelope).boxed()
 123            }),
 124        );
 125        if prev_handler.is_some() {
 126            panic!("registered a handler for the same message twice");
 127        }
 128        self
 129    }
 130
 131    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 132    where
 133        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 134        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 135        M: RequestMessage,
 136    {
 137        self.add_message_handler(move |server, envelope| {
 138            let receipt = envelope.receipt();
 139            let response = (handler)(server.clone(), envelope);
 140            async move {
 141                match response.await {
 142                    Ok(response) => {
 143                        server.peer.respond(receipt, response)?;
 144                        Ok(())
 145                    }
 146                    Err(error) => {
 147                        server.peer.respond_with_error(
 148                            receipt,
 149                            proto::Error {
 150                                message: error.to_string(),
 151                            },
 152                        )?;
 153                        Err(error)
 154                    }
 155                }
 156            }
 157        })
 158    }
 159
 160    pub fn handle_connection<E: Executor>(
 161        self: &Arc<Self>,
 162        connection: Connection,
 163        addr: String,
 164        user_id: UserId,
 165        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 166        executor: E,
 167    ) -> impl Future<Output = ()> {
 168        let mut this = self.clone();
 169        async move {
 170            let (connection_id, handle_io, mut incoming_rx) =
 171                this.peer.add_connection(connection).await;
 172
 173            if let Some(send_connection_id) = send_connection_id.as_mut() {
 174                let _ = send_connection_id.send(connection_id).await;
 175            }
 176
 177            this.state_mut().add_connection(connection_id, user_id);
 178            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 179                log::error!("error updating contacts for {:?}: {}", user_id, err);
 180            }
 181
 182            let handle_io = handle_io.fuse();
 183            futures::pin_mut!(handle_io);
 184            loop {
 185                let next_message = incoming_rx.next().fuse();
 186                futures::pin_mut!(next_message);
 187                futures::select_biased! {
 188                    result = handle_io => {
 189                        if let Err(err) = result {
 190                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 191                        }
 192                        break;
 193                    }
 194                    message = next_message => {
 195                        if let Some(message) = message {
 196                            let start_time = Instant::now();
 197                            let type_name = message.payload_type_name();
 198                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 199                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 200                                let notifications = this.notifications.clone();
 201                                let is_background = message.is_background();
 202                                let handle_message = (handler)(this.clone(), message);
 203                                let handle_message = async move {
 204                                    if let Err(err) = handle_message.await {
 205                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 206                                    } else {
 207                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 208                                    }
 209                                    if let Some(mut notifications) = notifications {
 210                                        let _ = notifications.send(()).await;
 211                                    }
 212                                };
 213                                if is_background {
 214                                    executor.spawn_detached(handle_message);
 215                                } else {
 216                                    handle_message.await;
 217                                }
 218                            } else {
 219                                log::warn!("unhandled message: {}", type_name);
 220                            }
 221                        } else {
 222                            log::info!("rpc connection closed {:?}", addr);
 223                            break;
 224                        }
 225                    }
 226                }
 227            }
 228
 229            if let Err(err) = this.sign_out(connection_id).await {
 230                log::error!("error signing out connection {:?} - {:?}", addr, err);
 231            }
 232        }
 233    }
 234
 235    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 236        self.peer.disconnect(connection_id);
 237        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 238
 239        for (project_id, project) in removed_connection.hosted_projects {
 240            if let Some(share) = project.share {
 241                broadcast(
 242                    connection_id,
 243                    share.guests.keys().copied().collect(),
 244                    |conn_id| {
 245                        self.peer
 246                            .send(conn_id, proto::UnshareProject { project_id })
 247                    },
 248                )?;
 249            }
 250        }
 251
 252        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 253            broadcast(connection_id, peer_ids, |conn_id| {
 254                self.peer.send(
 255                    conn_id,
 256                    proto::RemoveProjectCollaborator {
 257                        project_id,
 258                        peer_id: connection_id.0,
 259                    },
 260                )
 261            })?;
 262        }
 263
 264        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 265        Ok(())
 266    }
 267
 268    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 269        Ok(proto::Ack {})
 270    }
 271
 272    async fn register_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::RegisterProject>,
 275    ) -> tide::Result<proto::RegisterProjectResponse> {
 276        let project_id = {
 277            let mut state = self.state_mut();
 278            let user_id = state.user_id_for_connection(request.sender_id)?;
 279            state.register_project(request.sender_id, user_id)
 280        };
 281        Ok(proto::RegisterProjectResponse { project_id })
 282    }
 283
 284    async fn unregister_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::UnregisterProject>,
 287    ) -> tide::Result<()> {
 288        let project = self
 289            .state_mut()
 290            .unregister_project(request.payload.project_id, request.sender_id)?;
 291        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 292        Ok(())
 293    }
 294
 295    async fn share_project(
 296        mut self: Arc<Server>,
 297        request: TypedEnvelope<proto::ShareProject>,
 298    ) -> tide::Result<proto::Ack> {
 299        self.state_mut()
 300            .share_project(request.payload.project_id, request.sender_id);
 301        Ok(proto::Ack {})
 302    }
 303
 304    async fn unshare_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnshareProject>,
 307    ) -> tide::Result<()> {
 308        let project_id = request.payload.project_id;
 309        let project = self
 310            .state_mut()
 311            .unshare_project(project_id, request.sender_id)?;
 312
 313        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 314            self.peer
 315                .send(conn_id, proto::UnshareProject { project_id })
 316        })?;
 317        self.update_contacts_for_users(&project.authorized_user_ids)?;
 318        Ok(())
 319    }
 320
 321    async fn join_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::JoinProject>,
 324    ) -> tide::Result<proto::JoinProjectResponse> {
 325        let project_id = request.payload.project_id;
 326
 327        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 328        let (response, connection_ids, contact_user_ids) = self
 329            .state_mut()
 330            .join_project(request.sender_id, user_id, project_id)
 331            .and_then(|joined| {
 332                let share = joined.project.share()?;
 333                let peer_count = share.guests.len();
 334                let mut collaborators = Vec::with_capacity(peer_count);
 335                collaborators.push(proto::Collaborator {
 336                    peer_id: joined.project.host_connection_id.0,
 337                    replica_id: 0,
 338                    user_id: joined.project.host_user_id.to_proto(),
 339                });
 340                let worktrees = share
 341                    .worktrees
 342                    .iter()
 343                    .filter_map(|(id, shared_worktree)| {
 344                        let worktree = joined.project.worktrees.get(&id)?;
 345                        Some(proto::Worktree {
 346                            id: *id,
 347                            root_name: worktree.root_name.clone(),
 348                            entries: shared_worktree.entries.values().cloned().collect(),
 349                            diagnostic_summaries: shared_worktree
 350                                .diagnostic_summaries
 351                                .values()
 352                                .cloned()
 353                                .collect(),
 354                            weak: worktree.weak,
 355                        })
 356                    })
 357                    .collect();
 358                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 359                    if *peer_conn_id != request.sender_id {
 360                        collaborators.push(proto::Collaborator {
 361                            peer_id: peer_conn_id.0,
 362                            replica_id: *peer_replica_id as u32,
 363                            user_id: peer_user_id.to_proto(),
 364                        });
 365                    }
 366                }
 367                let response = proto::JoinProjectResponse {
 368                    worktrees,
 369                    replica_id: joined.replica_id as u32,
 370                    collaborators,
 371                };
 372                let connection_ids = joined.project.connection_ids();
 373                let contact_user_ids = joined.project.authorized_user_ids();
 374                Ok((response, connection_ids, contact_user_ids))
 375            })?;
 376
 377        broadcast(request.sender_id, connection_ids, |conn_id| {
 378            self.peer.send(
 379                conn_id,
 380                proto::AddProjectCollaborator {
 381                    project_id,
 382                    collaborator: Some(proto::Collaborator {
 383                        peer_id: request.sender_id.0,
 384                        replica_id: response.replica_id,
 385                        user_id: user_id.to_proto(),
 386                    }),
 387                },
 388            )
 389        })?;
 390        self.update_contacts_for_users(&contact_user_ids)?;
 391        Ok(response)
 392    }
 393
 394    async fn leave_project(
 395        mut self: Arc<Server>,
 396        request: TypedEnvelope<proto::LeaveProject>,
 397    ) -> tide::Result<()> {
 398        let sender_id = request.sender_id;
 399        let project_id = request.payload.project_id;
 400        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 401
 402        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 403            self.peer.send(
 404                conn_id,
 405                proto::RemoveProjectCollaborator {
 406                    project_id,
 407                    peer_id: sender_id.0,
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 412
 413        Ok(())
 414    }
 415
 416    async fn register_worktree(
 417        mut self: Arc<Server>,
 418        request: TypedEnvelope<proto::RegisterWorktree>,
 419    ) -> tide::Result<proto::Ack> {
 420        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 421
 422        let mut contact_user_ids = HashSet::default();
 423        contact_user_ids.insert(host_user_id);
 424        for github_login in &request.payload.authorized_logins {
 425            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 426            contact_user_ids.insert(contact_user_id);
 427        }
 428
 429        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 430        let guest_connection_ids;
 431        {
 432            let mut state = self.state_mut();
 433            guest_connection_ids = state
 434                .read_project(request.payload.project_id, request.sender_id)?
 435                .guest_connection_ids();
 436            state.register_worktree(
 437                request.payload.project_id,
 438                request.payload.worktree_id,
 439                request.sender_id,
 440                Worktree {
 441                    authorized_user_ids: contact_user_ids.clone(),
 442                    root_name: request.payload.root_name.clone(),
 443                    weak: request.payload.weak,
 444                },
 445            )?;
 446        }
 447        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 448            self.peer
 449                .forward_send(request.sender_id, connection_id, request.payload.clone())
 450        })?;
 451        self.update_contacts_for_users(&contact_user_ids)?;
 452        Ok(proto::Ack {})
 453    }
 454
 455    async fn unregister_worktree(
 456        mut self: Arc<Server>,
 457        request: TypedEnvelope<proto::UnregisterWorktree>,
 458    ) -> tide::Result<()> {
 459        let project_id = request.payload.project_id;
 460        let worktree_id = request.payload.worktree_id;
 461        let (worktree, guest_connection_ids) =
 462            self.state_mut()
 463                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 464        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 465            self.peer.send(
 466                conn_id,
 467                proto::UnregisterWorktree {
 468                    project_id,
 469                    worktree_id,
 470                },
 471            )
 472        })?;
 473        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 474        Ok(())
 475    }
 476
 477    async fn update_worktree(
 478        mut self: Arc<Server>,
 479        request: TypedEnvelope<proto::UpdateWorktree>,
 480    ) -> tide::Result<proto::Ack> {
 481        let connection_ids = self.state_mut().update_worktree(
 482            request.sender_id,
 483            request.payload.project_id,
 484            request.payload.worktree_id,
 485            &request.payload.removed_entries,
 486            &request.payload.updated_entries,
 487        )?;
 488
 489        broadcast(request.sender_id, connection_ids, |connection_id| {
 490            self.peer
 491                .forward_send(request.sender_id, connection_id, request.payload.clone())
 492        })?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_diagnostic_summary(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 500    ) -> tide::Result<()> {
 501        let summary = request
 502            .payload
 503            .summary
 504            .clone()
 505            .ok_or_else(|| anyhow!("invalid summary"))?;
 506        let receiver_ids = self.state_mut().update_diagnostic_summary(
 507            request.payload.project_id,
 508            request.payload.worktree_id,
 509            request.sender_id,
 510            summary,
 511        )?;
 512
 513        broadcast(request.sender_id, receiver_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })?;
 517        Ok(())
 518    }
 519
 520    async fn disk_based_diagnostics_updating(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 523    ) -> tide::Result<()> {
 524        let receiver_ids = self
 525            .state()
 526            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 527        broadcast(request.sender_id, receiver_ids, |connection_id| {
 528            self.peer
 529                .forward_send(request.sender_id, connection_id, request.payload.clone())
 530        })?;
 531        Ok(())
 532    }
 533
 534    async fn disk_based_diagnostics_updated(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = self
 539            .state()
 540            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 541        broadcast(request.sender_id, receiver_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        })?;
 545        Ok(())
 546    }
 547
 548    async fn forward_project_request<T>(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<T>,
 551    ) -> tide::Result<T::Response>
 552    where
 553        T: EntityMessage + RequestMessage,
 554    {
 555        let host_connection_id = self
 556            .state()
 557            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 558            .host_connection_id;
 559        Ok(self
 560            .peer
 561            .forward_request(request.sender_id, host_connection_id, request.payload)
 562            .await?)
 563    }
 564
 565    async fn close_buffer(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::CloseBuffer>,
 568    ) -> tide::Result<()> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        self.peer
 574            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 575        Ok(())
 576    }
 577
 578    async fn save_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::SaveBuffer>,
 581    ) -> tide::Result<proto::BufferSaved> {
 582        let host;
 583        let mut guests;
 584        {
 585            let state = self.state();
 586            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 587            host = project.host_connection_id;
 588            guests = project.guest_connection_ids()
 589        }
 590
 591        let response = self
 592            .peer
 593            .forward_request(request.sender_id, host, request.payload.clone())
 594            .await?;
 595
 596        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 597        broadcast(host, guests, |conn_id| {
 598            self.peer.forward_send(host, conn_id, response.clone())
 599        })?;
 600
 601        Ok(response)
 602    }
 603
 604    async fn update_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::UpdateBuffer>,
 607    ) -> tide::Result<proto::Ack> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })?;
 615        Ok(proto::Ack {})
 616    }
 617
 618    async fn update_buffer_file(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBufferFile>,
 621    ) -> tide::Result<()> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(())
 630    }
 631
 632    async fn buffer_reloaded(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::BufferReloaded>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_saved(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferSaved>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn get_channels(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::GetChannels>,
 663    ) -> tide::Result<proto::GetChannelsResponse> {
 664        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 665        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 666        Ok(proto::GetChannelsResponse {
 667            channels: channels
 668                .into_iter()
 669                .map(|chan| proto::Channel {
 670                    id: chan.id.to_proto(),
 671                    name: chan.name,
 672                })
 673                .collect(),
 674        })
 675    }
 676
 677    async fn get_users(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetUsers>,
 680    ) -> tide::Result<proto::GetUsersResponse> {
 681        let user_ids = request
 682            .payload
 683            .user_ids
 684            .into_iter()
 685            .map(UserId::from_proto)
 686            .collect();
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        Ok(proto::GetUsersResponse { users })
 700    }
 701
 702    fn update_contacts_for_users<'a>(
 703        self: &Arc<Server>,
 704        user_ids: impl IntoIterator<Item = &'a UserId>,
 705    ) -> anyhow::Result<()> {
 706        let mut result = Ok(());
 707        let state = self.state();
 708        for user_id in user_ids {
 709            let contacts = state.contacts_for_user(*user_id);
 710            for connection_id in state.connection_ids_for_user(*user_id) {
 711                if let Err(error) = self.peer.send(
 712                    connection_id,
 713                    proto::UpdateContacts {
 714                        contacts: contacts.clone(),
 715                    },
 716                ) {
 717                    result = Err(error);
 718                }
 719            }
 720        }
 721        result
 722    }
 723
 724    async fn join_channel(
 725        mut self: Arc<Self>,
 726        request: TypedEnvelope<proto::JoinChannel>,
 727    ) -> tide::Result<proto::JoinChannelResponse> {
 728        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 729        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 730        if !self
 731            .app_state
 732            .db
 733            .can_user_access_channel(user_id, channel_id)
 734            .await?
 735        {
 736            Err(anyhow!("access denied"))?;
 737        }
 738
 739        self.state_mut().join_channel(request.sender_id, channel_id);
 740        let messages = self
 741            .app_state
 742            .db
 743            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 744            .await?
 745            .into_iter()
 746            .map(|msg| proto::ChannelMessage {
 747                id: msg.id.to_proto(),
 748                body: msg.body,
 749                timestamp: msg.sent_at.unix_timestamp() as u64,
 750                sender_id: msg.sender_id.to_proto(),
 751                nonce: Some(msg.nonce.as_u128().into()),
 752            })
 753            .collect::<Vec<_>>();
 754        Ok(proto::JoinChannelResponse {
 755            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 756            messages,
 757        })
 758    }
 759
 760    async fn leave_channel(
 761        mut self: Arc<Self>,
 762        request: TypedEnvelope<proto::LeaveChannel>,
 763    ) -> tide::Result<()> {
 764        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 765        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 766        if !self
 767            .app_state
 768            .db
 769            .can_user_access_channel(user_id, channel_id)
 770            .await?
 771        {
 772            Err(anyhow!("access denied"))?;
 773        }
 774
 775        self.state_mut()
 776            .leave_channel(request.sender_id, channel_id);
 777
 778        Ok(())
 779    }
 780
 781    async fn send_channel_message(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::SendChannelMessage>,
 784    ) -> tide::Result<proto::SendChannelMessageResponse> {
 785        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 786        let user_id;
 787        let connection_ids;
 788        {
 789            let state = self.state();
 790            user_id = state.user_id_for_connection(request.sender_id)?;
 791            connection_ids = state.channel_connection_ids(channel_id)?;
 792        }
 793
 794        // Validate the message body.
 795        let body = request.payload.body.trim().to_string();
 796        if body.len() > MAX_MESSAGE_LEN {
 797            return Err(anyhow!("message is too long"))?;
 798        }
 799        if body.is_empty() {
 800            return Err(anyhow!("message can't be blank"))?;
 801        }
 802
 803        let timestamp = OffsetDateTime::now_utc();
 804        let nonce = request
 805            .payload
 806            .nonce
 807            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 808
 809        let message_id = self
 810            .app_state
 811            .db
 812            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 813            .await?
 814            .to_proto();
 815        let message = proto::ChannelMessage {
 816            sender_id: user_id.to_proto(),
 817            id: message_id,
 818            body,
 819            timestamp: timestamp.unix_timestamp() as u64,
 820            nonce: Some(nonce),
 821        };
 822        broadcast(request.sender_id, connection_ids, |conn_id| {
 823            self.peer.send(
 824                conn_id,
 825                proto::ChannelMessageSent {
 826                    channel_id: channel_id.to_proto(),
 827                    message: Some(message.clone()),
 828                },
 829            )
 830        })?;
 831        Ok(proto::SendChannelMessageResponse {
 832            message: Some(message),
 833        })
 834    }
 835
 836    async fn get_channel_messages(
 837        self: Arc<Self>,
 838        request: TypedEnvelope<proto::GetChannelMessages>,
 839    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 840        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 841        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 842        if !self
 843            .app_state
 844            .db
 845            .can_user_access_channel(user_id, channel_id)
 846            .await?
 847        {
 848            Err(anyhow!("access denied"))?;
 849        }
 850
 851        let messages = self
 852            .app_state
 853            .db
 854            .get_channel_messages(
 855                channel_id,
 856                MESSAGE_COUNT_PER_PAGE,
 857                Some(MessageId::from_proto(request.payload.before_message_id)),
 858            )
 859            .await?
 860            .into_iter()
 861            .map(|msg| proto::ChannelMessage {
 862                id: msg.id.to_proto(),
 863                body: msg.body,
 864                timestamp: msg.sent_at.unix_timestamp() as u64,
 865                sender_id: msg.sender_id.to_proto(),
 866                nonce: Some(msg.nonce.as_u128().into()),
 867            })
 868            .collect::<Vec<_>>();
 869
 870        Ok(proto::GetChannelMessagesResponse {
 871            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 872            messages,
 873        })
 874    }
 875
 876    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 877        self.store.read()
 878    }
 879
 880    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 881        self.store.write()
 882    }
 883}
 884
 885impl Executor for RealExecutor {
 886    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 887        task::spawn(future);
 888    }
 889}
 890
 891fn broadcast<F>(
 892    sender_id: ConnectionId,
 893    receiver_ids: Vec<ConnectionId>,
 894    mut f: F,
 895) -> anyhow::Result<()>
 896where
 897    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 898{
 899    let mut result = Ok(());
 900    for receiver_id in receiver_ids {
 901        if receiver_id != sender_id {
 902            if let Err(error) = f(receiver_id) {
 903                if result.is_ok() {
 904                    result = Err(error);
 905                }
 906            }
 907        }
 908    }
 909    result
 910}
 911
 912pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 913    let server = Server::new(app.state().clone(), rpc.clone(), None);
 914    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 915        let server = server.clone();
 916        async move {
 917            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 918
 919            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 920            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 921            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 922            let client_protocol_version: Option<u32> = request
 923                .header("X-Zed-Protocol-Version")
 924                .and_then(|v| v.as_str().parse().ok());
 925
 926            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 927                return Ok(Response::new(StatusCode::UpgradeRequired));
 928            }
 929
 930            let header = match request.header("Sec-Websocket-Key") {
 931                Some(h) => h.as_str(),
 932                None => return Err(anyhow!("expected sec-websocket-key"))?,
 933            };
 934
 935            let user_id = process_auth_header(&request).await?;
 936
 937            let mut response = Response::new(StatusCode::SwitchingProtocols);
 938            response.insert_header(UPGRADE, "websocket");
 939            response.insert_header(CONNECTION, "Upgrade");
 940            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 941            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 942            response.insert_header("Sec-Websocket-Version", "13");
 943
 944            let http_res: &mut tide::http::Response = response.as_mut();
 945            let upgrade_receiver = http_res.recv_upgrade().await;
 946            let addr = request.remote().unwrap_or("unknown").to_string();
 947            task::spawn(async move {
 948                if let Some(stream) = upgrade_receiver.await {
 949                    server
 950                        .handle_connection(
 951                            Connection::new(
 952                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 953                            ),
 954                            addr,
 955                            user_id,
 956                            None,
 957                            RealExecutor,
 958                        )
 959                        .await;
 960                }
 961            });
 962
 963            Ok(response)
 964        }
 965    });
 966}
 967
 968fn header_contains_ignore_case<T>(
 969    request: &tide::Request<T>,
 970    header_name: HeaderName,
 971    value: &str,
 972) -> bool {
 973    request
 974        .header(header_name)
 975        .map(|h| {
 976            h.as_str()
 977                .split(',')
 978                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 979        })
 980        .unwrap_or(false)
 981}
 982
 983#[cfg(test)]
 984mod tests {
 985    use super::*;
 986    use crate::{
 987        auth,
 988        db::{tests::TestDb, UserId},
 989        github, AppState, Config,
 990    };
 991    use ::rpc::Peer;
 992    use collections::BTreeMap;
 993    use gpui::{executor, ModelHandle, TestAppContext};
 994    use parking_lot::Mutex;
 995    use postage::{sink::Sink, watch};
 996    use rand::prelude::*;
 997    use rpc::PeerId;
 998    use serde_json::json;
 999    use sqlx::types::time::OffsetDateTime;
1000    use std::{
1001        cell::Cell,
1002        env,
1003        ops::Deref,
1004        path::{Path, PathBuf},
1005        rc::Rc,
1006        sync::{
1007            atomic::{AtomicBool, Ordering::SeqCst},
1008            Arc,
1009        },
1010        time::Duration,
1011    };
1012    use zed::{
1013        client::{
1014            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015            EstablishConnectionError, UserStore,
1016        },
1017        editor::{
1018            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1019            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1020        },
1021        fs::{FakeFs, Fs as _},
1022        language::{
1023            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1024            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1025        },
1026        lsp,
1027        project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1028        workspace::{Settings, Workspace, WorkspaceParams},
1029    };
1030
1031    #[cfg(test)]
1032    #[ctor::ctor]
1033    fn init_logger() {
1034        if std::env::var("RUST_LOG").is_ok() {
1035            env_logger::init();
1036        }
1037    }
1038
1039    #[gpui::test(iterations = 10)]
1040    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1041        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1042        let lang_registry = Arc::new(LanguageRegistry::new());
1043        let fs = FakeFs::new(cx_a.background());
1044        cx_a.foreground().forbid_parking();
1045
1046        // Connect to a server as 2 clients.
1047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1048        let client_a = server.create_client(&mut cx_a, "user_a").await;
1049        let client_b = server.create_client(&mut cx_b, "user_b").await;
1050
1051        // Share a project as client A
1052        fs.insert_tree(
1053            "/a",
1054            json!({
1055                ".zed.toml": r#"collaborators = ["user_b"]"#,
1056                "a.txt": "a-contents",
1057                "b.txt": "b-contents",
1058            }),
1059        )
1060        .await;
1061        let project_a = cx_a.update(|cx| {
1062            Project::local(
1063                client_a.clone(),
1064                client_a.user_store.clone(),
1065                lang_registry.clone(),
1066                fs.clone(),
1067                cx,
1068            )
1069        });
1070        let (worktree_a, _) = project_a
1071            .update(&mut cx_a, |p, cx| {
1072                p.find_or_create_local_worktree("/a", false, cx)
1073            })
1074            .await
1075            .unwrap();
1076        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1077        worktree_a
1078            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1079            .await;
1080        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1081        project_a
1082            .update(&mut cx_a, |p, cx| p.share(cx))
1083            .await
1084            .unwrap();
1085
1086        // Join that project as client B
1087        let project_b = Project::remote(
1088            project_id,
1089            client_b.clone(),
1090            client_b.user_store.clone(),
1091            lang_registry.clone(),
1092            fs.clone(),
1093            &mut cx_b.to_async(),
1094        )
1095        .await
1096        .unwrap();
1097
1098        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1099            assert_eq!(
1100                project
1101                    .collaborators()
1102                    .get(&client_a.peer_id)
1103                    .unwrap()
1104                    .user
1105                    .github_login,
1106                "user_a"
1107            );
1108            project.replica_id()
1109        });
1110        project_a
1111            .condition(&cx_a, |tree, _| {
1112                tree.collaborators()
1113                    .get(&client_b.peer_id)
1114                    .map_or(false, |collaborator| {
1115                        collaborator.replica_id == replica_id_b
1116                            && collaborator.user.github_login == "user_b"
1117                    })
1118            })
1119            .await;
1120
1121        // Open the same file as client B and client A.
1122        let buffer_b = project_b
1123            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1124            .await
1125            .unwrap();
1126        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1127        buffer_b.read_with(&cx_b, |buf, cx| {
1128            assert_eq!(buf.read(cx).text(), "b-contents")
1129        });
1130        project_a.read_with(&cx_a, |project, cx| {
1131            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1132        });
1133        let buffer_a = project_a
1134            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1135            .await
1136            .unwrap();
1137
1138        let editor_b = cx_b.add_view(window_b, |cx| {
1139            Editor::for_buffer(
1140                buffer_b,
1141                None,
1142                watch::channel_with(Settings::test(cx)).1,
1143                cx,
1144            )
1145        });
1146
1147        // TODO
1148        // // Create a selection set as client B and see that selection set as client A.
1149        // buffer_a
1150        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1151        //     .await;
1152
1153        // Edit the buffer as client B and see that edit as client A.
1154        editor_b.update(&mut cx_b, |editor, cx| {
1155            editor.handle_input(&Input("ok, ".into()), cx)
1156        });
1157        buffer_a
1158            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1159            .await;
1160
1161        // TODO
1162        // // Remove the selection set as client B, see those selections disappear as client A.
1163        cx_b.update(move |_| drop(editor_b));
1164        // buffer_a
1165        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1166        //     .await;
1167
1168        // Dropping the client B's project removes client B from client A's collaborators.
1169        cx_b.update(move |_| drop(project_b));
1170        project_a
1171            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1172            .await;
1173    }
1174
1175    #[gpui::test(iterations = 10)]
1176    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1177        let lang_registry = Arc::new(LanguageRegistry::new());
1178        let fs = FakeFs::new(cx_a.background());
1179        cx_a.foreground().forbid_parking();
1180
1181        // Connect to a server as 2 clients.
1182        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1183        let client_a = server.create_client(&mut cx_a, "user_a").await;
1184        let client_b = server.create_client(&mut cx_b, "user_b").await;
1185
1186        // Share a project as client A
1187        fs.insert_tree(
1188            "/a",
1189            json!({
1190                ".zed.toml": r#"collaborators = ["user_b"]"#,
1191                "a.txt": "a-contents",
1192                "b.txt": "b-contents",
1193            }),
1194        )
1195        .await;
1196        let project_a = cx_a.update(|cx| {
1197            Project::local(
1198                client_a.clone(),
1199                client_a.user_store.clone(),
1200                lang_registry.clone(),
1201                fs.clone(),
1202                cx,
1203            )
1204        });
1205        let (worktree_a, _) = project_a
1206            .update(&mut cx_a, |p, cx| {
1207                p.find_or_create_local_worktree("/a", false, cx)
1208            })
1209            .await
1210            .unwrap();
1211        worktree_a
1212            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1213            .await;
1214        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1215        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1216        project_a
1217            .update(&mut cx_a, |p, cx| p.share(cx))
1218            .await
1219            .unwrap();
1220        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1221
1222        // Join that project as client B
1223        let project_b = Project::remote(
1224            project_id,
1225            client_b.clone(),
1226            client_b.user_store.clone(),
1227            lang_registry.clone(),
1228            fs.clone(),
1229            &mut cx_b.to_async(),
1230        )
1231        .await
1232        .unwrap();
1233        project_b
1234            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1235            .await
1236            .unwrap();
1237
1238        // Unshare the project as client A
1239        project_a
1240            .update(&mut cx_a, |project, cx| project.unshare(cx))
1241            .await
1242            .unwrap();
1243        project_b
1244            .condition(&mut cx_b, |project, _| project.is_read_only())
1245            .await;
1246        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1247        drop(project_b);
1248
1249        // Share the project again and ensure guests can still join.
1250        project_a
1251            .update(&mut cx_a, |project, cx| project.share(cx))
1252            .await
1253            .unwrap();
1254        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1255
1256        let project_c = Project::remote(
1257            project_id,
1258            client_b.clone(),
1259            client_b.user_store.clone(),
1260            lang_registry.clone(),
1261            fs.clone(),
1262            &mut cx_b.to_async(),
1263        )
1264        .await
1265        .unwrap();
1266        project_c
1267            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1268            .await
1269            .unwrap();
1270    }
1271
1272    #[gpui::test(iterations = 10)]
1273    async fn test_propagate_saves_and_fs_changes(
1274        mut cx_a: TestAppContext,
1275        mut cx_b: TestAppContext,
1276        mut cx_c: TestAppContext,
1277    ) {
1278        let lang_registry = Arc::new(LanguageRegistry::new());
1279        let fs = FakeFs::new(cx_a.background());
1280        cx_a.foreground().forbid_parking();
1281
1282        // Connect to a server as 3 clients.
1283        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1284        let client_a = server.create_client(&mut cx_a, "user_a").await;
1285        let client_b = server.create_client(&mut cx_b, "user_b").await;
1286        let client_c = server.create_client(&mut cx_c, "user_c").await;
1287
1288        // Share a worktree as client A.
1289        fs.insert_tree(
1290            "/a",
1291            json!({
1292                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1293                "file1": "",
1294                "file2": ""
1295            }),
1296        )
1297        .await;
1298        let project_a = cx_a.update(|cx| {
1299            Project::local(
1300                client_a.clone(),
1301                client_a.user_store.clone(),
1302                lang_registry.clone(),
1303                fs.clone(),
1304                cx,
1305            )
1306        });
1307        let (worktree_a, _) = project_a
1308            .update(&mut cx_a, |p, cx| {
1309                p.find_or_create_local_worktree("/a", false, cx)
1310            })
1311            .await
1312            .unwrap();
1313        worktree_a
1314            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1315            .await;
1316        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1317        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1318        project_a
1319            .update(&mut cx_a, |p, cx| p.share(cx))
1320            .await
1321            .unwrap();
1322
1323        // Join that worktree as clients B and C.
1324        let project_b = Project::remote(
1325            project_id,
1326            client_b.clone(),
1327            client_b.user_store.clone(),
1328            lang_registry.clone(),
1329            fs.clone(),
1330            &mut cx_b.to_async(),
1331        )
1332        .await
1333        .unwrap();
1334        let project_c = Project::remote(
1335            project_id,
1336            client_c.clone(),
1337            client_c.user_store.clone(),
1338            lang_registry.clone(),
1339            fs.clone(),
1340            &mut cx_c.to_async(),
1341        )
1342        .await
1343        .unwrap();
1344        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1345        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1346
1347        // Open and edit a buffer as both guests B and C.
1348        let buffer_b = project_b
1349            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1350            .await
1351            .unwrap();
1352        let buffer_c = project_c
1353            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1354            .await
1355            .unwrap();
1356        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1357        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1358
1359        // Open and edit that buffer as the host.
1360        let buffer_a = project_a
1361            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1362            .await
1363            .unwrap();
1364
1365        buffer_a
1366            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1367            .await;
1368        buffer_a.update(&mut cx_a, |buf, cx| {
1369            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1370        });
1371
1372        // Wait for edits to propagate
1373        buffer_a
1374            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1375            .await;
1376        buffer_b
1377            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1378            .await;
1379        buffer_c
1380            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381            .await;
1382
1383        // Edit the buffer as the host and concurrently save as guest B.
1384        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1385        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1386        save_b.await.unwrap();
1387        assert_eq!(
1388            fs.load("/a/file1".as_ref()).await.unwrap(),
1389            "hi-a, i-am-c, i-am-b, i-am-a"
1390        );
1391        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1392        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1393        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1394
1395        // Make changes on host's file system, see those changes on guest worktrees.
1396        fs.rename(
1397            "/a/file1".as_ref(),
1398            "/a/file1-renamed".as_ref(),
1399            Default::default(),
1400        )
1401        .await
1402        .unwrap();
1403
1404        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1405            .await
1406            .unwrap();
1407        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1408
1409        worktree_a
1410            .condition(&cx_a, |tree, _| {
1411                tree.paths()
1412                    .map(|p| p.to_string_lossy())
1413                    .collect::<Vec<_>>()
1414                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1415            })
1416            .await;
1417        worktree_b
1418            .condition(&cx_b, |tree, _| {
1419                tree.paths()
1420                    .map(|p| p.to_string_lossy())
1421                    .collect::<Vec<_>>()
1422                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1423            })
1424            .await;
1425        worktree_c
1426            .condition(&cx_c, |tree, _| {
1427                tree.paths()
1428                    .map(|p| p.to_string_lossy())
1429                    .collect::<Vec<_>>()
1430                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1431            })
1432            .await;
1433
1434        // Ensure buffer files are updated as well.
1435        buffer_a
1436            .condition(&cx_a, |buf, _| {
1437                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1438            })
1439            .await;
1440        buffer_b
1441            .condition(&cx_b, |buf, _| {
1442                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1443            })
1444            .await;
1445        buffer_c
1446            .condition(&cx_c, |buf, _| {
1447                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1448            })
1449            .await;
1450    }
1451
1452    #[gpui::test(iterations = 10)]
1453    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1454        cx_a.foreground().forbid_parking();
1455        let lang_registry = Arc::new(LanguageRegistry::new());
1456        let fs = FakeFs::new(cx_a.background());
1457
1458        // Connect to a server as 2 clients.
1459        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1460        let client_a = server.create_client(&mut cx_a, "user_a").await;
1461        let client_b = server.create_client(&mut cx_b, "user_b").await;
1462
1463        // Share a project as client A
1464        fs.insert_tree(
1465            "/dir",
1466            json!({
1467                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1468                "a.txt": "a-contents",
1469            }),
1470        )
1471        .await;
1472
1473        let project_a = cx_a.update(|cx| {
1474            Project::local(
1475                client_a.clone(),
1476                client_a.user_store.clone(),
1477                lang_registry.clone(),
1478                fs.clone(),
1479                cx,
1480            )
1481        });
1482        let (worktree_a, _) = project_a
1483            .update(&mut cx_a, |p, cx| {
1484                p.find_or_create_local_worktree("/dir", false, cx)
1485            })
1486            .await
1487            .unwrap();
1488        worktree_a
1489            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1490            .await;
1491        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1492        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1493        project_a
1494            .update(&mut cx_a, |p, cx| p.share(cx))
1495            .await
1496            .unwrap();
1497
1498        // Join that project as client B
1499        let project_b = Project::remote(
1500            project_id,
1501            client_b.clone(),
1502            client_b.user_store.clone(),
1503            lang_registry.clone(),
1504            fs.clone(),
1505            &mut cx_b.to_async(),
1506        )
1507        .await
1508        .unwrap();
1509
1510        // Open a buffer as client B
1511        let buffer_b = project_b
1512            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1513            .await
1514            .unwrap();
1515
1516        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1517        buffer_b.read_with(&cx_b, |buf, _| {
1518            assert!(buf.is_dirty());
1519            assert!(!buf.has_conflict());
1520        });
1521
1522        buffer_b
1523            .update(&mut cx_b, |buf, cx| buf.save(cx))
1524            .await
1525            .unwrap();
1526        buffer_b
1527            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1528            .await;
1529        buffer_b.read_with(&cx_b, |buf, _| {
1530            assert!(!buf.has_conflict());
1531        });
1532
1533        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1534        buffer_b.read_with(&cx_b, |buf, _| {
1535            assert!(buf.is_dirty());
1536            assert!(!buf.has_conflict());
1537        });
1538    }
1539
1540    #[gpui::test(iterations = 10)]
1541    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1542        cx_a.foreground().forbid_parking();
1543        let lang_registry = Arc::new(LanguageRegistry::new());
1544        let fs = FakeFs::new(cx_a.background());
1545
1546        // Connect to a server as 2 clients.
1547        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1548        let client_a = server.create_client(&mut cx_a, "user_a").await;
1549        let client_b = server.create_client(&mut cx_b, "user_b").await;
1550
1551        // Share a project as client A
1552        fs.insert_tree(
1553            "/dir",
1554            json!({
1555                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1556                "a.txt": "a-contents",
1557            }),
1558        )
1559        .await;
1560
1561        let project_a = cx_a.update(|cx| {
1562            Project::local(
1563                client_a.clone(),
1564                client_a.user_store.clone(),
1565                lang_registry.clone(),
1566                fs.clone(),
1567                cx,
1568            )
1569        });
1570        let (worktree_a, _) = project_a
1571            .update(&mut cx_a, |p, cx| {
1572                p.find_or_create_local_worktree("/dir", false, cx)
1573            })
1574            .await
1575            .unwrap();
1576        worktree_a
1577            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1578            .await;
1579        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1580        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1581        project_a
1582            .update(&mut cx_a, |p, cx| p.share(cx))
1583            .await
1584            .unwrap();
1585
1586        // Join that project as client B
1587        let project_b = Project::remote(
1588            project_id,
1589            client_b.clone(),
1590            client_b.user_store.clone(),
1591            lang_registry.clone(),
1592            fs.clone(),
1593            &mut cx_b.to_async(),
1594        )
1595        .await
1596        .unwrap();
1597        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1598
1599        // Open a buffer as client B
1600        let buffer_b = project_b
1601            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1602            .await
1603            .unwrap();
1604        buffer_b.read_with(&cx_b, |buf, _| {
1605            assert!(!buf.is_dirty());
1606            assert!(!buf.has_conflict());
1607        });
1608
1609        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1610            .await
1611            .unwrap();
1612        buffer_b
1613            .condition(&cx_b, |buf, _| {
1614                buf.text() == "new contents" && !buf.is_dirty()
1615            })
1616            .await;
1617        buffer_b.read_with(&cx_b, |buf, _| {
1618            assert!(!buf.has_conflict());
1619        });
1620    }
1621
1622    #[gpui::test(iterations = 10)]
1623    async fn test_editing_while_guest_opens_buffer(
1624        mut cx_a: TestAppContext,
1625        mut cx_b: TestAppContext,
1626    ) {
1627        cx_a.foreground().forbid_parking();
1628        let lang_registry = Arc::new(LanguageRegistry::new());
1629        let fs = FakeFs::new(cx_a.background());
1630
1631        // Connect to a server as 2 clients.
1632        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1633        let client_a = server.create_client(&mut cx_a, "user_a").await;
1634        let client_b = server.create_client(&mut cx_b, "user_b").await;
1635
1636        // Share a project as client A
1637        fs.insert_tree(
1638            "/dir",
1639            json!({
1640                ".zed.toml": r#"collaborators = ["user_b"]"#,
1641                "a.txt": "a-contents",
1642            }),
1643        )
1644        .await;
1645        let project_a = cx_a.update(|cx| {
1646            Project::local(
1647                client_a.clone(),
1648                client_a.user_store.clone(),
1649                lang_registry.clone(),
1650                fs.clone(),
1651                cx,
1652            )
1653        });
1654        let (worktree_a, _) = project_a
1655            .update(&mut cx_a, |p, cx| {
1656                p.find_or_create_local_worktree("/dir", false, cx)
1657            })
1658            .await
1659            .unwrap();
1660        worktree_a
1661            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1662            .await;
1663        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1664        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1665        project_a
1666            .update(&mut cx_a, |p, cx| p.share(cx))
1667            .await
1668            .unwrap();
1669
1670        // Join that project as client B
1671        let project_b = Project::remote(
1672            project_id,
1673            client_b.clone(),
1674            client_b.user_store.clone(),
1675            lang_registry.clone(),
1676            fs.clone(),
1677            &mut cx_b.to_async(),
1678        )
1679        .await
1680        .unwrap();
1681
1682        // Open a buffer as client A
1683        let buffer_a = project_a
1684            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1685            .await
1686            .unwrap();
1687
1688        // Start opening the same buffer as client B
1689        let buffer_b = cx_b
1690            .background()
1691            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1692
1693        // Edit the buffer as client A while client B is still opening it.
1694        cx_b.background().simulate_random_delay().await;
1695        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1696        cx_b.background().simulate_random_delay().await;
1697        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1698
1699        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1700        let buffer_b = buffer_b.await.unwrap();
1701        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1702    }
1703
1704    #[gpui::test(iterations = 10)]
1705    async fn test_leaving_worktree_while_opening_buffer(
1706        mut cx_a: TestAppContext,
1707        mut cx_b: TestAppContext,
1708    ) {
1709        cx_a.foreground().forbid_parking();
1710        let lang_registry = Arc::new(LanguageRegistry::new());
1711        let fs = FakeFs::new(cx_a.background());
1712
1713        // Connect to a server as 2 clients.
1714        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1715        let client_a = server.create_client(&mut cx_a, "user_a").await;
1716        let client_b = server.create_client(&mut cx_b, "user_b").await;
1717
1718        // Share a project as client A
1719        fs.insert_tree(
1720            "/dir",
1721            json!({
1722                ".zed.toml": r#"collaborators = ["user_b"]"#,
1723                "a.txt": "a-contents",
1724            }),
1725        )
1726        .await;
1727        let project_a = cx_a.update(|cx| {
1728            Project::local(
1729                client_a.clone(),
1730                client_a.user_store.clone(),
1731                lang_registry.clone(),
1732                fs.clone(),
1733                cx,
1734            )
1735        });
1736        let (worktree_a, _) = project_a
1737            .update(&mut cx_a, |p, cx| {
1738                p.find_or_create_local_worktree("/dir", false, cx)
1739            })
1740            .await
1741            .unwrap();
1742        worktree_a
1743            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1744            .await;
1745        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1746        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1747        project_a
1748            .update(&mut cx_a, |p, cx| p.share(cx))
1749            .await
1750            .unwrap();
1751
1752        // Join that project as client B
1753        let project_b = Project::remote(
1754            project_id,
1755            client_b.clone(),
1756            client_b.user_store.clone(),
1757            lang_registry.clone(),
1758            fs.clone(),
1759            &mut cx_b.to_async(),
1760        )
1761        .await
1762        .unwrap();
1763
1764        // See that a guest has joined as client A.
1765        project_a
1766            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1767            .await;
1768
1769        // Begin opening a buffer as client B, but leave the project before the open completes.
1770        let buffer_b = cx_b
1771            .background()
1772            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1773        cx_b.update(|_| drop(project_b));
1774        drop(buffer_b);
1775
1776        // See that the guest has left.
1777        project_a
1778            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1779            .await;
1780    }
1781
1782    #[gpui::test(iterations = 10)]
1783    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1784        cx_a.foreground().forbid_parking();
1785        let lang_registry = Arc::new(LanguageRegistry::new());
1786        let fs = FakeFs::new(cx_a.background());
1787
1788        // Connect to a server as 2 clients.
1789        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1790        let client_a = server.create_client(&mut cx_a, "user_a").await;
1791        let client_b = server.create_client(&mut cx_b, "user_b").await;
1792
1793        // Share a project as client A
1794        fs.insert_tree(
1795            "/a",
1796            json!({
1797                ".zed.toml": r#"collaborators = ["user_b"]"#,
1798                "a.txt": "a-contents",
1799                "b.txt": "b-contents",
1800            }),
1801        )
1802        .await;
1803        let project_a = cx_a.update(|cx| {
1804            Project::local(
1805                client_a.clone(),
1806                client_a.user_store.clone(),
1807                lang_registry.clone(),
1808                fs.clone(),
1809                cx,
1810            )
1811        });
1812        let (worktree_a, _) = project_a
1813            .update(&mut cx_a, |p, cx| {
1814                p.find_or_create_local_worktree("/a", false, cx)
1815            })
1816            .await
1817            .unwrap();
1818        worktree_a
1819            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1820            .await;
1821        let project_id = project_a
1822            .update(&mut cx_a, |project, _| project.next_remote_id())
1823            .await;
1824        project_a
1825            .update(&mut cx_a, |project, cx| project.share(cx))
1826            .await
1827            .unwrap();
1828
1829        // Join that project as client B
1830        let _project_b = Project::remote(
1831            project_id,
1832            client_b.clone(),
1833            client_b.user_store.clone(),
1834            lang_registry.clone(),
1835            fs.clone(),
1836            &mut cx_b.to_async(),
1837        )
1838        .await
1839        .unwrap();
1840
1841        // See that a guest has joined as client A.
1842        project_a
1843            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1844            .await;
1845
1846        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1847        client_b.disconnect(&cx_b.to_async()).unwrap();
1848        project_a
1849            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1850            .await;
1851    }
1852
1853    #[gpui::test(iterations = 10)]
1854    async fn test_collaborating_with_diagnostics(
1855        mut cx_a: TestAppContext,
1856        mut cx_b: TestAppContext,
1857    ) {
1858        cx_a.foreground().forbid_parking();
1859        let mut lang_registry = Arc::new(LanguageRegistry::new());
1860        let fs = FakeFs::new(cx_a.background());
1861
1862        // Set up a fake language server.
1863        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1864        Arc::get_mut(&mut lang_registry)
1865            .unwrap()
1866            .add(Arc::new(Language::new(
1867                LanguageConfig {
1868                    name: "Rust".into(),
1869                    path_suffixes: vec!["rs".to_string()],
1870                    language_server: Some(language_server_config),
1871                    ..Default::default()
1872                },
1873                Some(tree_sitter_rust::language()),
1874            )));
1875
1876        // Connect to a server as 2 clients.
1877        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1878        let client_a = server.create_client(&mut cx_a, "user_a").await;
1879        let client_b = server.create_client(&mut cx_b, "user_b").await;
1880
1881        // Share a project as client A
1882        fs.insert_tree(
1883            "/a",
1884            json!({
1885                ".zed.toml": r#"collaborators = ["user_b"]"#,
1886                "a.rs": "let one = two",
1887                "other.rs": "",
1888            }),
1889        )
1890        .await;
1891        let project_a = cx_a.update(|cx| {
1892            Project::local(
1893                client_a.clone(),
1894                client_a.user_store.clone(),
1895                lang_registry.clone(),
1896                fs.clone(),
1897                cx,
1898            )
1899        });
1900        let (worktree_a, _) = project_a
1901            .update(&mut cx_a, |p, cx| {
1902                p.find_or_create_local_worktree("/a", false, cx)
1903            })
1904            .await
1905            .unwrap();
1906        worktree_a
1907            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1908            .await;
1909        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1910        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1911        project_a
1912            .update(&mut cx_a, |p, cx| p.share(cx))
1913            .await
1914            .unwrap();
1915
1916        // Cause the language server to start.
1917        let _ = cx_a
1918            .background()
1919            .spawn(project_a.update(&mut cx_a, |project, cx| {
1920                project.open_buffer(
1921                    ProjectPath {
1922                        worktree_id,
1923                        path: Path::new("other.rs").into(),
1924                    },
1925                    cx,
1926                )
1927            }))
1928            .await
1929            .unwrap();
1930
1931        // Simulate a language server reporting errors for a file.
1932        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1933        fake_language_server
1934            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1935                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1936                version: None,
1937                diagnostics: vec![lsp::Diagnostic {
1938                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1939                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1940                    message: "message 1".to_string(),
1941                    ..Default::default()
1942                }],
1943            })
1944            .await;
1945
1946        // Wait for server to see the diagnostics update.
1947        server
1948            .condition(|store| {
1949                let worktree = store
1950                    .project(project_id)
1951                    .unwrap()
1952                    .share
1953                    .as_ref()
1954                    .unwrap()
1955                    .worktrees
1956                    .get(&worktree_id.to_proto())
1957                    .unwrap();
1958
1959                !worktree.diagnostic_summaries.is_empty()
1960            })
1961            .await;
1962
1963        // Join the worktree as client B.
1964        let project_b = Project::remote(
1965            project_id,
1966            client_b.clone(),
1967            client_b.user_store.clone(),
1968            lang_registry.clone(),
1969            fs.clone(),
1970            &mut cx_b.to_async(),
1971        )
1972        .await
1973        .unwrap();
1974
1975        project_b.read_with(&cx_b, |project, cx| {
1976            assert_eq!(
1977                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1978                &[(
1979                    ProjectPath {
1980                        worktree_id,
1981                        path: Arc::from(Path::new("a.rs")),
1982                    },
1983                    DiagnosticSummary {
1984                        error_count: 1,
1985                        warning_count: 0,
1986                        ..Default::default()
1987                    },
1988                )]
1989            )
1990        });
1991
1992        // Simulate a language server reporting more errors for a file.
1993        fake_language_server
1994            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1995                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1996                version: None,
1997                diagnostics: vec![
1998                    lsp::Diagnostic {
1999                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2000                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2001                        message: "message 1".to_string(),
2002                        ..Default::default()
2003                    },
2004                    lsp::Diagnostic {
2005                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2006                        range: lsp::Range::new(
2007                            lsp::Position::new(0, 10),
2008                            lsp::Position::new(0, 13),
2009                        ),
2010                        message: "message 2".to_string(),
2011                        ..Default::default()
2012                    },
2013                ],
2014            })
2015            .await;
2016
2017        // Client b gets the updated summaries
2018        project_b
2019            .condition(&cx_b, |project, cx| {
2020                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2021                    == &[(
2022                        ProjectPath {
2023                            worktree_id,
2024                            path: Arc::from(Path::new("a.rs")),
2025                        },
2026                        DiagnosticSummary {
2027                            error_count: 1,
2028                            warning_count: 1,
2029                            ..Default::default()
2030                        },
2031                    )]
2032            })
2033            .await;
2034
2035        // Open the file with the errors on client B. They should be present.
2036        let buffer_b = cx_b
2037            .background()
2038            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2039            .await
2040            .unwrap();
2041
2042        buffer_b.read_with(&cx_b, |buffer, _| {
2043            assert_eq!(
2044                buffer
2045                    .snapshot()
2046                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2047                    .map(|entry| entry)
2048                    .collect::<Vec<_>>(),
2049                &[
2050                    DiagnosticEntry {
2051                        range: Point::new(0, 4)..Point::new(0, 7),
2052                        diagnostic: Diagnostic {
2053                            group_id: 0,
2054                            message: "message 1".to_string(),
2055                            severity: lsp::DiagnosticSeverity::ERROR,
2056                            is_primary: true,
2057                            ..Default::default()
2058                        }
2059                    },
2060                    DiagnosticEntry {
2061                        range: Point::new(0, 10)..Point::new(0, 13),
2062                        diagnostic: Diagnostic {
2063                            group_id: 1,
2064                            severity: lsp::DiagnosticSeverity::WARNING,
2065                            message: "message 2".to_string(),
2066                            is_primary: true,
2067                            ..Default::default()
2068                        }
2069                    }
2070                ]
2071            );
2072        });
2073    }
2074
2075    #[gpui::test(iterations = 10)]
2076    async fn test_collaborating_with_completion(
2077        mut cx_a: TestAppContext,
2078        mut cx_b: TestAppContext,
2079    ) {
2080        cx_a.foreground().forbid_parking();
2081        let mut lang_registry = Arc::new(LanguageRegistry::new());
2082        let fs = FakeFs::new(cx_a.background());
2083
2084        // Set up a fake language server.
2085        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2086        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2087            completion_provider: Some(lsp::CompletionOptions {
2088                trigger_characters: Some(vec![".".to_string()]),
2089                ..Default::default()
2090            }),
2091            ..Default::default()
2092        });
2093        Arc::get_mut(&mut lang_registry)
2094            .unwrap()
2095            .add(Arc::new(Language::new(
2096                LanguageConfig {
2097                    name: "Rust".into(),
2098                    path_suffixes: vec!["rs".to_string()],
2099                    language_server: Some(language_server_config),
2100                    ..Default::default()
2101                },
2102                Some(tree_sitter_rust::language()),
2103            )));
2104
2105        // Connect to a server as 2 clients.
2106        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2107        let client_a = server.create_client(&mut cx_a, "user_a").await;
2108        let client_b = server.create_client(&mut cx_b, "user_b").await;
2109
2110        // Share a project as client A
2111        fs.insert_tree(
2112            "/a",
2113            json!({
2114                ".zed.toml": r#"collaborators = ["user_b"]"#,
2115                "main.rs": "fn main() { a }",
2116                "other.rs": "",
2117            }),
2118        )
2119        .await;
2120        let project_a = cx_a.update(|cx| {
2121            Project::local(
2122                client_a.clone(),
2123                client_a.user_store.clone(),
2124                lang_registry.clone(),
2125                fs.clone(),
2126                cx,
2127            )
2128        });
2129        let (worktree_a, _) = project_a
2130            .update(&mut cx_a, |p, cx| {
2131                p.find_or_create_local_worktree("/a", false, cx)
2132            })
2133            .await
2134            .unwrap();
2135        worktree_a
2136            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2137            .await;
2138        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2139        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2140        project_a
2141            .update(&mut cx_a, |p, cx| p.share(cx))
2142            .await
2143            .unwrap();
2144
2145        // Join the worktree as client B.
2146        let project_b = Project::remote(
2147            project_id,
2148            client_b.clone(),
2149            client_b.user_store.clone(),
2150            lang_registry.clone(),
2151            fs.clone(),
2152            &mut cx_b.to_async(),
2153        )
2154        .await
2155        .unwrap();
2156
2157        // Open a file in an editor as the guest.
2158        let buffer_b = project_b
2159            .update(&mut cx_b, |p, cx| {
2160                p.open_buffer((worktree_id, "main.rs"), cx)
2161            })
2162            .await
2163            .unwrap();
2164        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2165        let editor_b = cx_b.add_view(window_b, |cx| {
2166            Editor::for_buffer(
2167                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2168                Some(project_b.clone()),
2169                watch::channel_with(Settings::test(cx)).1,
2170                cx,
2171            )
2172        });
2173
2174        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2175        buffer_b
2176            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2177            .await;
2178
2179        // Type a completion trigger character as the guest.
2180        editor_b.update(&mut cx_b, |editor, cx| {
2181            editor.select_ranges([13..13], None, cx);
2182            editor.handle_input(&Input(".".into()), cx);
2183            cx.focus(&editor_b);
2184        });
2185
2186        // Receive a completion request as the host's language server.
2187        // Return some completions from the host's language server.
2188        cx_a.foreground().start_waiting();
2189        fake_language_server
2190            .handle_request::<lsp::request::Completion, _>(|params, _| {
2191                assert_eq!(
2192                    params.text_document_position.text_document.uri,
2193                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2194                );
2195                assert_eq!(
2196                    params.text_document_position.position,
2197                    lsp::Position::new(0, 14),
2198                );
2199
2200                Some(lsp::CompletionResponse::Array(vec![
2201                    lsp::CompletionItem {
2202                        label: "first_method(…)".into(),
2203                        detail: Some("fn(&mut self, B) -> C".into()),
2204                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2205                            new_text: "first_method($1)".to_string(),
2206                            range: lsp::Range::new(
2207                                lsp::Position::new(0, 14),
2208                                lsp::Position::new(0, 14),
2209                            ),
2210                        })),
2211                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2212                        ..Default::default()
2213                    },
2214                    lsp::CompletionItem {
2215                        label: "second_method(…)".into(),
2216                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2217                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2218                            new_text: "second_method()".to_string(),
2219                            range: lsp::Range::new(
2220                                lsp::Position::new(0, 14),
2221                                lsp::Position::new(0, 14),
2222                            ),
2223                        })),
2224                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2225                        ..Default::default()
2226                    },
2227                ]))
2228            })
2229            .next()
2230            .await
2231            .unwrap();
2232        cx_a.foreground().finish_waiting();
2233
2234        // Open the buffer on the host.
2235        let buffer_a = project_a
2236            .update(&mut cx_a, |p, cx| {
2237                p.open_buffer((worktree_id, "main.rs"), cx)
2238            })
2239            .await
2240            .unwrap();
2241        buffer_a
2242            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2243            .await;
2244
2245        // Confirm a completion on the guest.
2246        editor_b
2247            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2248            .await;
2249        editor_b.update(&mut cx_b, |editor, cx| {
2250            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2251            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2252        });
2253
2254        // Return a resolved completion from the host's language server.
2255        // The resolved completion has an additional text edit.
2256        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2257            |params, _| {
2258                assert_eq!(params.label, "first_method(…)");
2259                lsp::CompletionItem {
2260                    label: "first_method(…)".into(),
2261                    detail: Some("fn(&mut self, B) -> C".into()),
2262                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2263                        new_text: "first_method($1)".to_string(),
2264                        range: lsp::Range::new(
2265                            lsp::Position::new(0, 14),
2266                            lsp::Position::new(0, 14),
2267                        ),
2268                    })),
2269                    additional_text_edits: Some(vec![lsp::TextEdit {
2270                        new_text: "use d::SomeTrait;\n".to_string(),
2271                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2272                    }]),
2273                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2274                    ..Default::default()
2275                }
2276            },
2277        );
2278
2279        // The additional edit is applied.
2280        buffer_a
2281            .condition(&cx_a, |buffer, _| {
2282                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2283            })
2284            .await;
2285        buffer_b
2286            .condition(&cx_b, |buffer, _| {
2287                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2288            })
2289            .await;
2290    }
2291
2292    #[gpui::test(iterations = 10)]
2293    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2294        cx_a.foreground().forbid_parking();
2295        let mut lang_registry = Arc::new(LanguageRegistry::new());
2296        let fs = FakeFs::new(cx_a.background());
2297
2298        // Set up a fake language server.
2299        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2300        Arc::get_mut(&mut lang_registry)
2301            .unwrap()
2302            .add(Arc::new(Language::new(
2303                LanguageConfig {
2304                    name: "Rust".into(),
2305                    path_suffixes: vec!["rs".to_string()],
2306                    language_server: Some(language_server_config),
2307                    ..Default::default()
2308                },
2309                Some(tree_sitter_rust::language()),
2310            )));
2311
2312        // Connect to a server as 2 clients.
2313        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2314        let client_a = server.create_client(&mut cx_a, "user_a").await;
2315        let client_b = server.create_client(&mut cx_b, "user_b").await;
2316
2317        // Share a project as client A
2318        fs.insert_tree(
2319            "/a",
2320            json!({
2321                ".zed.toml": r#"collaborators = ["user_b"]"#,
2322                "a.rs": "let one = two",
2323            }),
2324        )
2325        .await;
2326        let project_a = cx_a.update(|cx| {
2327            Project::local(
2328                client_a.clone(),
2329                client_a.user_store.clone(),
2330                lang_registry.clone(),
2331                fs.clone(),
2332                cx,
2333            )
2334        });
2335        let (worktree_a, _) = project_a
2336            .update(&mut cx_a, |p, cx| {
2337                p.find_or_create_local_worktree("/a", false, cx)
2338            })
2339            .await
2340            .unwrap();
2341        worktree_a
2342            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2343            .await;
2344        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2345        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2346        project_a
2347            .update(&mut cx_a, |p, cx| p.share(cx))
2348            .await
2349            .unwrap();
2350
2351        // Join the worktree as client B.
2352        let project_b = Project::remote(
2353            project_id,
2354            client_b.clone(),
2355            client_b.user_store.clone(),
2356            lang_registry.clone(),
2357            fs.clone(),
2358            &mut cx_b.to_async(),
2359        )
2360        .await
2361        .unwrap();
2362
2363        let buffer_b = cx_b
2364            .background()
2365            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2366            .await
2367            .unwrap();
2368
2369        let format = project_b.update(&mut cx_b, |project, cx| {
2370            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2371        });
2372
2373        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2374        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2375            Some(vec![
2376                lsp::TextEdit {
2377                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2378                    new_text: "h".to_string(),
2379                },
2380                lsp::TextEdit {
2381                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2382                    new_text: "y".to_string(),
2383                },
2384            ])
2385        });
2386
2387        format.await.unwrap();
2388        assert_eq!(
2389            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2390            "let honey = two"
2391        );
2392    }
2393
2394    #[gpui::test(iterations = 10)]
2395    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2396        cx_a.foreground().forbid_parking();
2397        let mut lang_registry = Arc::new(LanguageRegistry::new());
2398        let fs = FakeFs::new(cx_a.background());
2399        fs.insert_tree(
2400            "/root-1",
2401            json!({
2402                ".zed.toml": r#"collaborators = ["user_b"]"#,
2403                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2404            }),
2405        )
2406        .await;
2407        fs.insert_tree(
2408            "/root-2",
2409            json!({
2410                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2411            }),
2412        )
2413        .await;
2414
2415        // Set up a fake language server.
2416        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2417        Arc::get_mut(&mut lang_registry)
2418            .unwrap()
2419            .add(Arc::new(Language::new(
2420                LanguageConfig {
2421                    name: "Rust".into(),
2422                    path_suffixes: vec!["rs".to_string()],
2423                    language_server: Some(language_server_config),
2424                    ..Default::default()
2425                },
2426                Some(tree_sitter_rust::language()),
2427            )));
2428
2429        // Connect to a server as 2 clients.
2430        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2431        let client_a = server.create_client(&mut cx_a, "user_a").await;
2432        let client_b = server.create_client(&mut cx_b, "user_b").await;
2433
2434        // Share a project as client A
2435        let project_a = cx_a.update(|cx| {
2436            Project::local(
2437                client_a.clone(),
2438                client_a.user_store.clone(),
2439                lang_registry.clone(),
2440                fs.clone(),
2441                cx,
2442            )
2443        });
2444        let (worktree_a, _) = project_a
2445            .update(&mut cx_a, |p, cx| {
2446                p.find_or_create_local_worktree("/root-1", false, cx)
2447            })
2448            .await
2449            .unwrap();
2450        worktree_a
2451            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2452            .await;
2453        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2454        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2455        project_a
2456            .update(&mut cx_a, |p, cx| p.share(cx))
2457            .await
2458            .unwrap();
2459
2460        // Join the worktree as client B.
2461        let project_b = Project::remote(
2462            project_id,
2463            client_b.clone(),
2464            client_b.user_store.clone(),
2465            lang_registry.clone(),
2466            fs.clone(),
2467            &mut cx_b.to_async(),
2468        )
2469        .await
2470        .unwrap();
2471
2472        // Open the file on client B.
2473        let buffer_b = cx_b
2474            .background()
2475            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2476            .await
2477            .unwrap();
2478
2479        // Request the definition of a symbol as the guest.
2480        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2481
2482        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2483        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2484            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2485                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2486                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2487            )))
2488        });
2489
2490        let definitions_1 = definitions_1.await.unwrap();
2491        cx_b.read(|cx| {
2492            assert_eq!(definitions_1.len(), 1);
2493            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2494            let target_buffer = definitions_1[0].buffer.read(cx);
2495            assert_eq!(
2496                target_buffer.text(),
2497                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2498            );
2499            assert_eq!(
2500                definitions_1[0].range.to_point(target_buffer),
2501                Point::new(0, 6)..Point::new(0, 9)
2502            );
2503        });
2504
2505        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2506        // the previous call to `definition`.
2507        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2508        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2509            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2510                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2511                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2512            )))
2513        });
2514
2515        let definitions_2 = definitions_2.await.unwrap();
2516        cx_b.read(|cx| {
2517            assert_eq!(definitions_2.len(), 1);
2518            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2519            let target_buffer = definitions_2[0].buffer.read(cx);
2520            assert_eq!(
2521                target_buffer.text(),
2522                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2523            );
2524            assert_eq!(
2525                definitions_2[0].range.to_point(target_buffer),
2526                Point::new(1, 6)..Point::new(1, 11)
2527            );
2528        });
2529        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2530    }
2531
2532    #[gpui::test(iterations = 10)]
2533    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2534        cx_a.foreground().forbid_parking();
2535        let mut lang_registry = Arc::new(LanguageRegistry::new());
2536        let fs = FakeFs::new(cx_a.background());
2537        fs.insert_tree(
2538            "/root-1",
2539            json!({
2540                ".zed.toml": r#"collaborators = ["user_b"]"#,
2541                "one.rs": "const ONE: usize = 1;",
2542                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2543            }),
2544        )
2545        .await;
2546        fs.insert_tree(
2547            "/root-2",
2548            json!({
2549                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2550            }),
2551        )
2552        .await;
2553
2554        // Set up a fake language server.
2555        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2556        Arc::get_mut(&mut lang_registry)
2557            .unwrap()
2558            .add(Arc::new(Language::new(
2559                LanguageConfig {
2560                    name: "Rust".into(),
2561                    path_suffixes: vec!["rs".to_string()],
2562                    language_server: Some(language_server_config),
2563                    ..Default::default()
2564                },
2565                Some(tree_sitter_rust::language()),
2566            )));
2567
2568        // Connect to a server as 2 clients.
2569        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2570        let client_a = server.create_client(&mut cx_a, "user_a").await;
2571        let client_b = server.create_client(&mut cx_b, "user_b").await;
2572
2573        // Share a project as client A
2574        let project_a = cx_a.update(|cx| {
2575            Project::local(
2576                client_a.clone(),
2577                client_a.user_store.clone(),
2578                lang_registry.clone(),
2579                fs.clone(),
2580                cx,
2581            )
2582        });
2583        let (worktree_a, _) = project_a
2584            .update(&mut cx_a, |p, cx| {
2585                p.find_or_create_local_worktree("/root-1", false, cx)
2586            })
2587            .await
2588            .unwrap();
2589        worktree_a
2590            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2591            .await;
2592        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2593        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2594        project_a
2595            .update(&mut cx_a, |p, cx| p.share(cx))
2596            .await
2597            .unwrap();
2598
2599        // Join the worktree as client B.
2600        let project_b = Project::remote(
2601            project_id,
2602            client_b.clone(),
2603            client_b.user_store.clone(),
2604            lang_registry.clone(),
2605            fs.clone(),
2606            &mut cx_b.to_async(),
2607        )
2608        .await
2609        .unwrap();
2610
2611        // Open the file on client B.
2612        let buffer_b = cx_b
2613            .background()
2614            .spawn(project_b.update(&mut cx_b, |p, cx| {
2615                p.open_buffer((worktree_id, "one.rs"), cx)
2616            }))
2617            .await
2618            .unwrap();
2619
2620        // Request references to a symbol as the guest.
2621        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2622
2623        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2624        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2625            assert_eq!(
2626                params.text_document_position.text_document.uri.as_str(),
2627                "file:///root-1/one.rs"
2628            );
2629            Some(vec![
2630                lsp::Location {
2631                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2632                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2633                },
2634                lsp::Location {
2635                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2636                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2637                },
2638                lsp::Location {
2639                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2640                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2641                },
2642            ])
2643        });
2644
2645        let references = references.await.unwrap();
2646        cx_b.read(|cx| {
2647            assert_eq!(references.len(), 3);
2648            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2649
2650            let two_buffer = references[0].buffer.read(cx);
2651            let three_buffer = references[2].buffer.read(cx);
2652            assert_eq!(
2653                two_buffer.file().unwrap().path().as_ref(),
2654                Path::new("two.rs")
2655            );
2656            assert_eq!(references[1].buffer, references[0].buffer);
2657            assert_eq!(
2658                three_buffer.file().unwrap().full_path(cx),
2659                Path::new("three.rs")
2660            );
2661
2662            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2663            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2664            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2665        });
2666    }
2667
2668    #[gpui::test(iterations = 10)]
2669    async fn test_project_search(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2670        cx_a.foreground().forbid_parking();
2671        let lang_registry = Arc::new(LanguageRegistry::new());
2672        let fs = FakeFs::new(cx_a.background());
2673        fs.insert_tree(
2674            "/root-1",
2675            json!({
2676                ".zed.toml": r#"collaborators = ["user_b"]"#,
2677                "a": "hello world",
2678                "b": "goodnight moon",
2679                "c": "a world of goo",
2680                "d": "world champion of clown world",
2681            }),
2682        )
2683        .await;
2684        fs.insert_tree(
2685            "/root-2",
2686            json!({
2687                "e": "disney world is fun",
2688            }),
2689        )
2690        .await;
2691
2692        // Connect to a server as 2 clients.
2693        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2694        let client_a = server.create_client(&mut cx_a, "user_a").await;
2695        let client_b = server.create_client(&mut cx_b, "user_b").await;
2696
2697        // Share a project as client A
2698        let project_a = cx_a.update(|cx| {
2699            Project::local(
2700                client_a.clone(),
2701                client_a.user_store.clone(),
2702                lang_registry.clone(),
2703                fs.clone(),
2704                cx,
2705            )
2706        });
2707        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2708
2709        let (worktree_1, _) = project_a
2710            .update(&mut cx_a, |p, cx| {
2711                p.find_or_create_local_worktree("/root-1", false, cx)
2712            })
2713            .await
2714            .unwrap();
2715        worktree_1
2716            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2717            .await;
2718        let (worktree_2, _) = project_a
2719            .update(&mut cx_a, |p, cx| {
2720                p.find_or_create_local_worktree("/root-2", false, cx)
2721            })
2722            .await
2723            .unwrap();
2724        worktree_2
2725            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2726            .await;
2727
2728        eprintln!("sharing");
2729
2730        project_a
2731            .update(&mut cx_a, |p, cx| p.share(cx))
2732            .await
2733            .unwrap();
2734
2735        // Join the worktree as client B.
2736        let project_b = Project::remote(
2737            project_id,
2738            client_b.clone(),
2739            client_b.user_store.clone(),
2740            lang_registry.clone(),
2741            fs.clone(),
2742            &mut cx_b.to_async(),
2743        )
2744        .await
2745        .unwrap();
2746
2747        let results = project_b
2748            .update(&mut cx_b, |project, cx| {
2749                project.search(SearchQuery::text("world", false, false), cx)
2750            })
2751            .await
2752            .unwrap();
2753
2754        let mut ranges_by_path = results
2755            .into_iter()
2756            .map(|(buffer, ranges)| {
2757                buffer.read_with(&cx_b, |buffer, cx| {
2758                    let path = buffer.file().unwrap().full_path(cx);
2759                    let offset_ranges = ranges
2760                        .into_iter()
2761                        .map(|range| range.to_offset(buffer))
2762                        .collect::<Vec<_>>();
2763                    (path, offset_ranges)
2764                })
2765            })
2766            .collect::<Vec<_>>();
2767        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2768
2769        assert_eq!(
2770            ranges_by_path,
2771            &[
2772                (PathBuf::from("root-1/a"), vec![6..11]),
2773                (PathBuf::from("root-1/c"), vec![2..7]),
2774                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2775                (PathBuf::from("root-2/e"), vec![7..12]),
2776            ]
2777        );
2778    }
2779
2780    #[gpui::test(iterations = 10)]
2781    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2782        cx_a.foreground().forbid_parking();
2783        let lang_registry = Arc::new(LanguageRegistry::new());
2784        let fs = FakeFs::new(cx_a.background());
2785        fs.insert_tree(
2786            "/root-1",
2787            json!({
2788                ".zed.toml": r#"collaborators = ["user_b"]"#,
2789                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2790            }),
2791        )
2792        .await;
2793
2794        // Set up a fake language server.
2795        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2796        lang_registry.add(Arc::new(Language::new(
2797            LanguageConfig {
2798                name: "Rust".into(),
2799                path_suffixes: vec!["rs".to_string()],
2800                language_server: Some(language_server_config),
2801                ..Default::default()
2802            },
2803            Some(tree_sitter_rust::language()),
2804        )));
2805
2806        // Connect to a server as 2 clients.
2807        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2808        let client_a = server.create_client(&mut cx_a, "user_a").await;
2809        let client_b = server.create_client(&mut cx_b, "user_b").await;
2810
2811        // Share a project as client A
2812        let project_a = cx_a.update(|cx| {
2813            Project::local(
2814                client_a.clone(),
2815                client_a.user_store.clone(),
2816                lang_registry.clone(),
2817                fs.clone(),
2818                cx,
2819            )
2820        });
2821        let (worktree_a, _) = project_a
2822            .update(&mut cx_a, |p, cx| {
2823                p.find_or_create_local_worktree("/root-1", false, cx)
2824            })
2825            .await
2826            .unwrap();
2827        worktree_a
2828            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2829            .await;
2830        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2831        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2832        project_a
2833            .update(&mut cx_a, |p, cx| p.share(cx))
2834            .await
2835            .unwrap();
2836
2837        // Join the worktree as client B.
2838        let project_b = Project::remote(
2839            project_id,
2840            client_b.clone(),
2841            client_b.user_store.clone(),
2842            lang_registry.clone(),
2843            fs.clone(),
2844            &mut cx_b.to_async(),
2845        )
2846        .await
2847        .unwrap();
2848
2849        // Open the file on client B.
2850        let buffer_b = cx_b
2851            .background()
2852            .spawn(project_b.update(&mut cx_b, |p, cx| {
2853                p.open_buffer((worktree_id, "main.rs"), cx)
2854            }))
2855            .await
2856            .unwrap();
2857
2858        // Request document highlights as the guest.
2859        let highlights =
2860            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2861
2862        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2863        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2864            |params, _| {
2865                assert_eq!(
2866                    params
2867                        .text_document_position_params
2868                        .text_document
2869                        .uri
2870                        .as_str(),
2871                    "file:///root-1/main.rs"
2872                );
2873                assert_eq!(
2874                    params.text_document_position_params.position,
2875                    lsp::Position::new(0, 34)
2876                );
2877                Some(vec![
2878                    lsp::DocumentHighlight {
2879                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2880                        range: lsp::Range::new(
2881                            lsp::Position::new(0, 10),
2882                            lsp::Position::new(0, 16),
2883                        ),
2884                    },
2885                    lsp::DocumentHighlight {
2886                        kind: Some(lsp::DocumentHighlightKind::READ),
2887                        range: lsp::Range::new(
2888                            lsp::Position::new(0, 32),
2889                            lsp::Position::new(0, 38),
2890                        ),
2891                    },
2892                    lsp::DocumentHighlight {
2893                        kind: Some(lsp::DocumentHighlightKind::READ),
2894                        range: lsp::Range::new(
2895                            lsp::Position::new(0, 41),
2896                            lsp::Position::new(0, 47),
2897                        ),
2898                    },
2899                ])
2900            },
2901        );
2902
2903        let highlights = highlights.await.unwrap();
2904        buffer_b.read_with(&cx_b, |buffer, _| {
2905            let snapshot = buffer.snapshot();
2906
2907            let highlights = highlights
2908                .into_iter()
2909                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2910                .collect::<Vec<_>>();
2911            assert_eq!(
2912                highlights,
2913                &[
2914                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2915                    (lsp::DocumentHighlightKind::READ, 32..38),
2916                    (lsp::DocumentHighlightKind::READ, 41..47)
2917                ]
2918            )
2919        });
2920    }
2921
2922    #[gpui::test(iterations = 10)]
2923    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2924        cx_a.foreground().forbid_parking();
2925        let mut lang_registry = Arc::new(LanguageRegistry::new());
2926        let fs = FakeFs::new(cx_a.background());
2927        fs.insert_tree(
2928            "/code",
2929            json!({
2930                "crate-1": {
2931                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2932                    "one.rs": "const ONE: usize = 1;",
2933                },
2934                "crate-2": {
2935                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2936                },
2937                "private": {
2938                    "passwords.txt": "the-password",
2939                }
2940            }),
2941        )
2942        .await;
2943
2944        // Set up a fake language server.
2945        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2946        Arc::get_mut(&mut lang_registry)
2947            .unwrap()
2948            .add(Arc::new(Language::new(
2949                LanguageConfig {
2950                    name: "Rust".into(),
2951                    path_suffixes: vec!["rs".to_string()],
2952                    language_server: Some(language_server_config),
2953                    ..Default::default()
2954                },
2955                Some(tree_sitter_rust::language()),
2956            )));
2957
2958        // Connect to a server as 2 clients.
2959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2960        let client_a = server.create_client(&mut cx_a, "user_a").await;
2961        let client_b = server.create_client(&mut cx_b, "user_b").await;
2962
2963        // Share a project as client A
2964        let project_a = cx_a.update(|cx| {
2965            Project::local(
2966                client_a.clone(),
2967                client_a.user_store.clone(),
2968                lang_registry.clone(),
2969                fs.clone(),
2970                cx,
2971            )
2972        });
2973        let (worktree_a, _) = project_a
2974            .update(&mut cx_a, |p, cx| {
2975                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2976            })
2977            .await
2978            .unwrap();
2979        worktree_a
2980            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2981            .await;
2982        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2983        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2984        project_a
2985            .update(&mut cx_a, |p, cx| p.share(cx))
2986            .await
2987            .unwrap();
2988
2989        // Join the worktree as client B.
2990        let project_b = Project::remote(
2991            project_id,
2992            client_b.clone(),
2993            client_b.user_store.clone(),
2994            lang_registry.clone(),
2995            fs.clone(),
2996            &mut cx_b.to_async(),
2997        )
2998        .await
2999        .unwrap();
3000
3001        // Cause the language server to start.
3002        let _buffer = cx_b
3003            .background()
3004            .spawn(project_b.update(&mut cx_b, |p, cx| {
3005                p.open_buffer((worktree_id, "one.rs"), cx)
3006            }))
3007            .await
3008            .unwrap();
3009
3010        // Request the definition of a symbol as the guest.
3011        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3012        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3013        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3014            #[allow(deprecated)]
3015            Some(vec![lsp::SymbolInformation {
3016                name: "TWO".into(),
3017                location: lsp::Location {
3018                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3019                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3020                },
3021                kind: lsp::SymbolKind::CONSTANT,
3022                tags: None,
3023                container_name: None,
3024                deprecated: None,
3025            }])
3026        });
3027
3028        let symbols = symbols.await.unwrap();
3029        assert_eq!(symbols.len(), 1);
3030        assert_eq!(symbols[0].name, "TWO");
3031
3032        // Open one of the returned symbols.
3033        let buffer_b_2 = project_b
3034            .update(&mut cx_b, |project, cx| {
3035                project.open_buffer_for_symbol(&symbols[0], cx)
3036            })
3037            .await
3038            .unwrap();
3039        buffer_b_2.read_with(&cx_b, |buffer, _| {
3040            assert_eq!(
3041                buffer.file().unwrap().path().as_ref(),
3042                Path::new("../crate-2/two.rs")
3043            );
3044        });
3045
3046        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3047        let mut fake_symbol = symbols[0].clone();
3048        fake_symbol.path = Path::new("/code/secrets").into();
3049        let error = project_b
3050            .update(&mut cx_b, |project, cx| {
3051                project.open_buffer_for_symbol(&fake_symbol, cx)
3052            })
3053            .await
3054            .unwrap_err();
3055        assert!(error.to_string().contains("invalid symbol signature"));
3056    }
3057
3058    #[gpui::test(iterations = 10)]
3059    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3060        mut cx_a: TestAppContext,
3061        mut cx_b: TestAppContext,
3062        mut rng: StdRng,
3063    ) {
3064        cx_a.foreground().forbid_parking();
3065        let mut lang_registry = Arc::new(LanguageRegistry::new());
3066        let fs = FakeFs::new(cx_a.background());
3067        fs.insert_tree(
3068            "/root",
3069            json!({
3070                ".zed.toml": r#"collaborators = ["user_b"]"#,
3071                "a.rs": "const ONE: usize = b::TWO;",
3072                "b.rs": "const TWO: usize = 2",
3073            }),
3074        )
3075        .await;
3076
3077        // Set up a fake language server.
3078        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3079
3080        Arc::get_mut(&mut lang_registry)
3081            .unwrap()
3082            .add(Arc::new(Language::new(
3083                LanguageConfig {
3084                    name: "Rust".into(),
3085                    path_suffixes: vec!["rs".to_string()],
3086                    language_server: Some(language_server_config),
3087                    ..Default::default()
3088                },
3089                Some(tree_sitter_rust::language()),
3090            )));
3091
3092        // Connect to a server as 2 clients.
3093        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3094        let client_a = server.create_client(&mut cx_a, "user_a").await;
3095        let client_b = server.create_client(&mut cx_b, "user_b").await;
3096
3097        // Share a project as client A
3098        let project_a = cx_a.update(|cx| {
3099            Project::local(
3100                client_a.clone(),
3101                client_a.user_store.clone(),
3102                lang_registry.clone(),
3103                fs.clone(),
3104                cx,
3105            )
3106        });
3107
3108        let (worktree_a, _) = project_a
3109            .update(&mut cx_a, |p, cx| {
3110                p.find_or_create_local_worktree("/root", false, cx)
3111            })
3112            .await
3113            .unwrap();
3114        worktree_a
3115            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3116            .await;
3117        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3118        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3119        project_a
3120            .update(&mut cx_a, |p, cx| p.share(cx))
3121            .await
3122            .unwrap();
3123
3124        // Join the worktree as client B.
3125        let project_b = Project::remote(
3126            project_id,
3127            client_b.clone(),
3128            client_b.user_store.clone(),
3129            lang_registry.clone(),
3130            fs.clone(),
3131            &mut cx_b.to_async(),
3132        )
3133        .await
3134        .unwrap();
3135
3136        let buffer_b1 = cx_b
3137            .background()
3138            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3139            .await
3140            .unwrap();
3141
3142        let definitions;
3143        let buffer_b2;
3144        if rng.gen() {
3145            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3146            buffer_b2 =
3147                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3148        } else {
3149            buffer_b2 =
3150                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3151            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3152        }
3153
3154        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3155        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3156            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3157                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3158                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3159            )))
3160        });
3161
3162        let buffer_b2 = buffer_b2.await.unwrap();
3163        let definitions = definitions.await.unwrap();
3164        assert_eq!(definitions.len(), 1);
3165        assert_eq!(definitions[0].buffer, buffer_b2);
3166    }
3167
3168    #[gpui::test(iterations = 10)]
        // End-to-end check of code actions in a shared project: client A shares the
        // local project rooted at "/a", client B joins it remotely, opens `main.rs`
        // in a workspace editor, and confirms a code action served by a fake
        // language server. The test then checks the resulting two-file edit and
        // that it can be undone and redone from the editor that becomes active.
3169    async fn test_collaborating_with_code_actions(
3170        mut cx_a: TestAppContext,
3171        mut cx_b: TestAppContext,
3172    ) {
3173        cx_a.foreground().forbid_parking();
3174        let mut lang_registry = Arc::new(LanguageRegistry::new());
3175        let fs = FakeFs::new(cx_a.background());
3176        let mut path_openers_b = Vec::new();
3177        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3178
3179        // Set up a fake language server and register it for files ending in "rs".
            // `Arc::get_mut` can be unwrapped here because the registry has not been
            // cloned yet at this point.
3180        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3181        Arc::get_mut(&mut lang_registry)
3182            .unwrap()
3183            .add(Arc::new(Language::new(
3184                LanguageConfig {
3185                    name: "Rust".into(),
3186                    path_suffixes: vec!["rs".to_string()],
3187                    language_server: Some(language_server_config),
3188                    ..Default::default()
3189                },
3190                Some(tree_sitter_rust::language()),
3191            )));
3192
3193        // Connect to a server as 2 clients.
3194        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3195        let client_a = server.create_client(&mut cx_a, "user_a").await;
3196        let client_b = server.create_client(&mut cx_b, "user_b").await;
3197
3198        // Share a project as client A. The `.zed.toml` file names "user_b" as a
            // collaborator.
3199        fs.insert_tree(
3200            "/a",
3201            json!({
3202                ".zed.toml": r#"collaborators = ["user_b"]"#,
3203                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3204                "other.rs": "pub fn foo() -> usize { 4 }",
3205            }),
3206        )
3207        .await;
3208        let project_a = cx_a.update(|cx| {
3209            Project::local(
3210                client_a.clone(),
3211                client_a.user_store.clone(),
3212                lang_registry.clone(),
3213                fs.clone(),
3214                cx,
3215            )
3216        });
3217        let (worktree_a, _) = project_a
3218            .update(&mut cx_a, |p, cx| {
3219                p.find_or_create_local_worktree("/a", false, cx)
3220            })
3221            .await
3222            .unwrap();
            // Wait for the local worktree scan to finish before sharing.
3223        worktree_a
3224            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3225            .await;
3226        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3227        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3228        project_a
3229            .update(&mut cx_a, |p, cx| p.share(cx))
3230            .await
3231            .unwrap();
3232
3233        // Join the shared project as client B, then build a workspace around it.
3234        let project_b = Project::remote(
3235            project_id,
3236            client_b.clone(),
3237            client_b.user_store.clone(),
3238            lang_registry.clone(),
3239            fs.clone(),
3240            &mut cx_b.to_async(),
3241        )
3242        .await
3243        .unwrap();
3244        let mut params = cx_b.update(WorkspaceParams::test);
3245        params.languages = lang_registry.clone();
3246        params.client = client_b.client.clone();
3247        params.user_store = client_b.user_store.clone();
3248        params.project = project_b;
3249        params.path_openers = path_openers_b.into();
3250
            // Open `main.rs` in client B's workspace; the opened item must be an `Editor`.
3251        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3252        let editor_b = workspace_b
3253            .update(&mut cx_b, |workspace, cx| {
3254                workspace.open_path((worktree_id, "main.rs").into(), cx)
3255            })
3256            .await
3257            .unwrap()
3258            .downcast::<Editor>()
3259            .unwrap();
3260
            // The first code-action request is expected for `/a/main.rs` with an empty
            // range at 0:0; the fake server answers it with no actions.
            // NOTE(review): `.next().await` presumably waits until the handler has
            // served one request — confirm against the fake language server API.
3261        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3262        fake_language_server
3263            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3264                assert_eq!(
3265                    params.text_document.uri,
3266                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3267                );
3268                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3269                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3270                None
3271            })
3272            .next()
3273            .await;
3274
3275        // Move cursor to a location that contains code actions.
            // Row 1, column 31 lies inside the `other::foo()` call in `main.rs`.
3276        editor_b.update(&mut cx_b, |editor, cx| {
3277            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3278            cx.focus(&editor_b);
3279        });
3280
            // The next request must carry the new cursor position. The fake server
            // replies with one action whose workspace edit replaces `other::foo()`
            // (columns 22..34 of row 1 in `main.rs`) with "4" and deletes the whole
            // 27-character contents of `other.rs`.
3281        fake_language_server
3282            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3283                assert_eq!(
3284                    params.text_document.uri,
3285                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3286                );
3287                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3288                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3289
3290                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3291                    lsp::CodeAction {
3292                        title: "Inline into all callers".to_string(),
3293                        edit: Some(lsp::WorkspaceEdit {
3294                            changes: Some(
3295                                [
3296                                    (
3297                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3298                                        vec![lsp::TextEdit::new(
3299                                            lsp::Range::new(
3300                                                lsp::Position::new(1, 22),
3301                                                lsp::Position::new(1, 34),
3302                                            ),
3303                                            "4".to_string(),
3304                                        )],
3305                                    ),
3306                                    (
3307                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3308                                        vec![lsp::TextEdit::new(
3309                                            lsp::Range::new(
3310                                                lsp::Position::new(0, 0),
3311                                                lsp::Position::new(0, 27),
3312                                            ),
3313                                            "".to_string(),
3314                                        )],
3315                                    ),
3316                                ]
3317                                .into_iter()
3318                                .collect(),
3319                            ),
3320                            ..Default::default()
3321                        }),
                            // Extra JSON payload attached to the action; nothing in this
                            // test inspects it.
3322                        data: Some(json!({
3323                            "codeActionParams": {
3324                                "range": {
3325                                    "start": {"line": 1, "column": 31},
3326                                    "end": {"line": 1, "column": 31},
3327                                }
3328                            }
3329                        })),
3330                        ..Default::default()
3331                    },
3332                )])
3333            })
3334            .next()
3335            .await;
3336
3337        // Toggle code actions and wait for them to display.
3338        editor_b.update(&mut cx_b, |editor, cx| {
3339            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3340        });
3341        editor_b
3342            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3343            .await;
3344
            // Drop the code-action handler before confirming the action.
3345        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3346
3347        // Confirming the code action will trigger a resolve request.
            // The resolve handler is installed after `confirm_action` is created but
            // before it is awaited; it returns the same edit as the action offered above.
3348        let confirm_action = workspace_b
3349            .update(&mut cx_b, |workspace, cx| {
3350                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3351            })
3352            .unwrap();
3353        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3354            lsp::CodeAction {
3355                title: "Inline into all callers".to_string(),
3356                edit: Some(lsp::WorkspaceEdit {
3357                    changes: Some(
3358                        [
3359                            (
3360                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3361                                vec![lsp::TextEdit::new(
3362                                    lsp::Range::new(
3363                                        lsp::Position::new(1, 22),
3364                                        lsp::Position::new(1, 34),
3365                                    ),
3366                                    "4".to_string(),
3367                                )],
3368                            ),
3369                            (
3370                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3371                                vec![lsp::TextEdit::new(
3372                                    lsp::Range::new(
3373                                        lsp::Position::new(0, 0),
3374                                        lsp::Position::new(0, 27),
3375                                    ),
3376                                    "".to_string(),
3377                                )],
3378                            ),
3379                        ]
3380                        .into_iter()
3381                        .collect(),
3382                    ),
3383                    ..Default::default()
3384                }),
3385                ..Default::default()
3386            }
3387        });
3388
3389        // After the action is confirmed, an editor containing both modified files is opened.
3390        confirm_action.await.unwrap();
3391        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3392            workspace
3393                .active_item(cx)
3394                .unwrap()
3395                .downcast::<Editor>()
3396                .unwrap()
3397        });
            // The expected text is the (now empty) contents of `other.rs` followed by
            // the edited `main.rs`. Undo must restore both files and redo must re-apply
            // both edits.
3398        code_action_editor.update(&mut cx_b, |editor, cx| {
3399            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3400            editor.undo(&Undo, cx);
3401            assert_eq!(
3402                editor.text(cx),
3403                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3404            );
3405            editor.redo(&Redo, cx);
3406            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3407        });
3408    }
3409
3410    #[gpui::test(iterations = 10)]
        // End-to-end check of symbol renames in a shared project: client A shares the
        // local project rooted at "/dir", client B joins it remotely, opens `one.rs`,
        // and renames `ONE` to `THREE` through prepare-rename and rename requests
        // answered by a fake language server. The test then checks the combined edit
        // to `one.rs` and `two.rs` and the undo/redo behavior of both editors.
3411    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3412        cx_a.foreground().forbid_parking();
3413        let mut lang_registry = Arc::new(LanguageRegistry::new());
3414        let fs = FakeFs::new(cx_a.background());
3415        let mut path_openers_b = Vec::new();
3416        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3417
3418        // Set up a fake language server and register it for files ending in "rs".
            // `Arc::get_mut` can be unwrapped here because the registry has not been
            // cloned yet at this point.
3419        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3420        Arc::get_mut(&mut lang_registry)
3421            .unwrap()
3422            .add(Arc::new(Language::new(
3423                LanguageConfig {
3424                    name: "Rust".into(),
3425                    path_suffixes: vec!["rs".to_string()],
3426                    language_server: Some(language_server_config),
3427                    ..Default::default()
3428                },
3429                Some(tree_sitter_rust::language()),
3430            )));
3431
3432        // Connect to a server as 2 clients.
3433        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3434        let client_a = server.create_client(&mut cx_a, "user_a").await;
3435        let client_b = server.create_client(&mut cx_b, "user_b").await;
3436
3437        // Share a project as client A. The `.zed.toml` file names "user_b" as a
            // collaborator.
3438        fs.insert_tree(
3439            "/dir",
3440            json!({
3441                ".zed.toml": r#"collaborators = ["user_b"]"#,
3442                "one.rs": "const ONE: usize = 1;",
3443                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3444            }),
3445        )
3446        .await;
3447        let project_a = cx_a.update(|cx| {
3448            Project::local(
3449                client_a.clone(),
3450                client_a.user_store.clone(),
3451                lang_registry.clone(),
3452                fs.clone(),
3453                cx,
3454            )
3455        });
3456        let (worktree_a, _) = project_a
3457            .update(&mut cx_a, |p, cx| {
3458                p.find_or_create_local_worktree("/dir", false, cx)
3459            })
3460            .await
3461            .unwrap();
            // Wait for the local worktree scan to finish before sharing.
3462        worktree_a
3463            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3464            .await;
3465        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3466        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3467        project_a
3468            .update(&mut cx_a, |p, cx| p.share(cx))
3469            .await
3470            .unwrap();
3471
3472        // Join the shared project as client B, then build a workspace around it.
3473        let project_b = Project::remote(
3474            project_id,
3475            client_b.clone(),
3476            client_b.user_store.clone(),
3477            lang_registry.clone(),
3478            fs.clone(),
3479            &mut cx_b.to_async(),
3480        )
3481        .await
3482        .unwrap();
3483        let mut params = cx_b.update(WorkspaceParams::test);
3484        params.languages = lang_registry.clone();
3485        params.client = client_b.client.clone();
3486        params.user_store = client_b.user_store.clone();
3487        params.project = project_b;
3488        params.path_openers = path_openers_b.into();
3489
            // Open `one.rs` in client B's workspace; the opened item must be an `Editor`.
3490        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3491        let editor_b = workspace_b
3492            .update(&mut cx_b, |workspace, cx| {
3493                workspace.open_path((worktree_id, "one.rs").into(), cx)
3494            })
3495            .await
3496            .unwrap()
3497            .downcast::<Editor>()
3498            .unwrap();
3499        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3500
3501        // Move cursor to a location that can be renamed.
            // Offset 7 falls inside the identifier `ONE`, which spans offsets 6..9.
3502        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3503            editor.select_ranges([7..7], None, cx);
3504            editor.rename(&Rename, cx).unwrap()
3505        });
3506
            // The prepare-rename request must target the cursor position; the fake
            // server answers with the range covering `ONE`.
3507        fake_language_server
3508            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3509                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3510                assert_eq!(params.position, lsp::Position::new(0, 7));
3511                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3512                    lsp::Position::new(0, 6),
3513                    lsp::Position::new(0, 9),
3514                )))
3515            })
3516            .next()
3517            .await
3518            .unwrap();
3519        prepare_rename.await.unwrap();
            // The pending rename must cover offsets 6..9. The first three characters of
            // the rename editor's buffer are then replaced with "THREE".
3520        editor_b.update(&mut cx_b, |editor, cx| {
3521            let rename = editor.pending_rename().unwrap();
3522            let buffer = editor.buffer().read(cx).snapshot(cx);
3523            assert_eq!(
3524                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3525                6..9
3526            );
3527            rename.editor.update(cx, |rename_editor, cx| {
3528                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3529                    rename_buffer.edit([0..3], "THREE", cx);
3530                });
3531            });
3532        });
3533
            // Confirm the rename. The request is expected at the start of the renamed
            // range (0:6) with the new name, and the fake server replies with edits to
            // `one.rs` (one occurrence) and `two.rs` (two occurrences).
3534        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3535            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3536        });
3537        fake_language_server
3538            .handle_request::<lsp::request::Rename, _>(|params, _| {
3539                assert_eq!(
3540                    params.text_document_position.text_document.uri.as_str(),
3541                    "file:///dir/one.rs"
3542                );
3543                assert_eq!(
3544                    params.text_document_position.position,
3545                    lsp::Position::new(0, 6)
3546                );
3547                assert_eq!(params.new_name, "THREE");
3548                Some(lsp::WorkspaceEdit {
3549                    changes: Some(
3550                        [
3551                            (
3552                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3553                                vec![lsp::TextEdit::new(
3554                                    lsp::Range::new(
3555                                        lsp::Position::new(0, 6),
3556                                        lsp::Position::new(0, 9),
3557                                    ),
3558                                    "THREE".to_string(),
3559                                )],
3560                            ),
3561                            (
3562                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3563                                vec![
3564                                    lsp::TextEdit::new(
3565                                        lsp::Range::new(
3566                                            lsp::Position::new(0, 24),
3567                                            lsp::Position::new(0, 27),
3568                                        ),
3569                                        "THREE".to_string(),
3570                                    ),
3571                                    lsp::TextEdit::new(
3572                                        lsp::Range::new(
3573                                            lsp::Position::new(0, 35),
3574                                            lsp::Position::new(0, 38),
3575                                        ),
3576                                        "THREE".to_string(),
3577                                    ),
3578                                ],
3579                            ),
3580                        ]
3581                        .into_iter()
3582                        .collect(),
3583                    ),
3584                    ..Default::default()
3585                })
3586            })
3587            .next()
3588            .await
3589            .unwrap();
3590        confirm_rename.await.unwrap();
3591
            // The active item after the rename is an editor whose text is the renamed
            // `two.rs` followed by the renamed `one.rs`; undo and redo apply to both
            // files together.
3592        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3593            workspace
3594                .active_item(cx)
3595                .unwrap()
3596                .downcast::<Editor>()
3597                .unwrap()
3598        });
3599        rename_editor.update(&mut cx_b, |editor, cx| {
3600            assert_eq!(
3601                editor.text(cx),
3602                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3603            );
3604            editor.undo(&Undo, cx);
3605            assert_eq!(
3606                editor.text(cx),
3607                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3608            );
3609            editor.redo(&Redo, cx);
3610            assert_eq!(
3611                editor.text(cx),
3612                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3613            );
3614        });
3615
3616        // Ensure temporary rename edits cannot be undone/redone.
            // In the original `one.rs` editor, the first undo reverts the applied rename,
            // a second undo changes nothing, and redo restores the renamed text.
3617        editor_b.update(&mut cx_b, |editor, cx| {
3618            editor.undo(&Undo, cx);
3619            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3620            editor.undo(&Undo, cx);
3621            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3622            editor.redo(&Redo, cx);
3623            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3624        })
3625    }
3626
3627    #[gpui::test(iterations = 10)]
        // Exercises the basic chat flow between two clients: both see the channel and
        // its one stored message, client A sends two messages that client B receives,
        // and the server stops tracking the channel once both clients have dropped
        // their channel handles.
3628    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3629        cx_a.foreground().forbid_parking();
3630
3631        // Connect to a server as 2 clients.
3632        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3633        let client_a = server.create_client(&mut cx_a, "user_a").await;
3634        let client_b = server.create_client(&mut cx_b, "user_b").await;
3635
3636        // Create an org that includes these 2 users.
3637        let db = &server.app_state.db;
3638        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3639        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3640            .await
3641            .unwrap();
3642        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3643            .await
3644            .unwrap();
3645
3646        // Create a channel that includes all the users.
3647        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3648        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3649            .await
3650            .unwrap();
3651        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3652            .await
3653            .unwrap();
            // Store one message from user B directly in the database.
            // NOTE(review): the final argument (1) is presumably a nonce — confirm
            // against the db API.
3654        db.create_channel_message(
3655            channel_id,
3656            client_b.current_user_id(&cx_b),
3657            "hello A, it's B.",
3658            OffsetDateTime::now_utc(),
3659            1,
3660        )
3661        .await
3662        .unwrap();
3663
            // Client A: wait for the channel list to load and check that it contains
            // exactly the test channel.
3664        let channels_a = cx_a
3665            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3666        channels_a
3667            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3668            .await;
3669        channels_a.read_with(&cx_a, |list, _| {
3670            assert_eq!(
3671                list.available_channels().unwrap(),
3672                &[ChannelDetails {
3673                    id: channel_id.to_proto(),
3674                    name: "test-channel".to_string()
3675                }]
3676            )
3677        });
            // A newly obtained channel starts with no messages; the stored message
            // shows up afterwards.
            // NOTE(review): the third tuple field from `channel_messages` presumably
            // marks a message as still pending — confirm against that helper.
3678        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3679            this.get_channel(channel_id.to_proto(), cx).unwrap()
3680        });
3681        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3682        channel_a
3683            .condition(&cx_a, |channel, _| {
3684                channel_messages(channel)
3685                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3686            })
3687            .await;
3688
            // Client B: the same checks as for client A.
3689        let channels_b = cx_b
3690            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3691        channels_b
3692            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3693            .await;
3694        channels_b.read_with(&cx_b, |list, _| {
3695            assert_eq!(
3696                list.available_channels().unwrap(),
3697                &[ChannelDetails {
3698                    id: channel_id.to_proto(),
3699                    name: "test-channel".to_string()
3700                }]
3701            )
3702        });
3703
3704        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3705            this.get_channel(channel_id.to_proto(), cx).unwrap()
3706        });
3707        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3708        channel_b
3709            .condition(&cx_b, |channel, _| {
3710                channel_messages(channel)
3711                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3712            })
3713            .await;
3714
            // Client A sends two messages back to back. Both appear in A's local message
            // list immediately, with the third tuple field set to `true`. The first send
            // is detached; only the second send's task is awaited.
3715        channel_a
3716            .update(&mut cx_a, |channel, cx| {
3717                channel
3718                    .send_message("oh, hi B.".to_string(), cx)
3719                    .unwrap()
3720                    .detach();
3721                let task = channel.send_message("sup".to_string(), cx).unwrap();
3722                assert_eq!(
3723                    channel_messages(channel),
3724                    &[
3725                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3726                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3727                        ("user_a".to_string(), "sup".to_string(), true)
3728                    ]
3729                );
3730                task
3731            })
3732            .await
3733            .unwrap();
3734
            // Client B eventually sees both messages, in order, with the flag `false`.
3735        channel_b
3736            .condition(&cx_b, |channel, _| {
3737                channel_messages(channel)
3738                    == [
3739                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3740                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3741                        ("user_a".to_string(), "sup".to_string(), false),
3742                    ]
3743            })
3744            .await;
3745
            // The server tracks two connections for the channel. Dropping B's handle
            // leaves one, and dropping A's handle removes the channel from the server
            // state.
3746        assert_eq!(
3747            server
3748                .state()
3749                .await
3750                .channel(channel_id)
3751                .unwrap()
3752                .connection_ids
3753                .len(),
3754            2
3755        );
3756        cx_b.update(|_| drop(channel_b));
3757        server
3758            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3759            .await;
3760
3761        cx_a.update(|_| drop(channel_a));
3762        server
3763            .condition(|state| state.channel(channel_id).is_none())
3764            .await;
3765    }
3766
3767    #[gpui::test(iterations = 10)]
        // Checks the validation applied to outgoing chat messages: an over-long body
        // fails, a blank body is refused, and surrounding whitespace is trimmed before
        // the message is stored.
3768    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3769        cx_a.foreground().forbid_parking();
3770
            // Start a server with a single client.
3771        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3772        let client_a = server.create_client(&mut cx_a, "user_a").await;
3773
            // Create an org and a channel, and add the client's user to both.
3774        let db = &server.app_state.db;
3775        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3776        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3777        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3778            .await
3779            .unwrap();
3780        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3781            .await
3782            .unwrap();
3783
            // Wait for the channel list to load, then obtain the channel.
3784        let channels_a = cx_a
3785            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3786        channels_a
3787            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3788            .await;
3789        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3790            this.get_channel(channel_id.to_proto(), cx).unwrap()
3791        });
3792
3793        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times, well above `MAX_MESSAGE_LEN`
            // (1024). `send_message` itself succeeds; the error comes from awaiting the
            // returned future.
3794        channel_a
3795            .update(&mut cx_a, |channel, cx| {
3796                let long_body = "this is long.\n".repeat(1024);
3797                channel.send_message(long_body, cx).unwrap()
3798            })
3799            .await
3800            .unwrap_err();
3801
3802        // Messages aren't allowed to be blank.
            // Here `send_message` returns the error directly, so nothing is awaited.
3803        channel_a.update(&mut cx_a, |channel, cx| {
3804            channel.send_message(String::new(), cx).unwrap_err()
3805        });
3806
3807        // Leading and trailing whitespace are trimmed.
            // This is the only message that was stored, and its body is the trimmed text.
3808        channel_a
3809            .update(&mut cx_a, |channel, cx| {
3810                channel
3811                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3812                    .unwrap()
3813            })
3814            .await
3815            .unwrap();
3816        assert_eq!(
3817            db.get_channel_messages(channel_id, 10, None)
3818                .await
3819                .unwrap()
3820                .iter()
3821                .map(|m| &m.body)
3822                .collect::<Vec<_>>(),
3823            &["surrounded by whitespace"]
3824        );
3825    }
3826
3827    #[gpui::test(iterations = 10)]
3828    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3829        cx_a.foreground().forbid_parking();
3830
3831        // Connect to a server as 2 clients.
3832        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3833        let client_a = server.create_client(&mut cx_a, "user_a").await;
3834        let client_b = server.create_client(&mut cx_b, "user_b").await;
3835        let mut status_b = client_b.status();
3836
3837        // Create an org that includes these 2 users.
3838        let db = &server.app_state.db;
3839        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3840        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3841            .await
3842            .unwrap();
3843        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3844            .await
3845            .unwrap();
3846
3847        // Create a channel that includes all the users.
3848        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3849        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3850            .await
3851            .unwrap();
3852        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3853            .await
3854            .unwrap();
3855        db.create_channel_message(
3856            channel_id,
3857            client_b.current_user_id(&cx_b),
3858            "hello A, it's B.",
3859            OffsetDateTime::now_utc(),
3860            2,
3861        )
3862        .await
3863        .unwrap();
3864
3865        let channels_a = cx_a
3866            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3867        channels_a
3868            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3869            .await;
3870
3871        channels_a.read_with(&cx_a, |list, _| {
3872            assert_eq!(
3873                list.available_channels().unwrap(),
3874                &[ChannelDetails {
3875                    id: channel_id.to_proto(),
3876                    name: "test-channel".to_string()
3877                }]
3878            )
3879        });
3880        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3881            this.get_channel(channel_id.to_proto(), cx).unwrap()
3882        });
3883        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3884        channel_a
3885            .condition(&cx_a, |channel, _| {
3886                channel_messages(channel)
3887                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3888            })
3889            .await;
3890
3891        let channels_b = cx_b
3892            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3893        channels_b
3894            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3895            .await;
3896        channels_b.read_with(&cx_b, |list, _| {
3897            assert_eq!(
3898                list.available_channels().unwrap(),
3899                &[ChannelDetails {
3900                    id: channel_id.to_proto(),
3901                    name: "test-channel".to_string()
3902                }]
3903            )
3904        });
3905
3906        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3907            this.get_channel(channel_id.to_proto(), cx).unwrap()
3908        });
3909        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3910        channel_b
3911            .condition(&cx_b, |channel, _| {
3912                channel_messages(channel)
3913                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3914            })
3915            .await;
3916
3917        // Disconnect client B, ensuring we can still access its cached channel data.
3918        server.forbid_connections();
3919        server.disconnect_client(client_b.current_user_id(&cx_b));
3920        while !matches!(
3921            status_b.next().await,
3922            Some(client::Status::ReconnectionError { .. })
3923        ) {}
3924
3925        channels_b.read_with(&cx_b, |channels, _| {
3926            assert_eq!(
3927                channels.available_channels().unwrap(),
3928                [ChannelDetails {
3929                    id: channel_id.to_proto(),
3930                    name: "test-channel".to_string()
3931                }]
3932            )
3933        });
3934        channel_b.read_with(&cx_b, |channel, _| {
3935            assert_eq!(
3936                channel_messages(channel),
3937                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3938            )
3939        });
3940
3941        // Send a message from client B while it is disconnected.
3942        channel_b
3943            .update(&mut cx_b, |channel, cx| {
3944                let task = channel
3945                    .send_message("can you see this?".to_string(), cx)
3946                    .unwrap();
3947                assert_eq!(
3948                    channel_messages(channel),
3949                    &[
3950                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3951                        ("user_b".to_string(), "can you see this?".to_string(), true)
3952                    ]
3953                );
3954                task
3955            })
3956            .await
3957            .unwrap_err();
3958
3959        // Send a message from client A while B is disconnected.
3960        channel_a
3961            .update(&mut cx_a, |channel, cx| {
3962                channel
3963                    .send_message("oh, hi B.".to_string(), cx)
3964                    .unwrap()
3965                    .detach();
3966                let task = channel.send_message("sup".to_string(), cx).unwrap();
3967                assert_eq!(
3968                    channel_messages(channel),
3969                    &[
3970                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3971                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3972                        ("user_a".to_string(), "sup".to_string(), true)
3973                    ]
3974                );
3975                task
3976            })
3977            .await
3978            .unwrap();
3979
3980        // Give client B a chance to reconnect.
3981        server.allow_connections();
3982        cx_b.foreground().advance_clock(Duration::from_secs(10));
3983
3984        // Verify that B sees the new messages upon reconnection, as well as the message client B
3985        // sent while offline.
3986        channel_b
3987            .condition(&cx_b, |channel, _| {
3988                channel_messages(channel)
3989                    == [
3990                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3991                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3992                        ("user_a".to_string(), "sup".to_string(), false),
3993                        ("user_b".to_string(), "can you see this?".to_string(), false),
3994                    ]
3995            })
3996            .await;
3997
3998        // Ensure client A and B can communicate normally after reconnection.
3999        channel_a
4000            .update(&mut cx_a, |channel, cx| {
4001                channel.send_message("you online?".to_string(), cx).unwrap()
4002            })
4003            .await
4004            .unwrap();
4005        channel_b
4006            .condition(&cx_b, |channel, _| {
4007                channel_messages(channel)
4008                    == [
4009                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4010                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4011                        ("user_a".to_string(), "sup".to_string(), false),
4012                        ("user_b".to_string(), "can you see this?".to_string(), false),
4013                        ("user_a".to_string(), "you online?".to_string(), false),
4014                    ]
4015            })
4016            .await;
4017
4018        channel_b
4019            .update(&mut cx_b, |channel, cx| {
4020                channel.send_message("yep".to_string(), cx).unwrap()
4021            })
4022            .await
4023            .unwrap();
4024        channel_a
4025            .condition(&cx_a, |channel, _| {
4026                channel_messages(channel)
4027                    == [
4028                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4029                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4030                        ("user_a".to_string(), "sup".to_string(), false),
4031                        ("user_b".to_string(), "can you see this?".to_string(), false),
4032                        ("user_a".to_string(), "you online?".to_string(), false),
4033                        ("user_b".to_string(), "yep".to_string(), false),
4034                    ]
4035            })
4036            .await;
4037    }
4038
4039    #[gpui::test(iterations = 10)]
4040    async fn test_contacts(
4041        mut cx_a: TestAppContext,
4042        mut cx_b: TestAppContext,
4043        mut cx_c: TestAppContext,
4044    ) {
            // Verifies that the contact lists of three connected clients follow client A's
            // project: first the worktree with no guests (before the project is shared), then
            // user_b as a guest once B opens the project remotely, then nothing once A drops
            // the project.
4045        cx_a.foreground().forbid_parking();
4046        let lang_registry = Arc::new(LanguageRegistry::new());
4047        let fs = FakeFs::new(cx_a.background());
4048
4049        // Connect to a server as 3 clients.
4050        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4051        let client_a = server.create_client(&mut cx_a, "user_a").await;
4052        let client_b = server.create_client(&mut cx_b, "user_b").await;
4053        let client_c = server.create_client(&mut cx_c, "user_c").await;
4054
4055        // Share a worktree as client A.
            // The `.zed.toml` in the fake tree names user_b and user_c as collaborators.
4056        fs.insert_tree(
4057            "/a",
4058            json!({
4059                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4060            }),
4061        )
4062        .await;
4063
4064        let project_a = cx_a.update(|cx| {
4065            Project::local(
4066                client_a.clone(),
4067                client_a.user_store.clone(),
4068                lang_registry.clone(),
4069                fs.clone(),
4070                cx,
4071            )
4072        });
4073        let (worktree_a, _) = project_a
4074            .update(&mut cx_a, |p, cx| {
4075                p.find_or_create_local_worktree("/a", false, cx)
4076            })
4077            .await
4078            .unwrap();
4079        worktree_a
4080            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4081            .await;
4082
            // Before `share` is called, all three clients should list user_a with worktree "a"
            // and no guests.
4083        client_a
4084            .user_store
4085            .condition(&cx_a, |user_store, _| {
4086                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4087            })
4088            .await;
4089        client_b
4090            .user_store
4091            .condition(&cx_b, |user_store, _| {
4092                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4093            })
4094            .await;
4095        client_c
4096            .user_store
4097            .condition(&cx_c, |user_store, _| {
4098                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4099            })
4100            .await;
4101
            // Obtain the project's remote id and share it, then have client B open it remotely.
4102        let project_id = project_a
4103            .update(&mut cx_a, |project, _| project.next_remote_id())
4104            .await;
4105        project_a
4106            .update(&mut cx_a, |project, cx| project.share(cx))
4107            .await
4108            .unwrap();
4109
4110        let _project_b = Project::remote(
4111            project_id,
4112            client_b.clone(),
4113            client_b.user_store.clone(),
4114            lang_registry.clone(),
4115            fs.clone(),
4116            &mut cx_b.to_async(),
4117        )
4118        .await
4119        .unwrap();
4120
            // Every client, including the uninvolved client C, should now see user_b as a guest.
4121        client_a
4122            .user_store
4123            .condition(&cx_a, |user_store, _| {
4124                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4125            })
4126            .await;
4127        client_b
4128            .user_store
4129            .condition(&cx_b, |user_store, _| {
4130                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4131            })
4132            .await;
4133        client_c
4134            .user_store
4135            .condition(&cx_c, |user_store, _| {
4136                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4137            })
4138            .await;
4139
            // Wait until the host project itself lists B's peer id among its collaborators.
4140        project_a
4141            .condition(&cx_a, |project, _| {
4142                project.collaborators().contains_key(&client_b.peer_id)
4143            })
4144            .await;
4145
            // Dropping the host project should empty the contact list on every client.
4146        cx_a.update(move |_| drop(project_a));
4147        client_a
4148            .user_store
4149            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4150            .await;
4151        client_b
4152            .user_store
4153            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4154            .await;
4155        client_c
4156            .user_store
4157            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4158            .await;
4159
            // Flattens the user store's contacts into
            // `(github login, [(first worktree root name, [guest github logins])])` tuples so
            // they can be compared against literals above.
            // NOTE(review): indexes `worktree_root_names[0]`, so this assumes every listed
            // project has at least one worktree root name.
4160        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4161            user_store
4162                .contacts()
4163                .iter()
4164                .map(|contact| {
4165                    let worktrees = contact
4166                        .projects
4167                        .iter()
4168                        .map(|p| {
4169                            (
4170                                p.worktree_root_names[0].as_str(),
4171                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4172                            )
4173                        })
4174                        .collect();
4175                    (contact.user.github_login.as_str(), worktrees)
4176                })
4177                .collect()
4178        }
4179    }
4180
4181    #[gpui::test(iterations = 100)]
4182    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
            // Randomized collaboration test: one host and a random number of guests run their
            // `simulate_host` / `simulate_guest` futures against a shared project, after which
            // every guest's worktrees and open buffers are compared with the host's.
4183        cx.foreground().forbid_parking();
            // Both limits can be overridden through environment variables; a value that does
            // not parse panics with the message given to `expect`.
4184        let max_peers = env::var("MAX_PEERS")
4185            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4186            .unwrap_or(5);
4187        let max_operations = env::var("OPERATIONS")
4188            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4189            .unwrap_or(10);
4190
            // The RNG is shared between this function and every simulated client.
4191        let rng = Arc::new(Mutex::new(rng));
4192
4193        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4194        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4195
            // The host's worktree names guest-1 through guest-5 as collaborators.
4196        let fs = FakeFs::new(cx.background());
4197        fs.insert_tree(
4198            "/_collab",
4199            json!({
4200                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4201            }),
4202        )
4203        .await;
4204
            // Operation counter shared with the simulated clients; the loop below stops once it
            // reaches `max_operations`.
4205        let operations = Rc::new(Cell::new(0));
4206        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4207        let mut clients = Vec::new();
4208
            // The host and each guest get their own `TestAppContext`, built from this test's
            // executors and platform, with `next_entity_id` bases spaced 100000 apart.
            // NOTE(review): presumably the spacing keeps entity ids of different contexts from
            // colliding — confirm against `TestAppContext::new`.
4209        let mut next_entity_id = 100000;
4210        let mut host_cx = TestAppContext::new(
4211            cx.foreground_platform(),
4212            cx.platform(),
4213            cx.foreground(),
4214            cx.background(),
4215            cx.font_cache(),
4216            cx.leak_detector(),
4217            next_entity_id,
4218        );
4219        let host = server.create_client(&mut host_cx, "host").await;
4220        let host_project = host_cx.update(|cx| {
4221            Project::local(
4222                host.client.clone(),
4223                host.user_store.clone(),
4224                Arc::new(LanguageRegistry::new()),
4225                fs.clone(),
4226                cx,
4227            )
4228        });
4229        let host_project_id = host_project
4230            .update(&mut host_cx, |p, _| p.next_remote_id())
4231            .await;
4232
            // Add the "/_collab" worktree, wait for its scan, then share the project.
4233        let (collab_worktree, _) = host_project
4234            .update(&mut host_cx, |project, cx| {
4235                project.find_or_create_local_worktree("/_collab", false, cx)
4236            })
4237            .await
4238            .unwrap();
4239        collab_worktree
4240            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4241            .await;
4242        host_project
4243            .update(&mut host_cx, |project, cx| project.share(cx))
4244            .await
4245            .unwrap();
4246
            // The host's simulation is pushed first, so it sits at index 0 of `clients`.
4247        clients.push(cx.foreground().spawn(host.simulate_host(
4248            host_project.clone(),
4249            language_server_config,
4250            operations.clone(),
4251            max_operations,
4252            rng.clone(),
4253            host_cx,
4254        )));
4255
            // Until the operation budget is used up, yield for a random delay and, with a 5%
            // chance per iteration while fewer than `max_peers` clients exist, add a guest.
            // Adding a guest counts as one operation.
4256        while operations.get() < max_operations {
4257            cx.background().simulate_random_delay().await;
4258            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4259                operations.set(operations.get() + 1);
4260
                    // The host occupies slot 0, so guest ids (and "guest-N" names) start at 1.
4261                let guest_id = clients.len();
4262                log::info!("Adding guest {}", guest_id);
4263                next_entity_id += 100000;
4264                let mut guest_cx = TestAppContext::new(
4265                    cx.foreground_platform(),
4266                    cx.platform(),
4267                    cx.foreground(),
4268                    cx.background(),
4269                    cx.font_cache(),
4270                    cx.leak_detector(),
4271                    next_entity_id,
4272                );
4273                let guest = server
4274                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4275                    .await;
                    // Each guest joins the host's project with its own, empty fake file system.
4276                let guest_project = Project::remote(
4277                    host_project_id,
4278                    guest.client.clone(),
4279                    guest.user_store.clone(),
4280                    guest_lang_registry.clone(),
4281                    FakeFs::new(cx.background()),
4282                    &mut guest_cx.to_async(),
4283                )
4284                .await
4285                .unwrap();
4286                clients.push(cx.foreground().spawn(guest.simulate_guest(
4287                    guest_id,
4288                    guest_project,
4289                    operations.clone(),
4290                    max_operations,
4291                    rng.clone(),
4292                    guest_cx,
4293                )));
4294
4295                log::info!("Guest {} added", guest_id);
4296            }
4297        }
4298
            // Wait for every simulation to finish; each yields its client and app context.
4299        let mut clients = futures::future::join_all(clients).await;
4300        cx.foreground().run_until_parked();
4301
            // Index 0 is the host (pushed first); everything left in `clients` is a guest.
4302        let (_, host_cx) = clients.remove(0);
4303        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4304            project
4305                .worktrees(cx)
4306                .map(|worktree| {
4307                    let snapshot = worktree.read(cx).snapshot();
4308                    (snapshot.id(), snapshot)
4309                })
4310                .collect::<BTreeMap<_, _>>()
4311        });
4312
4313        for (guest_client, guest_cx) in clients.iter() {
                // Here `guest_id` is the client's `id()`, not the index used for the guest's name.
4314            let guest_id = guest_client.client.id();
4315            let worktree_snapshots =
4316                guest_client
4317                    .project
4318                    .as_ref()
4319                    .unwrap()
4320                    .read_with(guest_cx, |project, cx| {
4321                        project
4322                            .worktrees(cx)
4323                            .map(|worktree| {
4324                                let worktree = worktree.read(cx);
4325                                (worktree.id(), worktree.snapshot())
4326                            })
4327                            .collect::<BTreeMap<_, _>>()
4328                    });
4329
                // The guest must have exactly the host's set of worktree ids...
4330            assert_eq!(
4331                worktree_snapshots.keys().collect::<Vec<_>>(),
4332                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4333                "guest {} has different worktrees than the host",
4334                guest_id
4335            );
                // ...and each of those worktrees must match in root name and entries.
4336            for (id, host_snapshot) in &host_worktree_snapshots {
4337                let guest_snapshot = &worktree_snapshots[id];
4338                assert_eq!(
4339                    guest_snapshot.root_name(),
4340                    host_snapshot.root_name(),
4341                    "guest {} has different root name than the host for worktree {}",
4342                    guest_id,
4343                    id
4344                );
4345                assert_eq!(
4346                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4347                    host_snapshot.entries(false).collect::<Vec<_>>(),
4348                    "guest {} has different snapshot than the host for worktree {}",
4349                    guest_id,
4350                    id
4351                );
4352            }
4353
                // No operations may still be deferred on the guest's project.
4354            guest_client
4355                .project
4356                .as_ref()
4357                .unwrap()
4358                .read_with(guest_cx, |project, cx| {
4359                    assert!(
4360                        !project.has_deferred_operations(cx),
4361                        "guest {} has deferred operations",
4362                        guest_id,
4363                    );
4364                });
4365
                // Every buffer the guest holds must exist on the host under the same remote id
                // and contain the same text.
4366            for guest_buffer in &guest_client.buffers {
4367                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4368                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4369                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4370                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4371                        guest_id, guest_client.peer_id, buffer_id
4372                    ))
4373                });
4374                assert_eq!(
4375                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4376                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4377                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4378                    guest_id,
4379                    buffer_id,
4380                    host_buffer
4381                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4382                );
4383            }
4384        }
4385    }
4386
4387    struct TestServer {
            // Test harness wrapping an in-process `Server` that clients reach over in-memory
            // connections (see `create_client`).

            // Peer handed to `Server::new`; `reset` is called on it when the harness is dropped.
4388        peer: Arc<Peer>,
            // Application state built on the test database; used to create users.
4389        app_state: Arc<AppState>,
            // The server under test.
4390        server: Arc<Server>,
            // Foreground executor, told about intentional waits inside `condition`.
4391        foreground: Rc<executor::Foreground>,
            // Receiving end of the channel whose sender is given to `Server::new`; `condition`
            // awaits it between predicate checks.
4392        notifications: mpsc::UnboundedReceiver<()>,
            // Per-user kill switches, registered whenever a connection is established and
            // consumed by `disconnect_client`.
4393        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While true, attempts to establish a connection fail with an error.
4394        forbid_connections: Arc<AtomicBool>,
            // Never read after construction.
            // NOTE(review): presumably held only to keep the test database alive for the
            // lifetime of the server — confirm against `TestDb`.
4395        _test_db: TestDb,
4396    }
4397
4398    impl TestServer {
            // Builds a server backed by a fake test database and a fresh `Peer`. The sender
            // half of a new unbounded channel is passed to `Server::new`; the receiver is kept
            // for `condition`.
4399        async fn start(
4400            foreground: Rc<executor::Foreground>,
4401            background: Arc<executor::Background>,
4402        ) -> Self {
4403            let test_db = TestDb::fake(background);
4404            let app_state = Self::build_app_state(&test_db).await;
4405            let peer = Peer::new();
4406            let notifications = mpsc::unbounded();
4407            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4408            Self {
4409                peer,
4410                app_state,
4411                server,
4412                foreground,
4413                notifications: notifications.1,
4414                connection_killers: Default::default(),
4415                forbid_connections: Default::default(),
4416                _test_db: test_db,
4417            }
4418        }
4419
            // Creates a database user called `name`, then a `Client` whose authentication and
            // connection steps are overridden to talk to this in-process server. Returns once
            // the client is connected and its user store reports a current user.
4420        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4421            let http = FakeHttpClient::with_404_response();
4422            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4423            let client_name = name.to_string();
4424            let mut client = Client::new(http.clone());
4425            let server = self.server.clone();
4426            let connection_killers = self.connection_killers.clone();
4427            let forbid_connections = self.forbid_connections.clone();
                // Carries the connection id from `handle_connection` back to this function.
4428            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4429
                // `Arc::get_mut` only succeeds while no other reference to the client exists,
                // so the overrides must be installed before the client is shared.
4430            Arc::get_mut(&mut client)
4431                .unwrap()
4432                .override_authenticate(move |cx| {
4433                    cx.spawn(|_| async move {
4434                        let access_token = "the-token".to_string();
4435                        Ok(Credentials {
4436                            user_id: user_id.0 as u64,
4437                            access_token,
4438                        })
4439                    })
4440                })
4441                .override_establish_connection(move |credentials, cx| {
                        // The credentials must be the ones issued by the override above.
4442                    assert_eq!(credentials.user_id, user_id.0 as u64);
4443                    assert_eq!(credentials.access_token, "the-token");
4444
                        // Clone per call: this closure can run again on every reconnection.
4445                    let server = server.clone();
4446                    let connection_killers = connection_killers.clone();
4447                    let forbid_connections = forbid_connections.clone();
4448                    let client_name = client_name.clone();
4449                    let connection_id_tx = connection_id_tx.clone();
4450                    cx.spawn(move |cx| async move {
4451                        if forbid_connections.load(SeqCst) {
4452                            Err(EstablishConnectionError::other(anyhow!(
4453                                "server is forbidding connections"
4454                            )))
4455                        } else {
                                // Register the kill switch (replacing any earlier one for this
                                // user), then let the server handle its end on the background
                                // executor.
4456                            let (client_conn, server_conn, kill_conn) =
4457                                Connection::in_memory(cx.background());
4458                            connection_killers.lock().insert(user_id, kill_conn);
4459                            cx.background()
4460                                .spawn(server.handle_connection(
4461                                    server_conn,
4462                                    client_name,
4463                                    user_id,
4464                                    Some(connection_id_tx),
4465                                    cx.background(),
4466                                ))
4467                                .detach();
4468                            Ok(client_conn)
4469                        }
4470                    })
4471                });
4472
4473            client
4474                .authenticate_and_connect(&cx.to_async())
4475                .await
4476                .unwrap();
4477
4478            Channel::init(&client);
4479            Project::init(&client);
4480
                // The peer id is built from the connection id received over the channel.
4481            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4482            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
                // Block until the user store has a current user.
4483            let mut authed_user =
4484                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4485            while authed_user.next().await.unwrap().is_none() {}
4486
4487            TestClient {
4488                client,
4489                peer_id,
4490                user_store,
4491                project: Default::default(),
4492                buffers: Default::default(),
4493            }
4494        }
4495
            // Removes the user's registered kill switch, if any, and fires it. The result of
            // `try_send` is deliberately ignored.
4496        fn disconnect_client(&self, user_id: UserId) {
4497            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4498                let _ = kill_conn.try_send(Some(()));
4499            }
4500        }
4501
            // Makes subsequent connection attempts fail until `allow_connections` is called.
4502        fn forbid_connections(&self) {
4503            self.forbid_connections.store(true, SeqCst);
4504        }
4505
            // Lets connection attempts succeed again.
4506        fn allow_connections(&self) {
4507            self.forbid_connections.store(false, SeqCst);
4508        }
4509
            // Assembles an `AppState` for tests: default config with a 32-character session
            // secret and the test database's URL, plus test GitHub clients.
4510        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4511            let mut config = Config::default();
4512            config.session_secret = "a".repeat(32);
4513            config.database_url = test_db.url.clone();
4514            let github_client = github::AppClient::test();
4515            Arc::new(AppState {
4516                db: test_db.db().clone(),
4517                handlebars: Default::default(),
4518                auth_client: auth::build_client("", ""),
4519                repo_client: github::RepoClient::test(&github_client),
4520                github_client,
4521                config,
4522            })
4523        }
4524
            // Returns a read guard on the server's store. Declared `async` although the body
            // never awaits.
4525        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4526            self.server.store.read()
4527        }
4528
            // Waits until `predicate` holds for the server's store, re-checking after each
            // notification received from the server. Panics with "condition timed out" if that
            // takes longer than 500 ms.
4529        async fn condition<F>(&mut self, mut predicate: F)
4530        where
4531            F: FnMut(&Store) -> bool,
4532        {
4533            async_std::future::timeout(Duration::from_millis(500), async {
4534                while !(predicate)(&*self.server.store.read()) {
                        // NOTE(review): the start/finish_waiting bracket presumably tells the
                        // executor that parking here is expected — confirm against
                        // `executor::Foreground`.
4535                    self.foreground.start_waiting();
4536                    self.notifications.next().await;
4537                    self.foreground.finish_waiting();
4538                }
4539            })
4540            .await
4541            .expect("condition timed out");
4542        }
4543    }
4544
4545    impl Drop for TestServer {
            // Resets the peer when the harness goes away.
            // NOTE(review): presumably this tears down any connections the peer still holds —
            // confirm against `Peer::reset`.
4546        fn drop(&mut self) {
4547            self.peer.reset();
4548        }
4549    }
4550
4551    struct TestClient {
            // A connected client as produced by `TestServer::create_client`, together with the
            // state the tests inspect afterwards.

            // The underlying client; also reachable through `Deref`.
4552        client: Arc<Client>,
            // Built from the connection id received when the client connected.
4553        pub peer_id: PeerId,
            // User store created for this client in `create_client`.
4554        pub user_store: ModelHandle<UserStore>,
            // Starts out as the default (`None`); `test_random_collaboration` unwraps it for
            // each guest after the simulations finish.
            // NOTE(review): presumably set by `simulate_guest` — confirm.
4555        project: Option<ModelHandle<Project>>,
            // Buffer handles held by this client; starts empty. `test_random_collaboration`
            // compares each one against the host's buffer with the same remote id.
4556        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4557    }
4558
4559    impl Deref for TestClient {
            // Exposes the wrapped `Arc<Client>` through auto-deref, so a `TestClient` can be
            // used where the client itself is expected.
4560        type Target = Arc<Client>;
4561
4562        fn deref(&self) -> &Self::Target {
4563            &self.client
4564        }
4565    }
4566
4567    impl TestClient {
4568        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
                // Reads the current user's id from the user store and converts it with
                // `UserId::from_proto`. Panics (via `unwrap`) if the store has no current user.
4569            UserId::from_proto(
4570                self.user_store
4571                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4572            )
4573        }
4574
4575        fn simulate_host(
4576            mut self,
4577            project: ModelHandle<Project>,
4578            mut language_server_config: LanguageServerConfig,
4579            operations: Rc<Cell<usize>>,
4580            max_operations: usize,
4581            rng: Arc<Mutex<StdRng>>,
4582            mut cx: TestAppContext,
4583        ) -> impl Future<Output = (Self, TestAppContext)> {
4584            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4585
4586            // Set up a fake language server.
4587            language_server_config.set_fake_initializer({
4588                let rng = rng.clone();
4589                let files = files.clone();
4590                let project = project.clone();
4591                move |fake_server| {
4592                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4593                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4594                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4595                                range: lsp::Range::new(
4596                                    lsp::Position::new(0, 0),
4597                                    lsp::Position::new(0, 0),
4598                                ),
4599                                new_text: "the-new-text".to_string(),
4600                            })),
4601                            ..Default::default()
4602                        }]))
4603                    });
4604
4605                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4606                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4607                            lsp::CodeAction {
4608                                title: "the-code-action".to_string(),
4609                                ..Default::default()
4610                            },
4611                        )])
4612                    });
4613
4614                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4615                        |params, _| {
4616                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4617                                params.position,
4618                                params.position,
4619                            )))
4620                        },
4621                    );
4622
4623                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4624                        let files = files.clone();
4625                        let rng = rng.clone();
4626                        move |_, _| {
4627                            let files = files.lock();
4628                            let mut rng = rng.lock();
4629                            let count = rng.gen_range::<usize, _>(1..3);
4630                            let files = (0..count)
4631                                .map(|_| files.choose(&mut *rng).unwrap())
4632                                .collect::<Vec<_>>();
4633                            log::info!("LSP: Returning definitions in files {:?}", &files);
4634                            Some(lsp::GotoDefinitionResponse::Array(
4635                                files
4636                                    .into_iter()
4637                                    .map(|file| lsp::Location {
4638                                        uri: lsp::Url::from_file_path(file).unwrap(),
4639                                        range: Default::default(),
4640                                    })
4641                                    .collect(),
4642                            ))
4643                        }
4644                    });
4645
4646                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4647                        let rng = rng.clone();
4648                        let project = project.clone();
4649                        move |params, mut cx| {
4650                            project.update(&mut cx, |project, cx| {
4651                                let path = params
4652                                    .text_document_position_params
4653                                    .text_document
4654                                    .uri
4655                                    .to_file_path()
4656                                    .unwrap();
4657                                let (worktree, relative_path) =
4658                                    project.find_local_worktree(&path, cx)?;
4659                                let project_path =
4660                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4661                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4662
4663                                let mut highlights = Vec::new();
4664                                let highlight_count = rng.lock().gen_range(1..=5);
4665                                let mut prev_end = 0;
4666                                for _ in 0..highlight_count {
4667                                    let range =
4668                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4669                                    let start =
4670                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4671                                    let end =
4672                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4673                                    highlights.push(lsp::DocumentHighlight {
4674                                        range: lsp::Range::new(start, end),
4675                                        kind: Some(lsp::DocumentHighlightKind::READ),
4676                                    });
4677                                    prev_end = range.end;
4678                                }
4679                                Some(highlights)
4680                            })
4681                        }
4682                    });
4683                }
4684            });
4685
4686            project.update(&mut cx, |project, _| {
4687                project.languages().add(Arc::new(Language::new(
4688                    LanguageConfig {
4689                        name: "Rust".into(),
4690                        path_suffixes: vec!["rs".to_string()],
4691                        language_server: Some(language_server_config),
4692                        ..Default::default()
4693                    },
4694                    None,
4695                )));
4696            });
4697
4698            async move {
4699                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4700                while operations.get() < max_operations {
4701                    operations.set(operations.get() + 1);
4702
4703                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4704                    match distribution {
4705                        0..=20 if !files.lock().is_empty() => {
4706                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4707                            let mut path = path.as_path();
4708                            while let Some(parent_path) = path.parent() {
4709                                path = parent_path;
4710                                if rng.lock().gen() {
4711                                    break;
4712                                }
4713                            }
4714
4715                            log::info!("Host: find/create local worktree {:?}", path);
4716                            project
4717                                .update(&mut cx, |project, cx| {
4718                                    project.find_or_create_local_worktree(path, false, cx)
4719                                })
4720                                .await
4721                                .unwrap();
4722                        }
4723                        10..=80 if !files.lock().is_empty() => {
4724                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4725                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4726                                let (worktree, path) = project
4727                                    .update(&mut cx, |project, cx| {
4728                                        project.find_or_create_local_worktree(
4729                                            file.clone(),
4730                                            false,
4731                                            cx,
4732                                        )
4733                                    })
4734                                    .await
4735                                    .unwrap();
4736                                let project_path =
4737                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4738                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4739                                let buffer = project
4740                                    .update(&mut cx, |project, cx| {
4741                                        project.open_buffer(project_path, cx)
4742                                    })
4743                                    .await
4744                                    .unwrap();
4745                                self.buffers.insert(buffer.clone());
4746                                buffer
4747                            } else {
4748                                self.buffers
4749                                    .iter()
4750                                    .choose(&mut *rng.lock())
4751                                    .unwrap()
4752                                    .clone()
4753                            };
4754
4755                            if rng.lock().gen_bool(0.1) {
4756                                cx.update(|cx| {
4757                                    log::info!(
4758                                        "Host: dropping buffer {:?}",
4759                                        buffer.read(cx).file().unwrap().full_path(cx)
4760                                    );
4761                                    self.buffers.remove(&buffer);
4762                                    drop(buffer);
4763                                });
4764                            } else {
4765                                buffer.update(&mut cx, |buffer, cx| {
4766                                    log::info!(
4767                                        "Host: updating buffer {:?} ({})",
4768                                        buffer.file().unwrap().full_path(cx),
4769                                        buffer.remote_id()
4770                                    );
4771                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4772                                });
4773                            }
4774                        }
4775                        _ => loop {
4776                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4777                            let mut path = PathBuf::new();
4778                            path.push("/");
4779                            for _ in 0..path_component_count {
4780                                let letter = rng.lock().gen_range(b'a'..=b'z');
4781                                path.push(std::str::from_utf8(&[letter]).unwrap());
4782                            }
4783                            path.set_extension("rs");
4784                            let parent_path = path.parent().unwrap();
4785
4786                            log::info!("Host: creating file {:?}", path,);
4787
4788                            if fs.create_dir(&parent_path).await.is_ok()
4789                                && fs.create_file(&path, Default::default()).await.is_ok()
4790                            {
4791                                files.lock().push(path);
4792                                break;
4793                            } else {
4794                                log::info!("Host: cannot create file");
4795                            }
4796                        },
4797                    }
4798
4799                    cx.background().simulate_random_delay().await;
4800                }
4801
4802                log::info!("Host done");
4803
4804                self.project = Some(project);
4805                (self, cx)
4806            }
4807        }
4808
4809        pub async fn simulate_guest(
4810            mut self,
4811            guest_id: usize,
4812            project: ModelHandle<Project>,
4813            operations: Rc<Cell<usize>>,
4814            max_operations: usize,
4815            rng: Arc<Mutex<StdRng>>,
4816            mut cx: TestAppContext,
4817        ) -> (Self, TestAppContext) {
4818            while operations.get() < max_operations {
4819                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4820                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4821                        project
4822                            .worktrees(&cx)
4823                            .filter(|worktree| {
4824                                let worktree = worktree.read(cx);
4825                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4826                            })
4827                            .choose(&mut *rng.lock())
4828                    }) {
4829                        worktree
4830                    } else {
4831                        cx.background().simulate_random_delay().await;
4832                        continue;
4833                    };
4834
4835                    operations.set(operations.get() + 1);
4836                    let (worktree_root_name, project_path) =
4837                        worktree.read_with(&cx, |worktree, _| {
4838                            let entry = worktree
4839                                .entries(false)
4840                                .filter(|e| e.is_file())
4841                                .choose(&mut *rng.lock())
4842                                .unwrap();
4843                            (
4844                                worktree.root_name().to_string(),
4845                                (worktree.id(), entry.path.clone()),
4846                            )
4847                        });
4848                    log::info!(
4849                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4850                        guest_id,
4851                        project_path.0,
4852                        worktree_root_name,
4853                        project_path.1
4854                    );
4855                    let buffer = project
4856                        .update(&mut cx, |project, cx| {
4857                            project.open_buffer(project_path.clone(), cx)
4858                        })
4859                        .await
4860                        .unwrap();
4861                    log::info!(
4862                        "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
4863                        guest_id,
4864                        project_path.0,
4865                        worktree_root_name,
4866                        project_path.1,
4867                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4868                    );
4869                    self.buffers.insert(buffer.clone());
4870                    buffer
4871                } else {
4872                    operations.set(operations.get() + 1);
4873
4874                    self.buffers
4875                        .iter()
4876                        .choose(&mut *rng.lock())
4877                        .unwrap()
4878                        .clone()
4879                };
4880
4881                let choice = rng.lock().gen_range(0..100);
4882                match choice {
4883                    0..=9 => {
4884                        cx.update(|cx| {
4885                            log::info!(
4886                                "Guest {}: dropping buffer {:?}",
4887                                guest_id,
4888                                buffer.read(cx).file().unwrap().full_path(cx)
4889                            );
4890                            self.buffers.remove(&buffer);
4891                            drop(buffer);
4892                        });
4893                    }
4894                    10..=19 => {
4895                        let completions = project.update(&mut cx, |project, cx| {
4896                            log::info!(
4897                                "Guest {}: requesting completions for buffer {:?}",
4898                                guest_id,
4899                                buffer.read(cx).file().unwrap().full_path(cx)
4900                            );
4901                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4902                            project.completions(&buffer, offset, cx)
4903                        });
4904                        let completions = cx.background().spawn(async move {
4905                            completions.await.expect("completions request failed");
4906                        });
4907                        if rng.lock().gen_bool(0.3) {
4908                            log::info!("Guest {}: detaching completions request", guest_id);
4909                            completions.detach();
4910                        } else {
4911                            completions.await;
4912                        }
4913                    }
4914                    20..=29 => {
4915                        let code_actions = project.update(&mut cx, |project, cx| {
4916                            log::info!(
4917                                "Guest {}: requesting code actions for buffer {:?}",
4918                                guest_id,
4919                                buffer.read(cx).file().unwrap().full_path(cx)
4920                            );
4921                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4922                            project.code_actions(&buffer, range, cx)
4923                        });
4924                        let code_actions = cx.background().spawn(async move {
4925                            code_actions.await.expect("code actions request failed");
4926                        });
4927                        if rng.lock().gen_bool(0.3) {
4928                            log::info!("Guest {}: detaching code actions request", guest_id);
4929                            code_actions.detach();
4930                        } else {
4931                            code_actions.await;
4932                        }
4933                    }
4934                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4935                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4936                            log::info!(
4937                                "Guest {}: saving buffer {:?}",
4938                                guest_id,
4939                                buffer.file().unwrap().full_path(cx)
4940                            );
4941                            (buffer.version(), buffer.save(cx))
4942                        });
4943                        let save = cx.spawn(|cx| async move {
4944                            let (saved_version, _) = save.await.expect("save request failed");
4945                            buffer.read_with(&cx, |buffer, _| {
4946                                assert!(buffer.version().observed_all(&saved_version));
4947                                assert!(saved_version.observed_all(&requested_version));
4948                            });
4949                        });
4950                        if rng.lock().gen_bool(0.3) {
4951                            log::info!("Guest {}: detaching save request", guest_id);
4952                            save.detach();
4953                        } else {
4954                            save.await;
4955                        }
4956                    }
4957                    40..=44 => {
4958                        let prepare_rename = project.update(&mut cx, |project, cx| {
4959                            log::info!(
4960                                "Guest {}: preparing rename for buffer {:?}",
4961                                guest_id,
4962                                buffer.read(cx).file().unwrap().full_path(cx)
4963                            );
4964                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4965                            project.prepare_rename(buffer, offset, cx)
4966                        });
4967                        let prepare_rename = cx.background().spawn(async move {
4968                            prepare_rename.await.expect("prepare rename request failed");
4969                        });
4970                        if rng.lock().gen_bool(0.3) {
4971                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4972                            prepare_rename.detach();
4973                        } else {
4974                            prepare_rename.await;
4975                        }
4976                    }
4977                    45..=49 => {
4978                        let definitions = project.update(&mut cx, |project, cx| {
4979                            log::info!(
4980                                "Guest {}: requesting definitions for buffer {:?}",
4981                                guest_id,
4982                                buffer.read(cx).file().unwrap().full_path(cx)
4983                            );
4984                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4985                            project.definition(&buffer, offset, cx)
4986                        });
4987                        let definitions = cx.background().spawn(async move {
4988                            definitions.await.expect("definitions request failed")
4989                        });
4990                        if rng.lock().gen_bool(0.3) {
4991                            log::info!("Guest {}: detaching definitions request", guest_id);
4992                            definitions.detach();
4993                        } else {
4994                            self.buffers
4995                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
4996                        }
4997                    }
4998                    50..=54 => {
4999                        let highlights = project.update(&mut cx, |project, cx| {
5000                            log::info!(
5001                                "Guest {}: requesting highlights for buffer {:?}",
5002                                guest_id,
5003                                buffer.read(cx).file().unwrap().full_path(cx)
5004                            );
5005                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5006                            project.document_highlights(&buffer, offset, cx)
5007                        });
5008                        let highlights = cx.background().spawn(async move {
5009                            highlights.await.expect("highlights request failed");
5010                        });
5011                        if rng.lock().gen_bool(0.3) {
5012                            log::info!("Guest {}: detaching highlights request", guest_id);
5013                            highlights.detach();
5014                        } else {
5015                            highlights.await;
5016                        }
5017                    }
5018                    55..=59 => {
5019                        let search = project.update(&mut cx, |project, cx| {
5020                            let query = rng.lock().gen_range('a'..='z');
5021                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5022                            project.search(SearchQuery::text(query, false, false), cx)
5023                        });
5024                        let search = cx
5025                            .background()
5026                            .spawn(async move { search.await.expect("search request failed") });
5027                        if rng.lock().gen_bool(0.3) {
5028                            log::info!("Guest {}: detaching search request", guest_id);
5029                            search.detach();
5030                        } else {
5031                            self.buffers.extend(search.await.into_keys());
5032                        }
5033                    }
5034                    _ => {
5035                        buffer.update(&mut cx, |buffer, cx| {
5036                            log::info!(
5037                                "Guest {}: updating buffer {:?}",
5038                                guest_id,
5039                                buffer.file().unwrap().full_path(cx)
5040                            );
5041                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5042                        });
5043                    }
5044                }
5045                cx.background().simulate_random_delay().await;
5046            }
5047
5048            log::info!("Guest {} done", guest_id);
5049
5050            self.project = Some(project);
5051            (self, cx)
5052        }
5053    }
5054
5055    impl Drop for TestClient {
5056        fn drop(&mut self) {
5057            self.client.tear_down();
5058        }
5059    }
5060
5061    impl Executor for Arc<gpui::executor::Background> {
5062        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5063            self.spawn(future).detach();
5064        }
5065    }
5066
5067    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5068        channel
5069            .messages()
5070            .cursor::<()>()
5071            .map(|m| {
5072                (
5073                    m.sender.github_login.clone(),
5074                    m.body.clone(),
5075                    m.is_pending(),
5076                )
5077            })
5078            .collect()
5079    }
5080
5081    struct EmptyView;
5082
5083    impl gpui::Entity for EmptyView {
5084        type Event = ();
5085    }
5086
5087    impl gpui::View for EmptyView {
5088        fn ui_name() -> &'static str {
5089            "empty view"
5090        }
5091
5092        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5093            gpui::Element::boxed(gpui::elements::Empty)
5094        }
5095    }
5096}