rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
/// RPC server: owns the peer, the in-memory store, and the table of message handlers.
pub struct Server {
    /// Peer used to send, forward, and respond to messages on client connections.
    peer: Arc<Peer>,
    /// In-memory `Store`, kept behind a read-write lock.
    store: RwLock<Store>,
    /// Shared application state; handlers use its `db` for user and channel queries.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
/// Field-less executor handle.
// NOTE(review): presumably the production `Executor` implementation — its `impl` is not
// visible in this part of the file; confirm.
pub struct RealExecutor;
  48
/// Number of channel messages requested per page; a page shorter than this marks the
/// history as `done` in `join_channel`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a trimmed chat message body, compared against `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  81            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  82            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  83            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  84            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  85            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  86            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  87            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  88            .add_request_handler(
  89                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  90            )
  91            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
  92            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
  93            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
  94            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
  95            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
  96            .add_message_handler(Server::close_buffer)
  97            .add_request_handler(Server::update_buffer)
  98            .add_message_handler(Server::update_buffer_file)
  99            .add_message_handler(Server::buffer_reloaded)
 100            .add_message_handler(Server::buffer_saved)
 101            .add_request_handler(Server::save_buffer)
 102            .add_request_handler(Server::get_channels)
 103            .add_request_handler(Server::get_users)
 104            .add_request_handler(Server::join_channel)
 105            .add_message_handler(Server::leave_channel)
 106            .add_request_handler(Server::send_channel_message)
 107            .add_request_handler(Server::get_channel_messages);
 108
 109        Arc::new(server)
 110    }
 111
 112    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 113    where
 114        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 115        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 116        M: EnvelopedMessage,
 117    {
 118        let prev_handler = self.handlers.insert(
 119            TypeId::of::<M>(),
 120            Box::new(move |server, envelope| {
 121                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 122                (handler)(server, *envelope).boxed()
 123            }),
 124        );
 125        if prev_handler.is_some() {
 126            panic!("registered a handler for the same message twice");
 127        }
 128        self
 129    }
 130
 131    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 132    where
 133        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 134        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 135        M: RequestMessage,
 136    {
 137        self.add_message_handler(move |server, envelope| {
 138            let receipt = envelope.receipt();
 139            let response = (handler)(server.clone(), envelope);
 140            async move {
 141                match response.await {
 142                    Ok(response) => {
 143                        server.peer.respond(receipt, response)?;
 144                        Ok(())
 145                    }
 146                    Err(error) => {
 147                        server.peer.respond_with_error(
 148                            receipt,
 149                            proto::Error {
 150                                message: error.to_string(),
 151                            },
 152                        )?;
 153                        Err(error)
 154                    }
 155                }
 156            }
 157        })
 158    }
 159
 160    pub fn handle_connection<E: Executor>(
 161        self: &Arc<Self>,
 162        connection: Connection,
 163        addr: String,
 164        user_id: UserId,
 165        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 166        executor: E,
 167    ) -> impl Future<Output = ()> {
 168        let mut this = self.clone();
 169        async move {
 170            let (connection_id, handle_io, mut incoming_rx) =
 171                this.peer.add_connection(connection).await;
 172
 173            if let Some(send_connection_id) = send_connection_id.as_mut() {
 174                let _ = send_connection_id.send(connection_id).await;
 175            }
 176
 177            this.state_mut().add_connection(connection_id, user_id);
 178            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 179                log::error!("error updating contacts for {:?}: {}", user_id, err);
 180            }
 181
 182            let handle_io = handle_io.fuse();
 183            futures::pin_mut!(handle_io);
 184            loop {
 185                let next_message = incoming_rx.next().fuse();
 186                futures::pin_mut!(next_message);
 187                futures::select_biased! {
 188                    result = handle_io => {
 189                        if let Err(err) = result {
 190                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 191                        }
 192                        break;
 193                    }
 194                    message = next_message => {
 195                        if let Some(message) = message {
 196                            let start_time = Instant::now();
 197                            let type_name = message.payload_type_name();
 198                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 199                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 200                                let notifications = this.notifications.clone();
 201                                let is_background = message.is_background();
 202                                let handle_message = (handler)(this.clone(), message);
 203                                let handle_message = async move {
 204                                    if let Err(err) = handle_message.await {
 205                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 206                                    } else {
 207                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 208                                    }
 209                                    if let Some(mut notifications) = notifications {
 210                                        let _ = notifications.send(()).await;
 211                                    }
 212                                };
 213                                if is_background {
 214                                    executor.spawn_detached(handle_message);
 215                                } else {
 216                                    handle_message.await;
 217                                }
 218                            } else {
 219                                log::warn!("unhandled message: {}", type_name);
 220                            }
 221                        } else {
 222                            log::info!("rpc connection closed {:?}", addr);
 223                            break;
 224                        }
 225                    }
 226                }
 227            }
 228
 229            if let Err(err) = this.sign_out(connection_id).await {
 230                log::error!("error signing out connection {:?} - {:?}", addr, err);
 231            }
 232        }
 233    }
 234
 235    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 236        self.peer.disconnect(connection_id);
 237        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 238
 239        for (project_id, project) in removed_connection.hosted_projects {
 240            if let Some(share) = project.share {
 241                broadcast(
 242                    connection_id,
 243                    share.guests.keys().copied().collect(),
 244                    |conn_id| {
 245                        self.peer
 246                            .send(conn_id, proto::UnshareProject { project_id })
 247                    },
 248                )?;
 249            }
 250        }
 251
 252        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 253            broadcast(connection_id, peer_ids, |conn_id| {
 254                self.peer.send(
 255                    conn_id,
 256                    proto::RemoveProjectCollaborator {
 257                        project_id,
 258                        peer_id: connection_id.0,
 259                    },
 260                )
 261            })?;
 262        }
 263
 264        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 265        Ok(())
 266    }
 267
 268    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 269        Ok(proto::Ack {})
 270    }
 271
 272    async fn register_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::RegisterProject>,
 275    ) -> tide::Result<proto::RegisterProjectResponse> {
 276        let project_id = {
 277            let mut state = self.state_mut();
 278            let user_id = state.user_id_for_connection(request.sender_id)?;
 279            state.register_project(request.sender_id, user_id)
 280        };
 281        Ok(proto::RegisterProjectResponse { project_id })
 282    }
 283
 284    async fn unregister_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::UnregisterProject>,
 287    ) -> tide::Result<()> {
 288        let project = self
 289            .state_mut()
 290            .unregister_project(request.payload.project_id, request.sender_id)?;
 291        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 292        Ok(())
 293    }
 294
 295    async fn share_project(
 296        mut self: Arc<Server>,
 297        request: TypedEnvelope<proto::ShareProject>,
 298    ) -> tide::Result<proto::Ack> {
 299        self.state_mut()
 300            .share_project(request.payload.project_id, request.sender_id);
 301        Ok(proto::Ack {})
 302    }
 303
 304    async fn unshare_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnshareProject>,
 307    ) -> tide::Result<()> {
 308        let project_id = request.payload.project_id;
 309        let project = self
 310            .state_mut()
 311            .unshare_project(project_id, request.sender_id)?;
 312
 313        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 314            self.peer
 315                .send(conn_id, proto::UnshareProject { project_id })
 316        })?;
 317        self.update_contacts_for_users(&project.authorized_user_ids)?;
 318        Ok(())
 319    }
 320
 321    async fn join_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::JoinProject>,
 324    ) -> tide::Result<proto::JoinProjectResponse> {
 325        let project_id = request.payload.project_id;
 326
 327        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 328        let (response, connection_ids, contact_user_ids) = self
 329            .state_mut()
 330            .join_project(request.sender_id, user_id, project_id)
 331            .and_then(|joined| {
 332                let share = joined.project.share()?;
 333                let peer_count = share.guests.len();
 334                let mut collaborators = Vec::with_capacity(peer_count);
 335                collaborators.push(proto::Collaborator {
 336                    peer_id: joined.project.host_connection_id.0,
 337                    replica_id: 0,
 338                    user_id: joined.project.host_user_id.to_proto(),
 339                });
 340                let worktrees = share
 341                    .worktrees
 342                    .iter()
 343                    .filter_map(|(id, shared_worktree)| {
 344                        let worktree = joined.project.worktrees.get(&id)?;
 345                        Some(proto::Worktree {
 346                            id: *id,
 347                            root_name: worktree.root_name.clone(),
 348                            entries: shared_worktree.entries.values().cloned().collect(),
 349                            diagnostic_summaries: shared_worktree
 350                                .diagnostic_summaries
 351                                .values()
 352                                .cloned()
 353                                .collect(),
 354                            weak: worktree.weak,
 355                        })
 356                    })
 357                    .collect();
 358                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 359                    if *peer_conn_id != request.sender_id {
 360                        collaborators.push(proto::Collaborator {
 361                            peer_id: peer_conn_id.0,
 362                            replica_id: *peer_replica_id as u32,
 363                            user_id: peer_user_id.to_proto(),
 364                        });
 365                    }
 366                }
 367                let response = proto::JoinProjectResponse {
 368                    worktrees,
 369                    replica_id: joined.replica_id as u32,
 370                    collaborators,
 371                };
 372                let connection_ids = joined.project.connection_ids();
 373                let contact_user_ids = joined.project.authorized_user_ids();
 374                Ok((response, connection_ids, contact_user_ids))
 375            })?;
 376
 377        broadcast(request.sender_id, connection_ids, |conn_id| {
 378            self.peer.send(
 379                conn_id,
 380                proto::AddProjectCollaborator {
 381                    project_id,
 382                    collaborator: Some(proto::Collaborator {
 383                        peer_id: request.sender_id.0,
 384                        replica_id: response.replica_id,
 385                        user_id: user_id.to_proto(),
 386                    }),
 387                },
 388            )
 389        })?;
 390        self.update_contacts_for_users(&contact_user_ids)?;
 391        Ok(response)
 392    }
 393
 394    async fn leave_project(
 395        mut self: Arc<Server>,
 396        request: TypedEnvelope<proto::LeaveProject>,
 397    ) -> tide::Result<()> {
 398        let sender_id = request.sender_id;
 399        let project_id = request.payload.project_id;
 400        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 401
 402        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 403            self.peer.send(
 404                conn_id,
 405                proto::RemoveProjectCollaborator {
 406                    project_id,
 407                    peer_id: sender_id.0,
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 412
 413        Ok(())
 414    }
 415
 416    async fn register_worktree(
 417        mut self: Arc<Server>,
 418        request: TypedEnvelope<proto::RegisterWorktree>,
 419    ) -> tide::Result<proto::Ack> {
 420        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 421
 422        let mut contact_user_ids = HashSet::default();
 423        contact_user_ids.insert(host_user_id);
 424        for github_login in &request.payload.authorized_logins {
 425            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 426            contact_user_ids.insert(contact_user_id);
 427        }
 428
 429        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 430        let guest_connection_ids;
 431        {
 432            let mut state = self.state_mut();
 433            guest_connection_ids = state
 434                .read_project(request.payload.project_id, request.sender_id)?
 435                .guest_connection_ids();
 436            state.register_worktree(
 437                request.payload.project_id,
 438                request.payload.worktree_id,
 439                request.sender_id,
 440                Worktree {
 441                    authorized_user_ids: contact_user_ids.clone(),
 442                    root_name: request.payload.root_name.clone(),
 443                    weak: request.payload.weak,
 444                },
 445            )?;
 446        }
 447        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 448            self.peer
 449                .forward_send(request.sender_id, connection_id, request.payload.clone())
 450        })?;
 451        self.update_contacts_for_users(&contact_user_ids)?;
 452        Ok(proto::Ack {})
 453    }
 454
 455    async fn unregister_worktree(
 456        mut self: Arc<Server>,
 457        request: TypedEnvelope<proto::UnregisterWorktree>,
 458    ) -> tide::Result<()> {
 459        let project_id = request.payload.project_id;
 460        let worktree_id = request.payload.worktree_id;
 461        let (worktree, guest_connection_ids) =
 462            self.state_mut()
 463                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 464        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 465            self.peer.send(
 466                conn_id,
 467                proto::UnregisterWorktree {
 468                    project_id,
 469                    worktree_id,
 470                },
 471            )
 472        })?;
 473        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 474        Ok(())
 475    }
 476
 477    async fn update_worktree(
 478        mut self: Arc<Server>,
 479        request: TypedEnvelope<proto::UpdateWorktree>,
 480    ) -> tide::Result<proto::Ack> {
 481        let connection_ids = self.state_mut().update_worktree(
 482            request.sender_id,
 483            request.payload.project_id,
 484            request.payload.worktree_id,
 485            &request.payload.removed_entries,
 486            &request.payload.updated_entries,
 487        )?;
 488
 489        broadcast(request.sender_id, connection_ids, |connection_id| {
 490            self.peer
 491                .forward_send(request.sender_id, connection_id, request.payload.clone())
 492        })?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_diagnostic_summary(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 500    ) -> tide::Result<()> {
 501        let summary = request
 502            .payload
 503            .summary
 504            .clone()
 505            .ok_or_else(|| anyhow!("invalid summary"))?;
 506        let receiver_ids = self.state_mut().update_diagnostic_summary(
 507            request.payload.project_id,
 508            request.payload.worktree_id,
 509            request.sender_id,
 510            summary,
 511        )?;
 512
 513        broadcast(request.sender_id, receiver_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })?;
 517        Ok(())
 518    }
 519
 520    async fn disk_based_diagnostics_updating(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 523    ) -> tide::Result<()> {
 524        let receiver_ids = self
 525            .state()
 526            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 527        broadcast(request.sender_id, receiver_ids, |connection_id| {
 528            self.peer
 529                .forward_send(request.sender_id, connection_id, request.payload.clone())
 530        })?;
 531        Ok(())
 532    }
 533
 534    async fn disk_based_diagnostics_updated(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = self
 539            .state()
 540            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 541        broadcast(request.sender_id, receiver_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        })?;
 545        Ok(())
 546    }
 547
 548    async fn forward_project_request<T>(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<T>,
 551    ) -> tide::Result<T::Response>
 552    where
 553        T: EntityMessage + RequestMessage,
 554    {
 555        let host_connection_id = self
 556            .state()
 557            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 558            .host_connection_id;
 559        Ok(self
 560            .peer
 561            .forward_request(request.sender_id, host_connection_id, request.payload)
 562            .await?)
 563    }
 564
 565    async fn close_buffer(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::CloseBuffer>,
 568    ) -> tide::Result<()> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        self.peer
 574            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 575        Ok(())
 576    }
 577
 578    async fn save_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::SaveBuffer>,
 581    ) -> tide::Result<proto::BufferSaved> {
 582        let host;
 583        let mut guests;
 584        {
 585            let state = self.state();
 586            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 587            host = project.host_connection_id;
 588            guests = project.guest_connection_ids()
 589        }
 590
 591        let response = self
 592            .peer
 593            .forward_request(request.sender_id, host, request.payload.clone())
 594            .await?;
 595
 596        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 597        broadcast(host, guests, |conn_id| {
 598            self.peer.forward_send(host, conn_id, response.clone())
 599        })?;
 600
 601        Ok(response)
 602    }
 603
 604    async fn update_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::UpdateBuffer>,
 607    ) -> tide::Result<proto::Ack> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })?;
 615        Ok(proto::Ack {})
 616    }
 617
 618    async fn update_buffer_file(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBufferFile>,
 621    ) -> tide::Result<()> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(())
 630    }
 631
 632    async fn buffer_reloaded(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::BufferReloaded>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_saved(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferSaved>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn get_channels(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::GetChannels>,
 663    ) -> tide::Result<proto::GetChannelsResponse> {
 664        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 665        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 666        Ok(proto::GetChannelsResponse {
 667            channels: channels
 668                .into_iter()
 669                .map(|chan| proto::Channel {
 670                    id: chan.id.to_proto(),
 671                    name: chan.name,
 672                })
 673                .collect(),
 674        })
 675    }
 676
 677    async fn get_users(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetUsers>,
 680    ) -> tide::Result<proto::GetUsersResponse> {
 681        let user_ids = request
 682            .payload
 683            .user_ids
 684            .into_iter()
 685            .map(UserId::from_proto)
 686            .collect();
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        Ok(proto::GetUsersResponse { users })
 700    }
 701
 702    fn update_contacts_for_users<'a>(
 703        self: &Arc<Server>,
 704        user_ids: impl IntoIterator<Item = &'a UserId>,
 705    ) -> anyhow::Result<()> {
 706        let mut result = Ok(());
 707        let state = self.state();
 708        for user_id in user_ids {
 709            let contacts = state.contacts_for_user(*user_id);
 710            for connection_id in state.connection_ids_for_user(*user_id) {
 711                if let Err(error) = self.peer.send(
 712                    connection_id,
 713                    proto::UpdateContacts {
 714                        contacts: contacts.clone(),
 715                    },
 716                ) {
 717                    result = Err(error);
 718                }
 719            }
 720        }
 721        result
 722    }
 723
 724    async fn join_channel(
 725        mut self: Arc<Self>,
 726        request: TypedEnvelope<proto::JoinChannel>,
 727    ) -> tide::Result<proto::JoinChannelResponse> {
 728        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 729        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 730        if !self
 731            .app_state
 732            .db
 733            .can_user_access_channel(user_id, channel_id)
 734            .await?
 735        {
 736            Err(anyhow!("access denied"))?;
 737        }
 738
 739        self.state_mut().join_channel(request.sender_id, channel_id);
 740        let messages = self
 741            .app_state
 742            .db
 743            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 744            .await?
 745            .into_iter()
 746            .map(|msg| proto::ChannelMessage {
 747                id: msg.id.to_proto(),
 748                body: msg.body,
 749                timestamp: msg.sent_at.unix_timestamp() as u64,
 750                sender_id: msg.sender_id.to_proto(),
 751                nonce: Some(msg.nonce.as_u128().into()),
 752            })
 753            .collect::<Vec<_>>();
 754        Ok(proto::JoinChannelResponse {
 755            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 756            messages,
 757        })
 758    }
 759
 760    async fn leave_channel(
 761        mut self: Arc<Self>,
 762        request: TypedEnvelope<proto::LeaveChannel>,
 763    ) -> tide::Result<()> {
 764        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 765        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 766        if !self
 767            .app_state
 768            .db
 769            .can_user_access_channel(user_id, channel_id)
 770            .await?
 771        {
 772            Err(anyhow!("access denied"))?;
 773        }
 774
 775        self.state_mut()
 776            .leave_channel(request.sender_id, channel_id);
 777
 778        Ok(())
 779    }
 780
 781    async fn send_channel_message(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::SendChannelMessage>,
 784    ) -> tide::Result<proto::SendChannelMessageResponse> {
 785        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 786        let user_id;
 787        let connection_ids;
 788        {
 789            let state = self.state();
 790            user_id = state.user_id_for_connection(request.sender_id)?;
 791            connection_ids = state.channel_connection_ids(channel_id)?;
 792        }
 793
 794        // Validate the message body.
 795        let body = request.payload.body.trim().to_string();
 796        if body.len() > MAX_MESSAGE_LEN {
 797            return Err(anyhow!("message is too long"))?;
 798        }
 799        if body.is_empty() {
 800            return Err(anyhow!("message can't be blank"))?;
 801        }
 802
 803        let timestamp = OffsetDateTime::now_utc();
 804        let nonce = request
 805            .payload
 806            .nonce
 807            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 808
 809        let message_id = self
 810            .app_state
 811            .db
 812            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 813            .await?
 814            .to_proto();
 815        let message = proto::ChannelMessage {
 816            sender_id: user_id.to_proto(),
 817            id: message_id,
 818            body,
 819            timestamp: timestamp.unix_timestamp() as u64,
 820            nonce: Some(nonce),
 821        };
 822        broadcast(request.sender_id, connection_ids, |conn_id| {
 823            self.peer.send(
 824                conn_id,
 825                proto::ChannelMessageSent {
 826                    channel_id: channel_id.to_proto(),
 827                    message: Some(message.clone()),
 828                },
 829            )
 830        })?;
 831        Ok(proto::SendChannelMessageResponse {
 832            message: Some(message),
 833        })
 834    }
 835
 836    async fn get_channel_messages(
 837        self: Arc<Self>,
 838        request: TypedEnvelope<proto::GetChannelMessages>,
 839    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 840        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 841        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 842        if !self
 843            .app_state
 844            .db
 845            .can_user_access_channel(user_id, channel_id)
 846            .await?
 847        {
 848            Err(anyhow!("access denied"))?;
 849        }
 850
 851        let messages = self
 852            .app_state
 853            .db
 854            .get_channel_messages(
 855                channel_id,
 856                MESSAGE_COUNT_PER_PAGE,
 857                Some(MessageId::from_proto(request.payload.before_message_id)),
 858            )
 859            .await?
 860            .into_iter()
 861            .map(|msg| proto::ChannelMessage {
 862                id: msg.id.to_proto(),
 863                body: msg.body,
 864                timestamp: msg.sent_at.unix_timestamp() as u64,
 865                sender_id: msg.sender_id.to_proto(),
 866                nonce: Some(msg.nonce.as_u128().into()),
 867            })
 868            .collect::<Vec<_>>();
 869
 870        Ok(proto::GetChannelMessagesResponse {
 871            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 872            messages,
 873        })
 874    }
 875
 876    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 877        self.store.read()
 878    }
 879
 880    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 881        self.store.write()
 882    }
 883}
 884
 885impl Executor for RealExecutor {
 886    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 887        task::spawn(future);
 888    }
 889}
 890
 891fn broadcast<F>(
 892    sender_id: ConnectionId,
 893    receiver_ids: Vec<ConnectionId>,
 894    mut f: F,
 895) -> anyhow::Result<()>
 896where
 897    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 898{
 899    let mut result = Ok(());
 900    for receiver_id in receiver_ids {
 901        if receiver_id != sender_id {
 902            if let Err(error) = f(receiver_id) {
 903                if result.is_ok() {
 904                    result = Err(error);
 905                }
 906            }
 907        }
 908    }
 909    result
 910}
 911
 912pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 913    let server = Server::new(app.state().clone(), rpc.clone(), None);
 914    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 915        let server = server.clone();
 916        async move {
 917            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 918
 919            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 920            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 921            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 922            let client_protocol_version: Option<u32> = request
 923                .header("X-Zed-Protocol-Version")
 924                .and_then(|v| v.as_str().parse().ok());
 925
 926            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 927                return Ok(Response::new(StatusCode::UpgradeRequired));
 928            }
 929
 930            let header = match request.header("Sec-Websocket-Key") {
 931                Some(h) => h.as_str(),
 932                None => return Err(anyhow!("expected sec-websocket-key"))?,
 933            };
 934
 935            let user_id = process_auth_header(&request).await?;
 936
 937            let mut response = Response::new(StatusCode::SwitchingProtocols);
 938            response.insert_header(UPGRADE, "websocket");
 939            response.insert_header(CONNECTION, "Upgrade");
 940            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 941            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 942            response.insert_header("Sec-Websocket-Version", "13");
 943
 944            let http_res: &mut tide::http::Response = response.as_mut();
 945            let upgrade_receiver = http_res.recv_upgrade().await;
 946            let addr = request.remote().unwrap_or("unknown").to_string();
 947            task::spawn(async move {
 948                if let Some(stream) = upgrade_receiver.await {
 949                    server
 950                        .handle_connection(
 951                            Connection::new(
 952                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 953                            ),
 954                            addr,
 955                            user_id,
 956                            None,
 957                            RealExecutor,
 958                        )
 959                        .await;
 960                }
 961            });
 962
 963            Ok(response)
 964        }
 965    });
 966}
 967
 968fn header_contains_ignore_case<T>(
 969    request: &tide::Request<T>,
 970    header_name: HeaderName,
 971    value: &str,
 972) -> bool {
 973    request
 974        .header(header_name)
 975        .map(|h| {
 976            h.as_str()
 977                .split(',')
 978                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 979        })
 980        .unwrap_or(false)
 981}
 982
 983#[cfg(test)]
 984mod tests {
 985    use super::*;
 986    use crate::{
 987        auth,
 988        db::{tests::TestDb, UserId},
 989        github, AppState, Config,
 990    };
 991    use ::rpc::Peer;
 992    use collections::BTreeMap;
 993    use gpui::{executor, ModelHandle, TestAppContext};
 994    use parking_lot::Mutex;
 995    use postage::{sink::Sink, watch};
 996    use rand::prelude::*;
 997    use rpc::PeerId;
 998    use serde_json::json;
 999    use sqlx::types::time::OffsetDateTime;
1000    use std::{
1001        cell::Cell,
1002        env,
1003        ops::Deref,
1004        path::{Path, PathBuf},
1005        rc::Rc,
1006        sync::{
1007            atomic::{AtomicBool, Ordering::SeqCst},
1008            Arc,
1009        },
1010        time::Duration,
1011    };
1012    use zed::{
1013        client::{
1014            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015            EstablishConnectionError, UserStore,
1016        },
1017        editor::{
1018            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1019            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1020        },
1021        fs::{FakeFs, Fs as _},
1022        language::{
1023            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1024            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1025        },
1026        lsp,
1027        project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1028        workspace::{Settings, Workspace, WorkspaceParams},
1029    };
1030
1031    #[cfg(test)]
1032    #[ctor::ctor]
1033    fn init_logger() {
1034        if std::env::var("RUST_LOG").is_ok() {
1035            env_logger::init();
1036        }
1037    }
1038
1039    #[gpui::test(iterations = 10)]
        // End-to-end check of sharing a project between two clients: client A
        // shares the project rooted at `/a`, client B joins it remotely, each
        // side sees the other as a collaborator, text typed by B into `b.txt`
        // shows up in A's copy of the buffer, and dropping B's project empties
        // A's collaborator list.
1040    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1041        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1042        let lang_registry = Arc::new(LanguageRegistry::new());
1043        let fs = FakeFs::new(cx_a.background());
1044        cx_a.foreground().forbid_parking();
1045
1046        // Connect to a server as 2 clients.
1047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1048        let client_a = server.create_client(&mut cx_a, "user_a").await;
1049        let client_b = server.create_client(&mut cx_b, "user_b").await;
1050
1051        // Share a project as client A
1052        fs.insert_tree(
1053            "/a",
1054            json!({
1055                ".zed.toml": r#"collaborators = ["user_b"]"#,
1056                "a.txt": "a-contents",
1057                "b.txt": "b-contents",
1058            }),
1059        )
1060        .await;
1061        let project_a = cx_a.update(|cx| {
1062            Project::local(
1063                client_a.clone(),
1064                client_a.user_store.clone(),
1065                lang_registry.clone(),
1066                fs.clone(),
1067                cx,
1068            )
1069        });
1070        let (worktree_a, _) = project_a
1071            .update(&mut cx_a, |p, cx| {
1072                p.find_or_create_local_worktree("/a", false, cx)
1073            })
1074            .await
1075            .unwrap();
1076        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1077        worktree_a
1078            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1079            .await;
1080        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1081        project_a
1082            .update(&mut cx_a, |p, cx| p.share(cx))
1083            .await
1084            .unwrap();
1085
1086        // Join that project as client B
1087        let project_b = Project::remote(
1088            project_id,
1089            client_b.clone(),
1090            client_b.user_store.clone(),
1091            lang_registry.clone(),
1092            fs.clone(),
1093            &mut cx_b.to_async(),
1094        )
1095        .await
1096        .unwrap();
1097
            // B must list A (looked up by peer id) as a collaborator with the
            // expected GitHub login. B's own replica id is captured so it can be
            // compared with what A sees below.
1098        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1099            assert_eq!(
1100                project
1101                    .collaborators()
1102                    .get(&client_a.peer_id)
1103                    .unwrap()
1104                    .user
1105                    .github_login,
1106                "user_a"
1107            );
1108            project.replica_id()
1109        });
            // Wait until A sees B as a collaborator with that same replica id and
            // the expected login.
1110        project_a
1111            .condition(&cx_a, |tree, _| {
1112                tree.collaborators()
1113                    .get(&client_b.peer_id)
1114                    .map_or(false, |collaborator| {
1115                        collaborator.replica_id == replica_id_b
1116                            && collaborator.user.github_login == "user_b"
1117                    })
1118            })
1119            .await;
1120
1121        // Open the same file as client B and client A.
1122        let buffer_b = project_b
1123            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1124            .await
1125            .unwrap();
1126        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1127        buffer_b.read_with(&cx_b, |buf, cx| {
1128            assert_eq!(buf.read(cx).text(), "b-contents")
1129        });
            // The host project already reports `b.txt` as open here, before client
            // A opens it explicitly below. Presumably the host opened it to serve
            // B's request (TODO confirm against `Project`).
1130        project_a.read_with(&cx_a, |project, cx| {
1131            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1132        });
1133        let buffer_a = project_a
1134            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1135            .await
1136            .unwrap();
1137
1138        let editor_b = cx_b.add_view(window_b, |cx| {
1139            Editor::for_buffer(
1140                buffer_b,
1141                None,
1142                watch::channel_with(Settings::test(cx)).1,
1143                cx,
1144            )
1145        });
1146
1147        // TODO
1148        // // Create a selection set as client B and see that selection set as client A.
1149        // buffer_a
1150        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1151        //     .await;
1152
1153        // Edit the buffer as client B and see that edit as client A.
1154        editor_b.update(&mut cx_b, |editor, cx| {
1155            editor.handle_input(&Input("ok, ".into()), cx)
1156        });
1157        buffer_a
1158            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1159            .await;
1160
1161        // TODO
1162        // // Remove the selection set as client B, see those selections disappear as client A.
1163        cx_b.update(move |_| drop(editor_b));
1164        // buffer_a
1165        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1166        //     .await;
1167
1168        // Close the buffer as client A, see that the buffer is closed.
1169        cx_a.update(move |_| drop(buffer_a));
1170        project_a
1171            .condition(&cx_a, |project, cx| {
1172                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1173            })
1174            .await;
1175
1176        // Dropping client B's project removes client B from client A's collaborators.
1177        cx_b.update(move |_| drop(project_b));
1178        project_a
1179            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1180            .await;
1181    }
1182
1183    #[gpui::test(iterations = 10)]
        // Checks the unshare/re-share cycle: client A shares a project and
        // client B joins and opens a buffer. After A unshares, B's project must
        // become read-only and A's worktree must report that it is no longer
        // shared. Sharing again must let B join under the same project id and
        // open a buffer.
1184    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1185        let lang_registry = Arc::new(LanguageRegistry::new());
1186        let fs = FakeFs::new(cx_a.background());
1187        cx_a.foreground().forbid_parking();
1188
1189        // Connect to a server as 2 clients.
1190        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1191        let client_a = server.create_client(&mut cx_a, "user_a").await;
1192        let client_b = server.create_client(&mut cx_b, "user_b").await;
1193
1194        // Share a project as client A
1195        fs.insert_tree(
1196            "/a",
1197            json!({
1198                ".zed.toml": r#"collaborators = ["user_b"]"#,
1199                "a.txt": "a-contents",
1200                "b.txt": "b-contents",
1201            }),
1202        )
1203        .await;
1204        let project_a = cx_a.update(|cx| {
1205            Project::local(
1206                client_a.clone(),
1207                client_a.user_store.clone(),
1208                lang_registry.clone(),
1209                fs.clone(),
1210                cx,
1211            )
1212        });
1213        let (worktree_a, _) = project_a
1214            .update(&mut cx_a, |p, cx| {
1215                p.find_or_create_local_worktree("/a", false, cx)
1216            })
1217            .await
1218            .unwrap();
1219        worktree_a
1220            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1221            .await;
1222        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1223        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1224        project_a
1225            .update(&mut cx_a, |p, cx| p.share(cx))
1226            .await
1227            .unwrap();
1228        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1229
1230        // Join that project as client B
1231        let project_b = Project::remote(
1232            project_id,
1233            client_b.clone(),
1234            client_b.user_store.clone(),
1235            lang_registry.clone(),
1236            fs.clone(),
1237            &mut cx_b.to_async(),
1238        )
1239        .await
1240        .unwrap();
1241        project_b
1242            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1243            .await
1244            .unwrap();
1245
1246        // Unshare the project as client A
1247        project_a
1248            .update(&mut cx_a, |project, cx| project.unshare(cx))
1249            .await
1250            .unwrap();
            // The guest learns about the unshare asynchronously, so wait for its
            // project to turn read-only before checking the host's worktree.
1251        project_b
1252            .condition(&mut cx_b, |project, _| project.is_read_only())
1253            .await;
1254        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1255        drop(project_b);
1256
1257        // Share the project again and ensure guests can still join.
1258        project_a
1259            .update(&mut cx_a, |project, cx| project.share(cx))
1260            .await
1261            .unwrap();
1262        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1263
            // Client B joins again, reusing the project id obtained before the
            // first share.
1264        let project_c = Project::remote(
1265            project_id,
1266            client_b.clone(),
1267            client_b.user_store.clone(),
1268            lang_registry.clone(),
1269            fs.clone(),
1270            &mut cx_b.to_async(),
1271        )
1272        .await
1273        .unwrap();
1274        project_c
1275            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1276            .await
1277            .unwrap();
1278    }
1279
1280    #[gpui::test(iterations = 10)]
        // Three-client scenario (host A, guests B and C) covering:
        //   * edits made by both guests and by the host converging to the same
        //     text in all three buffers,
        //   * a save issued by guest B while the host is editing, which must
        //     write the merged text to the host's file system and leave every
        //     buffer clean,
        //   * renames and new files on the host's file system showing up in the
        //     guests' worktrees and in the file path of the open buffers.
1281    async fn test_propagate_saves_and_fs_changes(
1282        mut cx_a: TestAppContext,
1283        mut cx_b: TestAppContext,
1284        mut cx_c: TestAppContext,
1285    ) {
1286        let lang_registry = Arc::new(LanguageRegistry::new());
1287        let fs = FakeFs::new(cx_a.background());
1288        cx_a.foreground().forbid_parking();
1289
1290        // Connect to a server as 3 clients.
1291        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1292        let client_a = server.create_client(&mut cx_a, "user_a").await;
1293        let client_b = server.create_client(&mut cx_b, "user_b").await;
1294        let client_c = server.create_client(&mut cx_c, "user_c").await;
1295
1296        // Share a worktree as client A.
1297        fs.insert_tree(
1298            "/a",
1299            json!({
1300                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1301                "file1": "",
1302                "file2": ""
1303            }),
1304        )
1305        .await;
1306        let project_a = cx_a.update(|cx| {
1307            Project::local(
1308                client_a.clone(),
1309                client_a.user_store.clone(),
1310                lang_registry.clone(),
1311                fs.clone(),
1312                cx,
1313            )
1314        });
1315        let (worktree_a, _) = project_a
1316            .update(&mut cx_a, |p, cx| {
1317                p.find_or_create_local_worktree("/a", false, cx)
1318            })
1319            .await
1320            .unwrap();
1321        worktree_a
1322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1323            .await;
1324        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1325        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1326        project_a
1327            .update(&mut cx_a, |p, cx| p.share(cx))
1328            .await
1329            .unwrap();
1330
1331        // Join that worktree as clients B and C.
1332        let project_b = Project::remote(
1333            project_id,
1334            client_b.clone(),
1335            client_b.user_store.clone(),
1336            lang_registry.clone(),
1337            fs.clone(),
1338            &mut cx_b.to_async(),
1339        )
1340        .await
1341        .unwrap();
1342        let project_c = Project::remote(
1343            project_id,
1344            client_c.clone(),
1345            client_c.user_store.clone(),
1346            lang_registry.clone(),
1347            fs.clone(),
1348            &mut cx_c.to_async(),
1349        )
1350        .await
1351        .unwrap();
1352        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1353        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1354
1355        // Open and edit a buffer as both guests B and C.
1356        let buffer_b = project_b
1357            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1358            .await
1359            .unwrap();
1360        let buffer_c = project_c
1361            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1362            .await
1363            .unwrap();
1364        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1365        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1366
1367        // Open and edit that buffer as the host.
1368        let buffer_a = project_a
1369            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1370            .await
1371            .unwrap();
1372
            // Both guests inserted at offset 0; the host is expected to end up
            // with C's text ordered before B's.
1373        buffer_a
1374            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1375            .await;
1376        buffer_a.update(&mut cx_a, |buf, cx| {
1377            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1378        });
1379
1380        // Wait for edits to propagate
1381        buffer_a
1382            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1383            .await;
1384        buffer_b
1385            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1386            .await;
1387        buffer_c
1388            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1389            .await;
1390
1391        // Edit the buffer as the host and concurrently save as guest B.
1392        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1393        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1394        save_b.await.unwrap();
            // The file written on the host includes the host's concurrent
            // `hi-a, ` edit.
1395        assert_eq!(
1396            fs.load("/a/file1".as_ref()).await.unwrap(),
1397            "hi-a, i-am-c, i-am-b, i-am-a"
1398        );
1399        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1400        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
            // C did not take part in the save, so wait for its buffer to become
            // clean instead of asserting immediately.
1401        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1402
1403        // Make changes on host's file system, see those changes on guest worktrees.
1404        fs.rename(
1405            "/a/file1".as_ref(),
1406            "/a/file1-renamed".as_ref(),
1407            Default::default(),
1408        )
1409        .await
1410        .unwrap();
1411
1412        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413            .await
1414            .unwrap();
1415        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417        worktree_a
1418            .condition(&cx_a, |tree, _| {
1419                tree.paths()
1420                    .map(|p| p.to_string_lossy())
1421                    .collect::<Vec<_>>()
1422                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1423            })
1424            .await;
1425        worktree_b
1426            .condition(&cx_b, |tree, _| {
1427                tree.paths()
1428                    .map(|p| p.to_string_lossy())
1429                    .collect::<Vec<_>>()
1430                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1431            })
1432            .await;
1433        worktree_c
1434            .condition(&cx_c, |tree, _| {
1435                tree.paths()
1436                    .map(|p| p.to_string_lossy())
1437                    .collect::<Vec<_>>()
1438                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1439            })
1440            .await;
1441
1442        // Ensure buffer files are updated as well.
1443        buffer_a
1444            .condition(&cx_a, |buf, _| {
1445                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446            })
1447            .await;
1448        buffer_b
1449            .condition(&cx_b, |buf, _| {
1450                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451            })
1452            .await;
1453        buffer_c
1454            .condition(&cx_c, |buf, _| {
1455                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456            })
1457            .await;
1458    }
1459
1460    #[gpui::test(iterations = 10)]
        // Checks the dirty/conflict flags of a guest's buffer around a save:
        // editing makes the buffer dirty without a conflict, saving makes it
        // clean again, and a further edit after the save makes it dirty once
        // more, still without reporting a conflict.
1461    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1462        cx_a.foreground().forbid_parking();
1463        let lang_registry = Arc::new(LanguageRegistry::new());
1464        let fs = FakeFs::new(cx_a.background());
1465
1466        // Connect to a server as 2 clients.
1467        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468        let client_a = server.create_client(&mut cx_a, "user_a").await;
1469        let client_b = server.create_client(&mut cx_b, "user_b").await;
1470
1471        // Share a project as client A
1472        fs.insert_tree(
1473            "/dir",
1474            json!({
1475                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476                "a.txt": "a-contents",
1477            }),
1478        )
1479        .await;
1480
1481        let project_a = cx_a.update(|cx| {
1482            Project::local(
1483                client_a.clone(),
1484                client_a.user_store.clone(),
1485                lang_registry.clone(),
1486                fs.clone(),
1487                cx,
1488            )
1489        });
1490        let (worktree_a, _) = project_a
1491            .update(&mut cx_a, |p, cx| {
1492                p.find_or_create_local_worktree("/dir", false, cx)
1493            })
1494            .await
1495            .unwrap();
1496        worktree_a
1497            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498            .await;
1499        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1500        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1501        project_a
1502            .update(&mut cx_a, |p, cx| p.share(cx))
1503            .await
1504            .unwrap();
1505
1506        // Join that project as client B
1507        let project_b = Project::remote(
1508            project_id,
1509            client_b.clone(),
1510            client_b.user_store.clone(),
1511            lang_registry.clone(),
1512            fs.clone(),
1513            &mut cx_b.to_async(),
1514        )
1515        .await
1516        .unwrap();
1517
1518        // Open a buffer as client B
1519        let buffer_b = project_b
1520            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1521            .await
1522            .unwrap();
1523
            // An unsaved local edit: dirty, but not in conflict.
1524        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1525        buffer_b.read_with(&cx_b, |buf, _| {
1526            assert!(buf.is_dirty());
1527            assert!(!buf.has_conflict());
1528        });
1529
            // Save, then wait for the buffer to be reported clean.
1530        buffer_b
1531            .update(&mut cx_b, |buf, cx| buf.save(cx))
1532            .await
1533            .unwrap();
1534        buffer_b
1535            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1536            .await;
1537        buffer_b.read_with(&cx_b, |buf, _| {
1538            assert!(!buf.has_conflict());
1539        });
1540
            // Editing again after the save must not be mistaken for a conflict.
1541        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1542        buffer_b.read_with(&cx_b, |buf, _| {
1543            assert!(buf.is_dirty());
1544            assert!(!buf.has_conflict());
1545        });
1546    }
1547
1548    #[gpui::test(iterations = 10)]
        // Checks that a guest's unmodified buffer follows changes made to the
        // file on the host's file system: after `a.txt` is rewritten through the
        // fake file system, the guest's buffer must show the new contents while
        // staying clean and free of conflicts.
1549    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1550        cx_a.foreground().forbid_parking();
1551        let lang_registry = Arc::new(LanguageRegistry::new());
1552        let fs = FakeFs::new(cx_a.background());
1553
1554        // Connect to a server as 2 clients.
1555        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1556        let client_a = server.create_client(&mut cx_a, "user_a").await;
1557        let client_b = server.create_client(&mut cx_b, "user_b").await;
1558
1559        // Share a project as client A
1560        fs.insert_tree(
1561            "/dir",
1562            json!({
1563                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1564                "a.txt": "a-contents",
1565            }),
1566        )
1567        .await;
1568
1569        let project_a = cx_a.update(|cx| {
1570            Project::local(
1571                client_a.clone(),
1572                client_a.user_store.clone(),
1573                lang_registry.clone(),
1574                fs.clone(),
1575                cx,
1576            )
1577        });
1578        let (worktree_a, _) = project_a
1579            .update(&mut cx_a, |p, cx| {
1580                p.find_or_create_local_worktree("/dir", false, cx)
1581            })
1582            .await
1583            .unwrap();
1584        worktree_a
1585            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1586            .await;
1587        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1588        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1589        project_a
1590            .update(&mut cx_a, |p, cx| p.share(cx))
1591            .await
1592            .unwrap();
1593
1594        // Join that project as client B
1595        let project_b = Project::remote(
1596            project_id,
1597            client_b.clone(),
1598            client_b.user_store.clone(),
1599            lang_registry.clone(),
1600            fs.clone(),
1601            &mut cx_b.to_async(),
1602        )
1603        .await
1604        .unwrap();
            // Bound to a local (and otherwise unused) so the guest's worktree
            // handle stays alive for the rest of the test.
1605        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1606
1607        // Open a buffer as client B
1608        let buffer_b = project_b
1609            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1610            .await
1611            .unwrap();
1612        buffer_b.read_with(&cx_b, |buf, _| {
1613            assert!(!buf.is_dirty());
1614            assert!(!buf.has_conflict());
1615        });
1616
            // Rewrite the file directly on the fake file system, bypassing every
            // buffer, then wait for the guest's buffer to show the new text.
1617        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1618            .await
1619            .unwrap();
1620        buffer_b
1621            .condition(&cx_b, |buf, _| {
1622                buf.text() == "new contents" && !buf.is_dirty()
1623            })
1624            .await;
1625        buffer_b.read_with(&cx_b, |buf, _| {
1626            assert!(!buf.has_conflict());
1627        });
1628    }
1629
1630    #[gpui::test(iterations = 10)]
1631    async fn test_editing_while_guest_opens_buffer(
1632        mut cx_a: TestAppContext,
1633        mut cx_b: TestAppContext,
1634    ) {
1635        cx_a.foreground().forbid_parking();
1636        let lang_registry = Arc::new(LanguageRegistry::new());
1637        let fs = FakeFs::new(cx_a.background());
1638
1639        // Connect to a server as 2 clients.
1640        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1641        let client_a = server.create_client(&mut cx_a, "user_a").await;
1642        let client_b = server.create_client(&mut cx_b, "user_b").await;
1643
1644        // Share a project as client A
1645        fs.insert_tree(
1646            "/dir",
1647            json!({
1648                ".zed.toml": r#"collaborators = ["user_b"]"#,
1649                "a.txt": "a-contents",
1650            }),
1651        )
1652        .await;
1653        let project_a = cx_a.update(|cx| {
1654            Project::local(
1655                client_a.clone(),
1656                client_a.user_store.clone(),
1657                lang_registry.clone(),
1658                fs.clone(),
1659                cx,
1660            )
1661        });
1662        let (worktree_a, _) = project_a
1663            .update(&mut cx_a, |p, cx| {
1664                p.find_or_create_local_worktree("/dir", false, cx)
1665            })
1666            .await
1667            .unwrap();
1668        worktree_a
1669            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1670            .await;
1671        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1672        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1673        project_a
1674            .update(&mut cx_a, |p, cx| p.share(cx))
1675            .await
1676            .unwrap();
1677
1678        // Join that project as client B
1679        let project_b = Project::remote(
1680            project_id,
1681            client_b.clone(),
1682            client_b.user_store.clone(),
1683            lang_registry.clone(),
1684            fs.clone(),
1685            &mut cx_b.to_async(),
1686        )
1687        .await
1688        .unwrap();
1689
1690        // Open a buffer as client A
1691        let buffer_a = project_a
1692            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1693            .await
1694            .unwrap();
1695
1696        // Start opening the same buffer as client B
1697        let buffer_b = cx_b
1698            .background()
1699            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1700
1701        // Edit the buffer as client A while client B is still opening it.
1702        cx_b.background().simulate_random_delay().await;
1703        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1704        cx_b.background().simulate_random_delay().await;
1705        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1706
1707        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1708        let buffer_b = buffer_b.await.unwrap();
1709        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1710    }
1711
1712    #[gpui::test(iterations = 10)]
1713    async fn test_leaving_worktree_while_opening_buffer(
1714        mut cx_a: TestAppContext,
1715        mut cx_b: TestAppContext,
1716    ) {
1717        cx_a.foreground().forbid_parking();
1718        let lang_registry = Arc::new(LanguageRegistry::new());
1719        let fs = FakeFs::new(cx_a.background());
1720
1721        // Connect to a server as 2 clients.
1722        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1723        let client_a = server.create_client(&mut cx_a, "user_a").await;
1724        let client_b = server.create_client(&mut cx_b, "user_b").await;
1725
1726        // Share a project as client A
1727        fs.insert_tree(
1728            "/dir",
1729            json!({
1730                ".zed.toml": r#"collaborators = ["user_b"]"#,
1731                "a.txt": "a-contents",
1732            }),
1733        )
1734        .await;
1735        let project_a = cx_a.update(|cx| {
1736            Project::local(
1737                client_a.clone(),
1738                client_a.user_store.clone(),
1739                lang_registry.clone(),
1740                fs.clone(),
1741                cx,
1742            )
1743        });
1744        let (worktree_a, _) = project_a
1745            .update(&mut cx_a, |p, cx| {
1746                p.find_or_create_local_worktree("/dir", false, cx)
1747            })
1748            .await
1749            .unwrap();
1750        worktree_a
1751            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1752            .await;
1753        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1754        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1755        project_a
1756            .update(&mut cx_a, |p, cx| p.share(cx))
1757            .await
1758            .unwrap();
1759
1760        // Join that project as client B
1761        let project_b = Project::remote(
1762            project_id,
1763            client_b.clone(),
1764            client_b.user_store.clone(),
1765            lang_registry.clone(),
1766            fs.clone(),
1767            &mut cx_b.to_async(),
1768        )
1769        .await
1770        .unwrap();
1771
1772        // See that a guest has joined as client A.
1773        project_a
1774            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1775            .await;
1776
1777        // Begin opening a buffer as client B, but leave the project before the open completes.
1778        let buffer_b = cx_b
1779            .background()
1780            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1781        cx_b.update(|_| drop(project_b));
1782        drop(buffer_b);
1783
1784        // See that the guest has left.
1785        project_a
1786            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1787            .await;
1788    }
1789
1790    #[gpui::test(iterations = 10)]
1791    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1792        cx_a.foreground().forbid_parking();
1793        let lang_registry = Arc::new(LanguageRegistry::new());
1794        let fs = FakeFs::new(cx_a.background());
1795
1796        // Connect to a server as 2 clients.
1797        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1798        let client_a = server.create_client(&mut cx_a, "user_a").await;
1799        let client_b = server.create_client(&mut cx_b, "user_b").await;
1800
1801        // Share a project as client A
1802        fs.insert_tree(
1803            "/a",
1804            json!({
1805                ".zed.toml": r#"collaborators = ["user_b"]"#,
1806                "a.txt": "a-contents",
1807                "b.txt": "b-contents",
1808            }),
1809        )
1810        .await;
1811        let project_a = cx_a.update(|cx| {
1812            Project::local(
1813                client_a.clone(),
1814                client_a.user_store.clone(),
1815                lang_registry.clone(),
1816                fs.clone(),
1817                cx,
1818            )
1819        });
1820        let (worktree_a, _) = project_a
1821            .update(&mut cx_a, |p, cx| {
1822                p.find_or_create_local_worktree("/a", false, cx)
1823            })
1824            .await
1825            .unwrap();
1826        worktree_a
1827            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1828            .await;
1829        let project_id = project_a
1830            .update(&mut cx_a, |project, _| project.next_remote_id())
1831            .await;
1832        project_a
1833            .update(&mut cx_a, |project, cx| project.share(cx))
1834            .await
1835            .unwrap();
1836
1837        // Join that project as client B
1838        let _project_b = Project::remote(
1839            project_id,
1840            client_b.clone(),
1841            client_b.user_store.clone(),
1842            lang_registry.clone(),
1843            fs.clone(),
1844            &mut cx_b.to_async(),
1845        )
1846        .await
1847        .unwrap();
1848
1849        // See that a guest has joined as client A.
1850        project_a
1851            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1852            .await;
1853
1854        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1855        client_b.disconnect(&cx_b.to_async()).unwrap();
1856        project_a
1857            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1858            .await;
1859    }
1860
1861    #[gpui::test(iterations = 10)]
1862    async fn test_collaborating_with_diagnostics(
1863        mut cx_a: TestAppContext,
1864        mut cx_b: TestAppContext,
1865    ) {
1866        cx_a.foreground().forbid_parking();
1867        let mut lang_registry = Arc::new(LanguageRegistry::new());
1868        let fs = FakeFs::new(cx_a.background());
1869
1870        // Set up a fake language server.
1871        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1872        Arc::get_mut(&mut lang_registry)
1873            .unwrap()
1874            .add(Arc::new(Language::new(
1875                LanguageConfig {
1876                    name: "Rust".into(),
1877                    path_suffixes: vec!["rs".to_string()],
1878                    language_server: Some(language_server_config),
1879                    ..Default::default()
1880                },
1881                Some(tree_sitter_rust::language()),
1882            )));
1883
1884        // Connect to a server as 2 clients.
1885        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1886        let client_a = server.create_client(&mut cx_a, "user_a").await;
1887        let client_b = server.create_client(&mut cx_b, "user_b").await;
1888
1889        // Share a project as client A
1890        fs.insert_tree(
1891            "/a",
1892            json!({
1893                ".zed.toml": r#"collaborators = ["user_b"]"#,
1894                "a.rs": "let one = two",
1895                "other.rs": "",
1896            }),
1897        )
1898        .await;
1899        let project_a = cx_a.update(|cx| {
1900            Project::local(
1901                client_a.clone(),
1902                client_a.user_store.clone(),
1903                lang_registry.clone(),
1904                fs.clone(),
1905                cx,
1906            )
1907        });
1908        let (worktree_a, _) = project_a
1909            .update(&mut cx_a, |p, cx| {
1910                p.find_or_create_local_worktree("/a", false, cx)
1911            })
1912            .await
1913            .unwrap();
1914        worktree_a
1915            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1916            .await;
1917        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1918        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1919        project_a
1920            .update(&mut cx_a, |p, cx| p.share(cx))
1921            .await
1922            .unwrap();
1923
1924        // Cause the language server to start.
1925        let _ = cx_a
1926            .background()
1927            .spawn(project_a.update(&mut cx_a, |project, cx| {
1928                project.open_buffer(
1929                    ProjectPath {
1930                        worktree_id,
1931                        path: Path::new("other.rs").into(),
1932                    },
1933                    cx,
1934                )
1935            }))
1936            .await
1937            .unwrap();
1938
1939        // Simulate a language server reporting errors for a file.
1940        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1941        fake_language_server
1942            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1943                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1944                version: None,
1945                diagnostics: vec![lsp::Diagnostic {
1946                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1947                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1948                    message: "message 1".to_string(),
1949                    ..Default::default()
1950                }],
1951            })
1952            .await;
1953
1954        // Wait for server to see the diagnostics update.
1955        server
1956            .condition(|store| {
1957                let worktree = store
1958                    .project(project_id)
1959                    .unwrap()
1960                    .share
1961                    .as_ref()
1962                    .unwrap()
1963                    .worktrees
1964                    .get(&worktree_id.to_proto())
1965                    .unwrap();
1966
1967                !worktree.diagnostic_summaries.is_empty()
1968            })
1969            .await;
1970
1971        // Join the worktree as client B.
1972        let project_b = Project::remote(
1973            project_id,
1974            client_b.clone(),
1975            client_b.user_store.clone(),
1976            lang_registry.clone(),
1977            fs.clone(),
1978            &mut cx_b.to_async(),
1979        )
1980        .await
1981        .unwrap();
1982
1983        project_b.read_with(&cx_b, |project, cx| {
1984            assert_eq!(
1985                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1986                &[(
1987                    ProjectPath {
1988                        worktree_id,
1989                        path: Arc::from(Path::new("a.rs")),
1990                    },
1991                    DiagnosticSummary {
1992                        error_count: 1,
1993                        warning_count: 0,
1994                        ..Default::default()
1995                    },
1996                )]
1997            )
1998        });
1999
2000        // Simulate a language server reporting more errors for a file.
2001        fake_language_server
2002            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2003                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2004                version: None,
2005                diagnostics: vec![
2006                    lsp::Diagnostic {
2007                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2008                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2009                        message: "message 1".to_string(),
2010                        ..Default::default()
2011                    },
2012                    lsp::Diagnostic {
2013                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2014                        range: lsp::Range::new(
2015                            lsp::Position::new(0, 10),
2016                            lsp::Position::new(0, 13),
2017                        ),
2018                        message: "message 2".to_string(),
2019                        ..Default::default()
2020                    },
2021                ],
2022            })
2023            .await;
2024
2025        // Client b gets the updated summaries
2026        project_b
2027            .condition(&cx_b, |project, cx| {
2028                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2029                    == &[(
2030                        ProjectPath {
2031                            worktree_id,
2032                            path: Arc::from(Path::new("a.rs")),
2033                        },
2034                        DiagnosticSummary {
2035                            error_count: 1,
2036                            warning_count: 1,
2037                            ..Default::default()
2038                        },
2039                    )]
2040            })
2041            .await;
2042
2043        // Open the file with the errors on client B. They should be present.
2044        let buffer_b = cx_b
2045            .background()
2046            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2047            .await
2048            .unwrap();
2049
2050        buffer_b.read_with(&cx_b, |buffer, _| {
2051            assert_eq!(
2052                buffer
2053                    .snapshot()
2054                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2055                    .map(|entry| entry)
2056                    .collect::<Vec<_>>(),
2057                &[
2058                    DiagnosticEntry {
2059                        range: Point::new(0, 4)..Point::new(0, 7),
2060                        diagnostic: Diagnostic {
2061                            group_id: 0,
2062                            message: "message 1".to_string(),
2063                            severity: lsp::DiagnosticSeverity::ERROR,
2064                            is_primary: true,
2065                            ..Default::default()
2066                        }
2067                    },
2068                    DiagnosticEntry {
2069                        range: Point::new(0, 10)..Point::new(0, 13),
2070                        diagnostic: Diagnostic {
2071                            group_id: 1,
2072                            severity: lsp::DiagnosticSeverity::WARNING,
2073                            message: "message 2".to_string(),
2074                            is_primary: true,
2075                            ..Default::default()
2076                        }
2077                    }
2078                ]
2079            );
2080        });
2081    }
2082
2083    #[gpui::test(iterations = 10)]
2084    async fn test_collaborating_with_completion(
2085        mut cx_a: TestAppContext,
2086        mut cx_b: TestAppContext,
2087    ) {
2088        cx_a.foreground().forbid_parking();
2089        let mut lang_registry = Arc::new(LanguageRegistry::new());
2090        let fs = FakeFs::new(cx_a.background());
2091
2092        // Set up a fake language server.
2093        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2094        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2095            completion_provider: Some(lsp::CompletionOptions {
2096                trigger_characters: Some(vec![".".to_string()]),
2097                ..Default::default()
2098            }),
2099            ..Default::default()
2100        });
2101        Arc::get_mut(&mut lang_registry)
2102            .unwrap()
2103            .add(Arc::new(Language::new(
2104                LanguageConfig {
2105                    name: "Rust".into(),
2106                    path_suffixes: vec!["rs".to_string()],
2107                    language_server: Some(language_server_config),
2108                    ..Default::default()
2109                },
2110                Some(tree_sitter_rust::language()),
2111            )));
2112
2113        // Connect to a server as 2 clients.
2114        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2115        let client_a = server.create_client(&mut cx_a, "user_a").await;
2116        let client_b = server.create_client(&mut cx_b, "user_b").await;
2117
2118        // Share a project as client A
2119        fs.insert_tree(
2120            "/a",
2121            json!({
2122                ".zed.toml": r#"collaborators = ["user_b"]"#,
2123                "main.rs": "fn main() { a }",
2124                "other.rs": "",
2125            }),
2126        )
2127        .await;
2128        let project_a = cx_a.update(|cx| {
2129            Project::local(
2130                client_a.clone(),
2131                client_a.user_store.clone(),
2132                lang_registry.clone(),
2133                fs.clone(),
2134                cx,
2135            )
2136        });
2137        let (worktree_a, _) = project_a
2138            .update(&mut cx_a, |p, cx| {
2139                p.find_or_create_local_worktree("/a", false, cx)
2140            })
2141            .await
2142            .unwrap();
2143        worktree_a
2144            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2145            .await;
2146        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2147        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2148        project_a
2149            .update(&mut cx_a, |p, cx| p.share(cx))
2150            .await
2151            .unwrap();
2152
2153        // Join the worktree as client B.
2154        let project_b = Project::remote(
2155            project_id,
2156            client_b.clone(),
2157            client_b.user_store.clone(),
2158            lang_registry.clone(),
2159            fs.clone(),
2160            &mut cx_b.to_async(),
2161        )
2162        .await
2163        .unwrap();
2164
2165        // Open a file in an editor as the guest.
2166        let buffer_b = project_b
2167            .update(&mut cx_b, |p, cx| {
2168                p.open_buffer((worktree_id, "main.rs"), cx)
2169            })
2170            .await
2171            .unwrap();
2172        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2173        let editor_b = cx_b.add_view(window_b, |cx| {
2174            Editor::for_buffer(
2175                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2176                Some(project_b.clone()),
2177                watch::channel_with(Settings::test(cx)).1,
2178                cx,
2179            )
2180        });
2181
2182        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2183        buffer_b
2184            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2185            .await;
2186
2187        // Type a completion trigger character as the guest.
2188        editor_b.update(&mut cx_b, |editor, cx| {
2189            editor.select_ranges([13..13], None, cx);
2190            editor.handle_input(&Input(".".into()), cx);
2191            cx.focus(&editor_b);
2192        });
2193
2194        // Receive a completion request as the host's language server.
2195        // Return some completions from the host's language server.
2196        cx_a.foreground().start_waiting();
2197        fake_language_server
2198            .handle_request::<lsp::request::Completion, _>(|params, _| {
2199                assert_eq!(
2200                    params.text_document_position.text_document.uri,
2201                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2202                );
2203                assert_eq!(
2204                    params.text_document_position.position,
2205                    lsp::Position::new(0, 14),
2206                );
2207
2208                Some(lsp::CompletionResponse::Array(vec![
2209                    lsp::CompletionItem {
2210                        label: "first_method(…)".into(),
2211                        detail: Some("fn(&mut self, B) -> C".into()),
2212                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2213                            new_text: "first_method($1)".to_string(),
2214                            range: lsp::Range::new(
2215                                lsp::Position::new(0, 14),
2216                                lsp::Position::new(0, 14),
2217                            ),
2218                        })),
2219                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2220                        ..Default::default()
2221                    },
2222                    lsp::CompletionItem {
2223                        label: "second_method(…)".into(),
2224                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2225                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2226                            new_text: "second_method()".to_string(),
2227                            range: lsp::Range::new(
2228                                lsp::Position::new(0, 14),
2229                                lsp::Position::new(0, 14),
2230                            ),
2231                        })),
2232                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2233                        ..Default::default()
2234                    },
2235                ]))
2236            })
2237            .next()
2238            .await
2239            .unwrap();
2240        cx_a.foreground().finish_waiting();
2241
2242        // Open the buffer on the host.
2243        let buffer_a = project_a
2244            .update(&mut cx_a, |p, cx| {
2245                p.open_buffer((worktree_id, "main.rs"), cx)
2246            })
2247            .await
2248            .unwrap();
2249        buffer_a
2250            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2251            .await;
2252
2253        // Confirm a completion on the guest.
2254        editor_b
2255            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2256            .await;
2257        editor_b.update(&mut cx_b, |editor, cx| {
2258            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2259            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2260        });
2261
2262        // Return a resolved completion from the host's language server.
2263        // The resolved completion has an additional text edit.
2264        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2265            |params, _| {
2266                assert_eq!(params.label, "first_method(…)");
2267                lsp::CompletionItem {
2268                    label: "first_method(…)".into(),
2269                    detail: Some("fn(&mut self, B) -> C".into()),
2270                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2271                        new_text: "first_method($1)".to_string(),
2272                        range: lsp::Range::new(
2273                            lsp::Position::new(0, 14),
2274                            lsp::Position::new(0, 14),
2275                        ),
2276                    })),
2277                    additional_text_edits: Some(vec![lsp::TextEdit {
2278                        new_text: "use d::SomeTrait;\n".to_string(),
2279                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2280                    }]),
2281                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2282                    ..Default::default()
2283                }
2284            },
2285        );
2286
2287        // The additional edit is applied.
2288        buffer_a
2289            .condition(&cx_a, |buffer, _| {
2290                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2291            })
2292            .await;
2293        buffer_b
2294            .condition(&cx_b, |buffer, _| {
2295                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2296            })
2297            .await;
2298    }
2299
2300    #[gpui::test(iterations = 10)]
2301    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2302        cx_a.foreground().forbid_parking();
2303        let mut lang_registry = Arc::new(LanguageRegistry::new());
2304        let fs = FakeFs::new(cx_a.background());
2305
2306        // Set up a fake language server.
2307        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2308        Arc::get_mut(&mut lang_registry)
2309            .unwrap()
2310            .add(Arc::new(Language::new(
2311                LanguageConfig {
2312                    name: "Rust".into(),
2313                    path_suffixes: vec!["rs".to_string()],
2314                    language_server: Some(language_server_config),
2315                    ..Default::default()
2316                },
2317                Some(tree_sitter_rust::language()),
2318            )));
2319
2320        // Connect to a server as 2 clients.
2321        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2322        let client_a = server.create_client(&mut cx_a, "user_a").await;
2323        let client_b = server.create_client(&mut cx_b, "user_b").await;
2324
2325        // Share a project as client A
2326        fs.insert_tree(
2327            "/a",
2328            json!({
2329                ".zed.toml": r#"collaborators = ["user_b"]"#,
2330                "a.rs": "let one = two",
2331            }),
2332        )
2333        .await;
2334        let project_a = cx_a.update(|cx| {
2335            Project::local(
2336                client_a.clone(),
2337                client_a.user_store.clone(),
2338                lang_registry.clone(),
2339                fs.clone(),
2340                cx,
2341            )
2342        });
2343        let (worktree_a, _) = project_a
2344            .update(&mut cx_a, |p, cx| {
2345                p.find_or_create_local_worktree("/a", false, cx)
2346            })
2347            .await
2348            .unwrap();
2349        worktree_a
2350            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2351            .await;
2352        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2353        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2354        project_a
2355            .update(&mut cx_a, |p, cx| p.share(cx))
2356            .await
2357            .unwrap();
2358
2359        // Join the worktree as client B.
2360        let project_b = Project::remote(
2361            project_id,
2362            client_b.clone(),
2363            client_b.user_store.clone(),
2364            lang_registry.clone(),
2365            fs.clone(),
2366            &mut cx_b.to_async(),
2367        )
2368        .await
2369        .unwrap();
2370
2371        let buffer_b = cx_b
2372            .background()
2373            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2374            .await
2375            .unwrap();
2376
2377        let format = project_b.update(&mut cx_b, |project, cx| {
2378            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2379        });
2380
2381        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2382        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2383            Some(vec![
2384                lsp::TextEdit {
2385                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2386                    new_text: "h".to_string(),
2387                },
2388                lsp::TextEdit {
2389                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2390                    new_text: "y".to_string(),
2391                },
2392            ])
2393        });
2394
2395        format.await.unwrap();
2396        assert_eq!(
2397            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2398            "let honey = two"
2399        );
2400    }
2401
2402    #[gpui::test(iterations = 10)]
2403    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2404        cx_a.foreground().forbid_parking();
2405        let mut lang_registry = Arc::new(LanguageRegistry::new());
2406        let fs = FakeFs::new(cx_a.background());
2407        fs.insert_tree(
2408            "/root-1",
2409            json!({
2410                ".zed.toml": r#"collaborators = ["user_b"]"#,
2411                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412            }),
2413        )
2414        .await;
2415        fs.insert_tree(
2416            "/root-2",
2417            json!({
2418                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419            }),
2420        )
2421        .await;
2422
2423        // Set up a fake language server.
2424        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425        Arc::get_mut(&mut lang_registry)
2426            .unwrap()
2427            .add(Arc::new(Language::new(
2428                LanguageConfig {
2429                    name: "Rust".into(),
2430                    path_suffixes: vec!["rs".to_string()],
2431                    language_server: Some(language_server_config),
2432                    ..Default::default()
2433                },
2434                Some(tree_sitter_rust::language()),
2435            )));
2436
2437        // Connect to a server as 2 clients.
2438        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439        let client_a = server.create_client(&mut cx_a, "user_a").await;
2440        let client_b = server.create_client(&mut cx_b, "user_b").await;
2441
2442        // Share a project as client A
2443        let project_a = cx_a.update(|cx| {
2444            Project::local(
2445                client_a.clone(),
2446                client_a.user_store.clone(),
2447                lang_registry.clone(),
2448                fs.clone(),
2449                cx,
2450            )
2451        });
2452        let (worktree_a, _) = project_a
2453            .update(&mut cx_a, |p, cx| {
2454                p.find_or_create_local_worktree("/root-1", false, cx)
2455            })
2456            .await
2457            .unwrap();
2458        worktree_a
2459            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460            .await;
2461        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2462        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2463        project_a
2464            .update(&mut cx_a, |p, cx| p.share(cx))
2465            .await
2466            .unwrap();
2467
2468        // Join the worktree as client B.
2469        let project_b = Project::remote(
2470            project_id,
2471            client_b.clone(),
2472            client_b.user_store.clone(),
2473            lang_registry.clone(),
2474            fs.clone(),
2475            &mut cx_b.to_async(),
2476        )
2477        .await
2478        .unwrap();
2479
2480        // Open the file on client B.
2481        let buffer_b = cx_b
2482            .background()
2483            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2484            .await
2485            .unwrap();
2486
2487        // Request the definition of a symbol as the guest.
2488        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2489
2490        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2491        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2492            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2493                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2494                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2495            )))
2496        });
2497
2498        let definitions_1 = definitions_1.await.unwrap();
2499        cx_b.read(|cx| {
2500            assert_eq!(definitions_1.len(), 1);
2501            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2502            let target_buffer = definitions_1[0].buffer.read(cx);
2503            assert_eq!(
2504                target_buffer.text(),
2505                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2506            );
2507            assert_eq!(
2508                definitions_1[0].range.to_point(target_buffer),
2509                Point::new(0, 6)..Point::new(0, 9)
2510            );
2511        });
2512
2513        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2514        // the previous call to `definition`.
2515        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2516        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2517            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2518                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2519                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2520            )))
2521        });
2522
2523        let definitions_2 = definitions_2.await.unwrap();
2524        cx_b.read(|cx| {
2525            assert_eq!(definitions_2.len(), 1);
2526            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2527            let target_buffer = definitions_2[0].buffer.read(cx);
2528            assert_eq!(
2529                target_buffer.text(),
2530                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2531            );
2532            assert_eq!(
2533                definitions_2[0].range.to_point(target_buffer),
2534                Point::new(1, 6)..Point::new(1, 11)
2535            );
2536        });
2537        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2538
2539        cx_b.update(|_| {
2540            drop(definitions_1);
2541            drop(definitions_2);
2542        });
2543        project_b
2544            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2545            .await;
2546    }
2547
2548    #[gpui::test(iterations = 10)]
2549    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2550        cx_a.foreground().forbid_parking();
2551        let mut lang_registry = Arc::new(LanguageRegistry::new());
2552        let fs = FakeFs::new(cx_a.background());
2553        fs.insert_tree(
2554            "/root-1",
2555            json!({
2556                ".zed.toml": r#"collaborators = ["user_b"]"#,
2557                "one.rs": "const ONE: usize = 1;",
2558                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2559            }),
2560        )
2561        .await;
2562        fs.insert_tree(
2563            "/root-2",
2564            json!({
2565                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2566            }),
2567        )
2568        .await;
2569
2570        // Set up a fake language server.
2571        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2572        Arc::get_mut(&mut lang_registry)
2573            .unwrap()
2574            .add(Arc::new(Language::new(
2575                LanguageConfig {
2576                    name: "Rust".into(),
2577                    path_suffixes: vec!["rs".to_string()],
2578                    language_server: Some(language_server_config),
2579                    ..Default::default()
2580                },
2581                Some(tree_sitter_rust::language()),
2582            )));
2583
2584        // Connect to a server as 2 clients.
2585        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2586        let client_a = server.create_client(&mut cx_a, "user_a").await;
2587        let client_b = server.create_client(&mut cx_b, "user_b").await;
2588
2589        // Share a project as client A
2590        let project_a = cx_a.update(|cx| {
2591            Project::local(
2592                client_a.clone(),
2593                client_a.user_store.clone(),
2594                lang_registry.clone(),
2595                fs.clone(),
2596                cx,
2597            )
2598        });
2599        let (worktree_a, _) = project_a
2600            .update(&mut cx_a, |p, cx| {
2601                p.find_or_create_local_worktree("/root-1", false, cx)
2602            })
2603            .await
2604            .unwrap();
2605        worktree_a
2606            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2607            .await;
2608        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2609        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2610        project_a
2611            .update(&mut cx_a, |p, cx| p.share(cx))
2612            .await
2613            .unwrap();
2614
2615        // Join the worktree as client B.
2616        let project_b = Project::remote(
2617            project_id,
2618            client_b.clone(),
2619            client_b.user_store.clone(),
2620            lang_registry.clone(),
2621            fs.clone(),
2622            &mut cx_b.to_async(),
2623        )
2624        .await
2625        .unwrap();
2626
2627        // Open the file on client B.
2628        let buffer_b = cx_b
2629            .background()
2630            .spawn(project_b.update(&mut cx_b, |p, cx| {
2631                p.open_buffer((worktree_id, "one.rs"), cx)
2632            }))
2633            .await
2634            .unwrap();
2635
2636        // Request references to a symbol as the guest.
2637        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2638
2639        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2640        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2641            assert_eq!(
2642                params.text_document_position.text_document.uri.as_str(),
2643                "file:///root-1/one.rs"
2644            );
2645            Some(vec![
2646                lsp::Location {
2647                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2648                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2649                },
2650                lsp::Location {
2651                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2652                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2653                },
2654                lsp::Location {
2655                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2656                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2657                },
2658            ])
2659        });
2660
2661        let references = references.await.unwrap();
2662        cx_b.read(|cx| {
2663            assert_eq!(references.len(), 3);
2664            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2665
2666            let two_buffer = references[0].buffer.read(cx);
2667            let three_buffer = references[2].buffer.read(cx);
2668            assert_eq!(
2669                two_buffer.file().unwrap().path().as_ref(),
2670                Path::new("two.rs")
2671            );
2672            assert_eq!(references[1].buffer, references[0].buffer);
2673            assert_eq!(
2674                three_buffer.file().unwrap().full_path(cx),
2675                Path::new("three.rs")
2676            );
2677
2678            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2679            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2680            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2681        });
2682    }
2683
2684    #[gpui::test(iterations = 10)]
2685    async fn test_project_search(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2686        cx_a.foreground().forbid_parking();
2687        let lang_registry = Arc::new(LanguageRegistry::new());
2688        let fs = FakeFs::new(cx_a.background());
2689        fs.insert_tree(
2690            "/root-1",
2691            json!({
2692                ".zed.toml": r#"collaborators = ["user_b"]"#,
2693                "a": "hello world",
2694                "b": "goodnight moon",
2695                "c": "a world of goo",
2696                "d": "world champion of clown world",
2697            }),
2698        )
2699        .await;
2700        fs.insert_tree(
2701            "/root-2",
2702            json!({
2703                "e": "disney world is fun",
2704            }),
2705        )
2706        .await;
2707
2708        // Connect to a server as 2 clients.
2709        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2710        let client_a = server.create_client(&mut cx_a, "user_a").await;
2711        let client_b = server.create_client(&mut cx_b, "user_b").await;
2712
2713        // Share a project as client A
2714        let project_a = cx_a.update(|cx| {
2715            Project::local(
2716                client_a.clone(),
2717                client_a.user_store.clone(),
2718                lang_registry.clone(),
2719                fs.clone(),
2720                cx,
2721            )
2722        });
2723        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2724
2725        let (worktree_1, _) = project_a
2726            .update(&mut cx_a, |p, cx| {
2727                p.find_or_create_local_worktree("/root-1", false, cx)
2728            })
2729            .await
2730            .unwrap();
2731        worktree_1
2732            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2733            .await;
2734        let (worktree_2, _) = project_a
2735            .update(&mut cx_a, |p, cx| {
2736                p.find_or_create_local_worktree("/root-2", false, cx)
2737            })
2738            .await
2739            .unwrap();
2740        worktree_2
2741            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2742            .await;
2743
2744        eprintln!("sharing");
2745
2746        project_a
2747            .update(&mut cx_a, |p, cx| p.share(cx))
2748            .await
2749            .unwrap();
2750
2751        // Join the worktree as client B.
2752        let project_b = Project::remote(
2753            project_id,
2754            client_b.clone(),
2755            client_b.user_store.clone(),
2756            lang_registry.clone(),
2757            fs.clone(),
2758            &mut cx_b.to_async(),
2759        )
2760        .await
2761        .unwrap();
2762
2763        let results = project_b
2764            .update(&mut cx_b, |project, cx| {
2765                project.search(SearchQuery::text("world", false, false), cx)
2766            })
2767            .await
2768            .unwrap();
2769
2770        let mut ranges_by_path = results
2771            .into_iter()
2772            .map(|(buffer, ranges)| {
2773                buffer.read_with(&cx_b, |buffer, cx| {
2774                    let path = buffer.file().unwrap().full_path(cx);
2775                    let offset_ranges = ranges
2776                        .into_iter()
2777                        .map(|range| range.to_offset(buffer))
2778                        .collect::<Vec<_>>();
2779                    (path, offset_ranges)
2780                })
2781            })
2782            .collect::<Vec<_>>();
2783        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2784
2785        assert_eq!(
2786            ranges_by_path,
2787            &[
2788                (PathBuf::from("root-1/a"), vec![6..11]),
2789                (PathBuf::from("root-1/c"), vec![2..7]),
2790                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2791                (PathBuf::from("root-2/e"), vec![7..12]),
2792            ]
2793        );
2794    }
2795
2796    #[gpui::test(iterations = 10)]
2797    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2798        cx_a.foreground().forbid_parking();
2799        let lang_registry = Arc::new(LanguageRegistry::new());
2800        let fs = FakeFs::new(cx_a.background());
2801        fs.insert_tree(
2802            "/root-1",
2803            json!({
2804                ".zed.toml": r#"collaborators = ["user_b"]"#,
2805                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2806            }),
2807        )
2808        .await;
2809
2810        // Set up a fake language server.
2811        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2812        lang_registry.add(Arc::new(Language::new(
2813            LanguageConfig {
2814                name: "Rust".into(),
2815                path_suffixes: vec!["rs".to_string()],
2816                language_server: Some(language_server_config),
2817                ..Default::default()
2818            },
2819            Some(tree_sitter_rust::language()),
2820        )));
2821
2822        // Connect to a server as 2 clients.
2823        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2824        let client_a = server.create_client(&mut cx_a, "user_a").await;
2825        let client_b = server.create_client(&mut cx_b, "user_b").await;
2826
2827        // Share a project as client A
2828        let project_a = cx_a.update(|cx| {
2829            Project::local(
2830                client_a.clone(),
2831                client_a.user_store.clone(),
2832                lang_registry.clone(),
2833                fs.clone(),
2834                cx,
2835            )
2836        });
2837        let (worktree_a, _) = project_a
2838            .update(&mut cx_a, |p, cx| {
2839                p.find_or_create_local_worktree("/root-1", false, cx)
2840            })
2841            .await
2842            .unwrap();
2843        worktree_a
2844            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2845            .await;
2846        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2847        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2848        project_a
2849            .update(&mut cx_a, |p, cx| p.share(cx))
2850            .await
2851            .unwrap();
2852
2853        // Join the worktree as client B.
2854        let project_b = Project::remote(
2855            project_id,
2856            client_b.clone(),
2857            client_b.user_store.clone(),
2858            lang_registry.clone(),
2859            fs.clone(),
2860            &mut cx_b.to_async(),
2861        )
2862        .await
2863        .unwrap();
2864
2865        // Open the file on client B.
2866        let buffer_b = cx_b
2867            .background()
2868            .spawn(project_b.update(&mut cx_b, |p, cx| {
2869                p.open_buffer((worktree_id, "main.rs"), cx)
2870            }))
2871            .await
2872            .unwrap();
2873
2874        // Request document highlights as the guest.
2875        let highlights =
2876            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2877
2878        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2879        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2880            |params, _| {
2881                assert_eq!(
2882                    params
2883                        .text_document_position_params
2884                        .text_document
2885                        .uri
2886                        .as_str(),
2887                    "file:///root-1/main.rs"
2888                );
2889                assert_eq!(
2890                    params.text_document_position_params.position,
2891                    lsp::Position::new(0, 34)
2892                );
2893                Some(vec![
2894                    lsp::DocumentHighlight {
2895                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2896                        range: lsp::Range::new(
2897                            lsp::Position::new(0, 10),
2898                            lsp::Position::new(0, 16),
2899                        ),
2900                    },
2901                    lsp::DocumentHighlight {
2902                        kind: Some(lsp::DocumentHighlightKind::READ),
2903                        range: lsp::Range::new(
2904                            lsp::Position::new(0, 32),
2905                            lsp::Position::new(0, 38),
2906                        ),
2907                    },
2908                    lsp::DocumentHighlight {
2909                        kind: Some(lsp::DocumentHighlightKind::READ),
2910                        range: lsp::Range::new(
2911                            lsp::Position::new(0, 41),
2912                            lsp::Position::new(0, 47),
2913                        ),
2914                    },
2915                ])
2916            },
2917        );
2918
2919        let highlights = highlights.await.unwrap();
2920        buffer_b.read_with(&cx_b, |buffer, _| {
2921            let snapshot = buffer.snapshot();
2922
2923            let highlights = highlights
2924                .into_iter()
2925                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2926                .collect::<Vec<_>>();
2927            assert_eq!(
2928                highlights,
2929                &[
2930                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2931                    (lsp::DocumentHighlightKind::READ, 32..38),
2932                    (lsp::DocumentHighlightKind::READ, 41..47)
2933                ]
2934            )
2935        });
2936    }
2937
2938    #[gpui::test(iterations = 10)]
2939    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2940        cx_a.foreground().forbid_parking();
2941        let mut lang_registry = Arc::new(LanguageRegistry::new());
2942        let fs = FakeFs::new(cx_a.background());
2943        fs.insert_tree(
2944            "/code",
2945            json!({
2946                "crate-1": {
2947                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2948                    "one.rs": "const ONE: usize = 1;",
2949                },
2950                "crate-2": {
2951                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2952                },
2953                "private": {
2954                    "passwords.txt": "the-password",
2955                }
2956            }),
2957        )
2958        .await;
2959
2960        // Set up a fake language server.
2961        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2962        Arc::get_mut(&mut lang_registry)
2963            .unwrap()
2964            .add(Arc::new(Language::new(
2965                LanguageConfig {
2966                    name: "Rust".into(),
2967                    path_suffixes: vec!["rs".to_string()],
2968                    language_server: Some(language_server_config),
2969                    ..Default::default()
2970                },
2971                Some(tree_sitter_rust::language()),
2972            )));
2973
2974        // Connect to a server as 2 clients.
2975        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2976        let client_a = server.create_client(&mut cx_a, "user_a").await;
2977        let client_b = server.create_client(&mut cx_b, "user_b").await;
2978
2979        // Share a project as client A
2980        let project_a = cx_a.update(|cx| {
2981            Project::local(
2982                client_a.clone(),
2983                client_a.user_store.clone(),
2984                lang_registry.clone(),
2985                fs.clone(),
2986                cx,
2987            )
2988        });
2989        let (worktree_a, _) = project_a
2990            .update(&mut cx_a, |p, cx| {
2991                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2992            })
2993            .await
2994            .unwrap();
2995        worktree_a
2996            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2997            .await;
2998        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2999        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3000        project_a
3001            .update(&mut cx_a, |p, cx| p.share(cx))
3002            .await
3003            .unwrap();
3004
3005        // Join the worktree as client B.
3006        let project_b = Project::remote(
3007            project_id,
3008            client_b.clone(),
3009            client_b.user_store.clone(),
3010            lang_registry.clone(),
3011            fs.clone(),
3012            &mut cx_b.to_async(),
3013        )
3014        .await
3015        .unwrap();
3016
3017        // Cause the language server to start.
3018        let _buffer = cx_b
3019            .background()
3020            .spawn(project_b.update(&mut cx_b, |p, cx| {
3021                p.open_buffer((worktree_id, "one.rs"), cx)
3022            }))
3023            .await
3024            .unwrap();
3025
3026        // Request the definition of a symbol as the guest.
3027        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3028        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3029        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3030            #[allow(deprecated)]
3031            Some(vec![lsp::SymbolInformation {
3032                name: "TWO".into(),
3033                location: lsp::Location {
3034                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3035                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3036                },
3037                kind: lsp::SymbolKind::CONSTANT,
3038                tags: None,
3039                container_name: None,
3040                deprecated: None,
3041            }])
3042        });
3043
3044        let symbols = symbols.await.unwrap();
3045        assert_eq!(symbols.len(), 1);
3046        assert_eq!(symbols[0].name, "TWO");
3047
3048        // Open one of the returned symbols.
3049        let buffer_b_2 = project_b
3050            .update(&mut cx_b, |project, cx| {
3051                project.open_buffer_for_symbol(&symbols[0], cx)
3052            })
3053            .await
3054            .unwrap();
3055        buffer_b_2.read_with(&cx_b, |buffer, _| {
3056            assert_eq!(
3057                buffer.file().unwrap().path().as_ref(),
3058                Path::new("../crate-2/two.rs")
3059            );
3060        });
3061
3062        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3063        let mut fake_symbol = symbols[0].clone();
3064        fake_symbol.path = Path::new("/code/secrets").into();
3065        let error = project_b
3066            .update(&mut cx_b, |project, cx| {
3067                project.open_buffer_for_symbol(&fake_symbol, cx)
3068            })
3069            .await
3070            .unwrap_err();
3071        assert!(error.to_string().contains("invalid symbol signature"));
3072    }
3073
3074    #[gpui::test(iterations = 10)]
3075    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3076        mut cx_a: TestAppContext,
3077        mut cx_b: TestAppContext,
3078        mut rng: StdRng,
3079    ) {
3080        cx_a.foreground().forbid_parking();
3081        let mut lang_registry = Arc::new(LanguageRegistry::new());
3082        let fs = FakeFs::new(cx_a.background());
3083        fs.insert_tree(
3084            "/root",
3085            json!({
3086                ".zed.toml": r#"collaborators = ["user_b"]"#,
3087                "a.rs": "const ONE: usize = b::TWO;",
3088                "b.rs": "const TWO: usize = 2",
3089            }),
3090        )
3091        .await;
3092
3093        // Set up a fake language server.
3094        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3095
3096        Arc::get_mut(&mut lang_registry)
3097            .unwrap()
3098            .add(Arc::new(Language::new(
3099                LanguageConfig {
3100                    name: "Rust".into(),
3101                    path_suffixes: vec!["rs".to_string()],
3102                    language_server: Some(language_server_config),
3103                    ..Default::default()
3104                },
3105                Some(tree_sitter_rust::language()),
3106            )));
3107
3108        // Connect to a server as 2 clients.
3109        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3110        let client_a = server.create_client(&mut cx_a, "user_a").await;
3111        let client_b = server.create_client(&mut cx_b, "user_b").await;
3112
3113        // Share a project as client A
3114        let project_a = cx_a.update(|cx| {
3115            Project::local(
3116                client_a.clone(),
3117                client_a.user_store.clone(),
3118                lang_registry.clone(),
3119                fs.clone(),
3120                cx,
3121            )
3122        });
3123
3124        let (worktree_a, _) = project_a
3125            .update(&mut cx_a, |p, cx| {
3126                p.find_or_create_local_worktree("/root", false, cx)
3127            })
3128            .await
3129            .unwrap();
3130        worktree_a
3131            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3132            .await;
3133        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3134        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3135        project_a
3136            .update(&mut cx_a, |p, cx| p.share(cx))
3137            .await
3138            .unwrap();
3139
3140        // Join the worktree as client B.
3141        let project_b = Project::remote(
3142            project_id,
3143            client_b.clone(),
3144            client_b.user_store.clone(),
3145            lang_registry.clone(),
3146            fs.clone(),
3147            &mut cx_b.to_async(),
3148        )
3149        .await
3150        .unwrap();
3151
3152        let buffer_b1 = cx_b
3153            .background()
3154            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3155            .await
3156            .unwrap();
3157
3158        let definitions;
3159        let buffer_b2;
3160        if rng.gen() {
3161            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3162            buffer_b2 =
3163                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3164        } else {
3165            buffer_b2 =
3166                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3167            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3168        }
3169
3170        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3171        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3172            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3173                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3174                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3175            )))
3176        });
3177
3178        let buffer_b2 = buffer_b2.await.unwrap();
3179        let definitions = definitions.await.unwrap();
3180        assert_eq!(definitions.len(), 1);
3181        assert_eq!(definitions[0].buffer, buffer_b2);
3182    }
3183
3184    #[gpui::test(iterations = 10)]
3185    async fn test_collaborating_with_code_actions(
3186        mut cx_a: TestAppContext,
3187        mut cx_b: TestAppContext,
3188    ) {
3189        cx_a.foreground().forbid_parking();
3190        let mut lang_registry = Arc::new(LanguageRegistry::new());
3191        let fs = FakeFs::new(cx_a.background());
3192        let mut path_openers_b = Vec::new();
3193        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3194
3195        // Set up a fake language server.
3196        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3197        Arc::get_mut(&mut lang_registry)
3198            .unwrap()
3199            .add(Arc::new(Language::new(
3200                LanguageConfig {
3201                    name: "Rust".into(),
3202                    path_suffixes: vec!["rs".to_string()],
3203                    language_server: Some(language_server_config),
3204                    ..Default::default()
3205                },
3206                Some(tree_sitter_rust::language()),
3207            )));
3208
3209        // Connect to a server as 2 clients.
3210        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3211        let client_a = server.create_client(&mut cx_a, "user_a").await;
3212        let client_b = server.create_client(&mut cx_b, "user_b").await;
3213
3214        // Share a project as client A
3215        fs.insert_tree(
3216            "/a",
3217            json!({
3218                ".zed.toml": r#"collaborators = ["user_b"]"#,
3219                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3220                "other.rs": "pub fn foo() -> usize { 4 }",
3221            }),
3222        )
3223        .await;
3224        let project_a = cx_a.update(|cx| {
3225            Project::local(
3226                client_a.clone(),
3227                client_a.user_store.clone(),
3228                lang_registry.clone(),
3229                fs.clone(),
3230                cx,
3231            )
3232        });
3233        let (worktree_a, _) = project_a
3234            .update(&mut cx_a, |p, cx| {
3235                p.find_or_create_local_worktree("/a", false, cx)
3236            })
3237            .await
3238            .unwrap();
3239        worktree_a
3240            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3241            .await;
3242        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3243        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3244        project_a
3245            .update(&mut cx_a, |p, cx| p.share(cx))
3246            .await
3247            .unwrap();
3248
3249        // Join the worktree as client B.
3250        let project_b = Project::remote(
3251            project_id,
3252            client_b.clone(),
3253            client_b.user_store.clone(),
3254            lang_registry.clone(),
3255            fs.clone(),
3256            &mut cx_b.to_async(),
3257        )
3258        .await
3259        .unwrap();
3260        let mut params = cx_b.update(WorkspaceParams::test);
3261        params.languages = lang_registry.clone();
3262        params.client = client_b.client.clone();
3263        params.user_store = client_b.user_store.clone();
3264        params.project = project_b;
3265        params.path_openers = path_openers_b.into();
3266
3267        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3268        let editor_b = workspace_b
3269            .update(&mut cx_b, |workspace, cx| {
3270                workspace.open_path((worktree_id, "main.rs").into(), cx)
3271            })
3272            .await
3273            .unwrap()
3274            .downcast::<Editor>()
3275            .unwrap();
3276
3277        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3278        fake_language_server
3279            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3280                assert_eq!(
3281                    params.text_document.uri,
3282                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3283                );
3284                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3285                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3286                None
3287            })
3288            .next()
3289            .await;
3290
3291        // Move cursor to a location that contains code actions.
3292        editor_b.update(&mut cx_b, |editor, cx| {
3293            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3294            cx.focus(&editor_b);
3295        });
3296
3297        fake_language_server
3298            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3299                assert_eq!(
3300                    params.text_document.uri,
3301                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3302                );
3303                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3304                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3305
3306                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3307                    lsp::CodeAction {
3308                        title: "Inline into all callers".to_string(),
3309                        edit: Some(lsp::WorkspaceEdit {
3310                            changes: Some(
3311                                [
3312                                    (
3313                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3314                                        vec![lsp::TextEdit::new(
3315                                            lsp::Range::new(
3316                                                lsp::Position::new(1, 22),
3317                                                lsp::Position::new(1, 34),
3318                                            ),
3319                                            "4".to_string(),
3320                                        )],
3321                                    ),
3322                                    (
3323                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3324                                        vec![lsp::TextEdit::new(
3325                                            lsp::Range::new(
3326                                                lsp::Position::new(0, 0),
3327                                                lsp::Position::new(0, 27),
3328                                            ),
3329                                            "".to_string(),
3330                                        )],
3331                                    ),
3332                                ]
3333                                .into_iter()
3334                                .collect(),
3335                            ),
3336                            ..Default::default()
3337                        }),
3338                        data: Some(json!({
3339                            "codeActionParams": {
3340                                "range": {
3341                                    "start": {"line": 1, "column": 31},
3342                                    "end": {"line": 1, "column": 31},
3343                                }
3344                            }
3345                        })),
3346                        ..Default::default()
3347                    },
3348                )])
3349            })
3350            .next()
3351            .await;
3352
3353        // Toggle code actions and wait for them to display.
3354        editor_b.update(&mut cx_b, |editor, cx| {
3355            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3356        });
3357        editor_b
3358            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3359            .await;
3360
3361        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3362
3363        // Confirming the code action will trigger a resolve request.
3364        let confirm_action = workspace_b
3365            .update(&mut cx_b, |workspace, cx| {
3366                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3367            })
3368            .unwrap();
3369        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3370            lsp::CodeAction {
3371                title: "Inline into all callers".to_string(),
3372                edit: Some(lsp::WorkspaceEdit {
3373                    changes: Some(
3374                        [
3375                            (
3376                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3377                                vec![lsp::TextEdit::new(
3378                                    lsp::Range::new(
3379                                        lsp::Position::new(1, 22),
3380                                        lsp::Position::new(1, 34),
3381                                    ),
3382                                    "4".to_string(),
3383                                )],
3384                            ),
3385                            (
3386                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3387                                vec![lsp::TextEdit::new(
3388                                    lsp::Range::new(
3389                                        lsp::Position::new(0, 0),
3390                                        lsp::Position::new(0, 27),
3391                                    ),
3392                                    "".to_string(),
3393                                )],
3394                            ),
3395                        ]
3396                        .into_iter()
3397                        .collect(),
3398                    ),
3399                    ..Default::default()
3400                }),
3401                ..Default::default()
3402            }
3403        });
3404
3405        // After the action is confirmed, an editor containing both modified files is opened.
3406        confirm_action.await.unwrap();
3407        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3408            workspace
3409                .active_item(cx)
3410                .unwrap()
3411                .downcast::<Editor>()
3412                .unwrap()
3413        });
3414        code_action_editor.update(&mut cx_b, |editor, cx| {
3415            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3416            editor.undo(&Undo, cx);
3417            assert_eq!(
3418                editor.text(cx),
3419                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3420            );
3421            editor.redo(&Redo, cx);
3422            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3423        });
3424    }
3425
3426    #[gpui::test(iterations = 10)]
3427    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3428        cx_a.foreground().forbid_parking();
3429        let mut lang_registry = Arc::new(LanguageRegistry::new());
3430        let fs = FakeFs::new(cx_a.background());
3431        let mut path_openers_b = Vec::new();
3432        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3433
3434        // Set up a fake language server.
3435        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3436        Arc::get_mut(&mut lang_registry)
3437            .unwrap()
3438            .add(Arc::new(Language::new(
3439                LanguageConfig {
3440                    name: "Rust".into(),
3441                    path_suffixes: vec!["rs".to_string()],
3442                    language_server: Some(language_server_config),
3443                    ..Default::default()
3444                },
3445                Some(tree_sitter_rust::language()),
3446            )));
3447
3448        // Connect to a server as 2 clients.
3449        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3450        let client_a = server.create_client(&mut cx_a, "user_a").await;
3451        let client_b = server.create_client(&mut cx_b, "user_b").await;
3452
3453        // Share a project as client A
3454        fs.insert_tree(
3455            "/dir",
3456            json!({
3457                ".zed.toml": r#"collaborators = ["user_b"]"#,
3458                "one.rs": "const ONE: usize = 1;",
3459                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3460            }),
3461        )
3462        .await;
3463        let project_a = cx_a.update(|cx| {
3464            Project::local(
3465                client_a.clone(),
3466                client_a.user_store.clone(),
3467                lang_registry.clone(),
3468                fs.clone(),
3469                cx,
3470            )
3471        });
3472        let (worktree_a, _) = project_a
3473            .update(&mut cx_a, |p, cx| {
3474                p.find_or_create_local_worktree("/dir", false, cx)
3475            })
3476            .await
3477            .unwrap();
3478        worktree_a
3479            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3480            .await;
3481        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3482        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3483        project_a
3484            .update(&mut cx_a, |p, cx| p.share(cx))
3485            .await
3486            .unwrap();
3487
3488        // Join the worktree as client B.
3489        let project_b = Project::remote(
3490            project_id,
3491            client_b.clone(),
3492            client_b.user_store.clone(),
3493            lang_registry.clone(),
3494            fs.clone(),
3495            &mut cx_b.to_async(),
3496        )
3497        .await
3498        .unwrap();
3499        let mut params = cx_b.update(WorkspaceParams::test);
3500        params.languages = lang_registry.clone();
3501        params.client = client_b.client.clone();
3502        params.user_store = client_b.user_store.clone();
3503        params.project = project_b;
3504        params.path_openers = path_openers_b.into();
3505
3506        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3507        let editor_b = workspace_b
3508            .update(&mut cx_b, |workspace, cx| {
3509                workspace.open_path((worktree_id, "one.rs").into(), cx)
3510            })
3511            .await
3512            .unwrap()
3513            .downcast::<Editor>()
3514            .unwrap();
3515        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3516
3517        // Move cursor to a location that can be renamed.
3518        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3519            editor.select_ranges([7..7], None, cx);
3520            editor.rename(&Rename, cx).unwrap()
3521        });
3522
3523        fake_language_server
3524            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3525                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3526                assert_eq!(params.position, lsp::Position::new(0, 7));
3527                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3528                    lsp::Position::new(0, 6),
3529                    lsp::Position::new(0, 9),
3530                )))
3531            })
3532            .next()
3533            .await
3534            .unwrap();
3535        prepare_rename.await.unwrap();
3536        editor_b.update(&mut cx_b, |editor, cx| {
3537            let rename = editor.pending_rename().unwrap();
3538            let buffer = editor.buffer().read(cx).snapshot(cx);
3539            assert_eq!(
3540                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3541                6..9
3542            );
3543            rename.editor.update(cx, |rename_editor, cx| {
3544                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3545                    rename_buffer.edit([0..3], "THREE", cx);
3546                });
3547            });
3548        });
3549
3550        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3551            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3552        });
3553        fake_language_server
3554            .handle_request::<lsp::request::Rename, _>(|params, _| {
3555                assert_eq!(
3556                    params.text_document_position.text_document.uri.as_str(),
3557                    "file:///dir/one.rs"
3558                );
3559                assert_eq!(
3560                    params.text_document_position.position,
3561                    lsp::Position::new(0, 6)
3562                );
3563                assert_eq!(params.new_name, "THREE");
3564                Some(lsp::WorkspaceEdit {
3565                    changes: Some(
3566                        [
3567                            (
3568                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3569                                vec![lsp::TextEdit::new(
3570                                    lsp::Range::new(
3571                                        lsp::Position::new(0, 6),
3572                                        lsp::Position::new(0, 9),
3573                                    ),
3574                                    "THREE".to_string(),
3575                                )],
3576                            ),
3577                            (
3578                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3579                                vec![
3580                                    lsp::TextEdit::new(
3581                                        lsp::Range::new(
3582                                            lsp::Position::new(0, 24),
3583                                            lsp::Position::new(0, 27),
3584                                        ),
3585                                        "THREE".to_string(),
3586                                    ),
3587                                    lsp::TextEdit::new(
3588                                        lsp::Range::new(
3589                                            lsp::Position::new(0, 35),
3590                                            lsp::Position::new(0, 38),
3591                                        ),
3592                                        "THREE".to_string(),
3593                                    ),
3594                                ],
3595                            ),
3596                        ]
3597                        .into_iter()
3598                        .collect(),
3599                    ),
3600                    ..Default::default()
3601                })
3602            })
3603            .next()
3604            .await
3605            .unwrap();
3606        confirm_rename.await.unwrap();
3607
3608        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3609            workspace
3610                .active_item(cx)
3611                .unwrap()
3612                .downcast::<Editor>()
3613                .unwrap()
3614        });
3615        rename_editor.update(&mut cx_b, |editor, cx| {
3616            assert_eq!(
3617                editor.text(cx),
3618                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3619            );
3620            editor.undo(&Undo, cx);
3621            assert_eq!(
3622                editor.text(cx),
3623                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3624            );
3625            editor.redo(&Redo, cx);
3626            assert_eq!(
3627                editor.text(cx),
3628                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3629            );
3630        });
3631
3632        // Ensure temporary rename edits cannot be undone/redone.
3633        editor_b.update(&mut cx_b, |editor, cx| {
3634            editor.undo(&Undo, cx);
3635            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3636            editor.undo(&Undo, cx);
3637            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3638            editor.redo(&Redo, cx);
3639            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3640        })
3641    }
3642
3643    #[gpui::test(iterations = 10)]
3644    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3645        cx_a.foreground().forbid_parking();
3646
3647        // Connect to a server as 2 clients.
3648        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3649        let client_a = server.create_client(&mut cx_a, "user_a").await;
3650        let client_b = server.create_client(&mut cx_b, "user_b").await;
3651
3652        // Create an org that includes these 2 users.
3653        let db = &server.app_state.db;
3654        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3655        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3656            .await
3657            .unwrap();
3658        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3659            .await
3660            .unwrap();
3661
3662        // Create a channel that includes all the users.
3663        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3664        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3665            .await
3666            .unwrap();
3667        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3668            .await
3669            .unwrap();
3670        db.create_channel_message(
3671            channel_id,
3672            client_b.current_user_id(&cx_b),
3673            "hello A, it's B.",
3674            OffsetDateTime::now_utc(),
3675            1,
3676        )
3677        .await
3678        .unwrap();
3679
3680        let channels_a = cx_a
3681            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3682        channels_a
3683            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3684            .await;
3685        channels_a.read_with(&cx_a, |list, _| {
3686            assert_eq!(
3687                list.available_channels().unwrap(),
3688                &[ChannelDetails {
3689                    id: channel_id.to_proto(),
3690                    name: "test-channel".to_string()
3691                }]
3692            )
3693        });
3694        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3695            this.get_channel(channel_id.to_proto(), cx).unwrap()
3696        });
3697        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3698        channel_a
3699            .condition(&cx_a, |channel, _| {
3700                channel_messages(channel)
3701                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3702            })
3703            .await;
3704
3705        let channels_b = cx_b
3706            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3707        channels_b
3708            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3709            .await;
3710        channels_b.read_with(&cx_b, |list, _| {
3711            assert_eq!(
3712                list.available_channels().unwrap(),
3713                &[ChannelDetails {
3714                    id: channel_id.to_proto(),
3715                    name: "test-channel".to_string()
3716                }]
3717            )
3718        });
3719
3720        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3721            this.get_channel(channel_id.to_proto(), cx).unwrap()
3722        });
3723        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3724        channel_b
3725            .condition(&cx_b, |channel, _| {
3726                channel_messages(channel)
3727                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3728            })
3729            .await;
3730
3731        channel_a
3732            .update(&mut cx_a, |channel, cx| {
3733                channel
3734                    .send_message("oh, hi B.".to_string(), cx)
3735                    .unwrap()
3736                    .detach();
3737                let task = channel.send_message("sup".to_string(), cx).unwrap();
3738                assert_eq!(
3739                    channel_messages(channel),
3740                    &[
3741                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3742                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3743                        ("user_a".to_string(), "sup".to_string(), true)
3744                    ]
3745                );
3746                task
3747            })
3748            .await
3749            .unwrap();
3750
3751        channel_b
3752            .condition(&cx_b, |channel, _| {
3753                channel_messages(channel)
3754                    == [
3755                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3756                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3757                        ("user_a".to_string(), "sup".to_string(), false),
3758                    ]
3759            })
3760            .await;
3761
3762        assert_eq!(
3763            server
3764                .state()
3765                .await
3766                .channel(channel_id)
3767                .unwrap()
3768                .connection_ids
3769                .len(),
3770            2
3771        );
3772        cx_b.update(|_| drop(channel_b));
3773        server
3774            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3775            .await;
3776
3777        cx_a.update(|_| drop(channel_a));
3778        server
3779            .condition(|state| state.channel(channel_id).is_none())
3780            .await;
3781    }
3782
3783    #[gpui::test(iterations = 10)]
3784    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3785        cx_a.foreground().forbid_parking();
3786
3787        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3788        let client_a = server.create_client(&mut cx_a, "user_a").await;
3789
3790        let db = &server.app_state.db;
3791        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3792        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3793        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3794            .await
3795            .unwrap();
3796        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3797            .await
3798            .unwrap();
3799
3800        let channels_a = cx_a
3801            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3802        channels_a
3803            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3804            .await;
3805        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3806            this.get_channel(channel_id.to_proto(), cx).unwrap()
3807        });
3808
3809        // Messages aren't allowed to be too long.
3810        channel_a
3811            .update(&mut cx_a, |channel, cx| {
3812                let long_body = "this is long.\n".repeat(1024);
3813                channel.send_message(long_body, cx).unwrap()
3814            })
3815            .await
3816            .unwrap_err();
3817
3818        // Messages aren't allowed to be blank.
3819        channel_a.update(&mut cx_a, |channel, cx| {
3820            channel.send_message(String::new(), cx).unwrap_err()
3821        });
3822
3823        // Leading and trailing whitespace are trimmed.
3824        channel_a
3825            .update(&mut cx_a, |channel, cx| {
3826                channel
3827                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3828                    .unwrap()
3829            })
3830            .await
3831            .unwrap();
3832        assert_eq!(
3833            db.get_channel_messages(channel_id, 10, None)
3834                .await
3835                .unwrap()
3836                .iter()
3837                .map(|m| &m.body)
3838                .collect::<Vec<_>>(),
3839            &["surrounded by whitespace"]
3840        );
3841    }
3842
3843    #[gpui::test(iterations = 10)]
3844    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3845        cx_a.foreground().forbid_parking();
3846
3847        // Connect to a server as 2 clients.
3848        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3849        let client_a = server.create_client(&mut cx_a, "user_a").await;
3850        let client_b = server.create_client(&mut cx_b, "user_b").await;
3851        let mut status_b = client_b.status();
3852
3853        // Create an org that includes these 2 users.
3854        let db = &server.app_state.db;
3855        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3856        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3857            .await
3858            .unwrap();
3859        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3860            .await
3861            .unwrap();
3862
3863        // Create a channel that includes all the users.
3864        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3865        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3866            .await
3867            .unwrap();
3868        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3869            .await
3870            .unwrap();
3871        db.create_channel_message(
3872            channel_id,
3873            client_b.current_user_id(&cx_b),
3874            "hello A, it's B.",
3875            OffsetDateTime::now_utc(),
3876            2,
3877        )
3878        .await
3879        .unwrap();
3880
3881        let channels_a = cx_a
3882            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3883        channels_a
3884            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3885            .await;
3886
3887        channels_a.read_with(&cx_a, |list, _| {
3888            assert_eq!(
3889                list.available_channels().unwrap(),
3890                &[ChannelDetails {
3891                    id: channel_id.to_proto(),
3892                    name: "test-channel".to_string()
3893                }]
3894            )
3895        });
3896        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3897            this.get_channel(channel_id.to_proto(), cx).unwrap()
3898        });
3899        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3900        channel_a
3901            .condition(&cx_a, |channel, _| {
3902                channel_messages(channel)
3903                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3904            })
3905            .await;
3906
3907        let channels_b = cx_b
3908            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3909        channels_b
3910            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3911            .await;
3912        channels_b.read_with(&cx_b, |list, _| {
3913            assert_eq!(
3914                list.available_channels().unwrap(),
3915                &[ChannelDetails {
3916                    id: channel_id.to_proto(),
3917                    name: "test-channel".to_string()
3918                }]
3919            )
3920        });
3921
3922        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3923            this.get_channel(channel_id.to_proto(), cx).unwrap()
3924        });
3925        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3926        channel_b
3927            .condition(&cx_b, |channel, _| {
3928                channel_messages(channel)
3929                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3930            })
3931            .await;
3932
3933        // Disconnect client B, ensuring we can still access its cached channel data.
3934        server.forbid_connections();
3935        server.disconnect_client(client_b.current_user_id(&cx_b));
3936        while !matches!(
3937            status_b.next().await,
3938            Some(client::Status::ReconnectionError { .. })
3939        ) {}
3940
3941        channels_b.read_with(&cx_b, |channels, _| {
3942            assert_eq!(
3943                channels.available_channels().unwrap(),
3944                [ChannelDetails {
3945                    id: channel_id.to_proto(),
3946                    name: "test-channel".to_string()
3947                }]
3948            )
3949        });
3950        channel_b.read_with(&cx_b, |channel, _| {
3951            assert_eq!(
3952                channel_messages(channel),
3953                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3954            )
3955        });
3956
3957        // Send a message from client B while it is disconnected.
3958        channel_b
3959            .update(&mut cx_b, |channel, cx| {
3960                let task = channel
3961                    .send_message("can you see this?".to_string(), cx)
3962                    .unwrap();
3963                assert_eq!(
3964                    channel_messages(channel),
3965                    &[
3966                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3967                        ("user_b".to_string(), "can you see this?".to_string(), true)
3968                    ]
3969                );
3970                task
3971            })
3972            .await
3973            .unwrap_err();
3974
3975        // Send a message from client A while B is disconnected.
3976        channel_a
3977            .update(&mut cx_a, |channel, cx| {
3978                channel
3979                    .send_message("oh, hi B.".to_string(), cx)
3980                    .unwrap()
3981                    .detach();
3982                let task = channel.send_message("sup".to_string(), cx).unwrap();
3983                assert_eq!(
3984                    channel_messages(channel),
3985                    &[
3986                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3987                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3988                        ("user_a".to_string(), "sup".to_string(), true)
3989                    ]
3990                );
3991                task
3992            })
3993            .await
3994            .unwrap();
3995
3996        // Give client B a chance to reconnect.
3997        server.allow_connections();
3998        cx_b.foreground().advance_clock(Duration::from_secs(10));
3999
4000        // Verify that B sees the new messages upon reconnection, as well as the message client B
4001        // sent while offline.
4002        channel_b
4003            .condition(&cx_b, |channel, _| {
4004                channel_messages(channel)
4005                    == [
4006                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4007                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4008                        ("user_a".to_string(), "sup".to_string(), false),
4009                        ("user_b".to_string(), "can you see this?".to_string(), false),
4010                    ]
4011            })
4012            .await;
4013
4014        // Ensure client A and B can communicate normally after reconnection.
4015        channel_a
4016            .update(&mut cx_a, |channel, cx| {
4017                channel.send_message("you online?".to_string(), cx).unwrap()
4018            })
4019            .await
4020            .unwrap();
4021        channel_b
4022            .condition(&cx_b, |channel, _| {
4023                channel_messages(channel)
4024                    == [
4025                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4026                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4027                        ("user_a".to_string(), "sup".to_string(), false),
4028                        ("user_b".to_string(), "can you see this?".to_string(), false),
4029                        ("user_a".to_string(), "you online?".to_string(), false),
4030                    ]
4031            })
4032            .await;
4033
4034        channel_b
4035            .update(&mut cx_b, |channel, cx| {
4036                channel.send_message("yep".to_string(), cx).unwrap()
4037            })
4038            .await
4039            .unwrap();
4040        channel_a
4041            .condition(&cx_a, |channel, _| {
4042                channel_messages(channel)
4043                    == [
4044                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4045                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4046                        ("user_a".to_string(), "sup".to_string(), false),
4047                        ("user_b".to_string(), "can you see this?".to_string(), false),
4048                        ("user_a".to_string(), "you online?".to_string(), false),
4049                        ("user_b".to_string(), "yep".to_string(), false),
4050                    ]
4051            })
4052            .await;
4053    }
4054
4055    #[gpui::test(iterations = 10)]
4056    async fn test_contacts(
4057        mut cx_a: TestAppContext,
4058        mut cx_b: TestAppContext,
4059        mut cx_c: TestAppContext,
4060    ) {
4061        cx_a.foreground().forbid_parking();
4062        let lang_registry = Arc::new(LanguageRegistry::new());
4063        let fs = FakeFs::new(cx_a.background());
4064
4065        // Connect to a server as 3 clients.
4066        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4067        let client_a = server.create_client(&mut cx_a, "user_a").await;
4068        let client_b = server.create_client(&mut cx_b, "user_b").await;
4069        let client_c = server.create_client(&mut cx_c, "user_c").await;
4070
4071        // Share a worktree as client A.
4072        fs.insert_tree(
4073            "/a",
4074            json!({
4075                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4076            }),
4077        )
4078        .await;
4079
4080        let project_a = cx_a.update(|cx| {
4081            Project::local(
4082                client_a.clone(),
4083                client_a.user_store.clone(),
4084                lang_registry.clone(),
4085                fs.clone(),
4086                cx,
4087            )
4088        });
4089        let (worktree_a, _) = project_a
4090            .update(&mut cx_a, |p, cx| {
4091                p.find_or_create_local_worktree("/a", false, cx)
4092            })
4093            .await
4094            .unwrap();
4095        worktree_a
4096            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4097            .await;
4098
4099        client_a
4100            .user_store
4101            .condition(&cx_a, |user_store, _| {
4102                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4103            })
4104            .await;
4105        client_b
4106            .user_store
4107            .condition(&cx_b, |user_store, _| {
4108                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4109            })
4110            .await;
4111        client_c
4112            .user_store
4113            .condition(&cx_c, |user_store, _| {
4114                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4115            })
4116            .await;
4117
4118        let project_id = project_a
4119            .update(&mut cx_a, |project, _| project.next_remote_id())
4120            .await;
4121        project_a
4122            .update(&mut cx_a, |project, cx| project.share(cx))
4123            .await
4124            .unwrap();
4125
4126        let _project_b = Project::remote(
4127            project_id,
4128            client_b.clone(),
4129            client_b.user_store.clone(),
4130            lang_registry.clone(),
4131            fs.clone(),
4132            &mut cx_b.to_async(),
4133        )
4134        .await
4135        .unwrap();
4136
4137        client_a
4138            .user_store
4139            .condition(&cx_a, |user_store, _| {
4140                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4141            })
4142            .await;
4143        client_b
4144            .user_store
4145            .condition(&cx_b, |user_store, _| {
4146                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4147            })
4148            .await;
4149        client_c
4150            .user_store
4151            .condition(&cx_c, |user_store, _| {
4152                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4153            })
4154            .await;
4155
4156        project_a
4157            .condition(&cx_a, |project, _| {
4158                project.collaborators().contains_key(&client_b.peer_id)
4159            })
4160            .await;
4161
4162        cx_a.update(move |_| drop(project_a));
4163        client_a
4164            .user_store
4165            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4166            .await;
4167        client_b
4168            .user_store
4169            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4170            .await;
4171        client_c
4172            .user_store
4173            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4174            .await;
4175
4176        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4177            user_store
4178                .contacts()
4179                .iter()
4180                .map(|contact| {
4181                    let worktrees = contact
4182                        .projects
4183                        .iter()
4184                        .map(|p| {
4185                            (
4186                                p.worktree_root_names[0].as_str(),
4187                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4188                            )
4189                        })
4190                        .collect();
4191                    (contact.user.github_login.as_str(), worktrees)
4192                })
4193                .collect()
4194        }
4195    }
4196
4197    #[gpui::test(iterations = 100)]
4198    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4199        cx.foreground().forbid_parking();
4200        let max_peers = env::var("MAX_PEERS")
4201            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4202            .unwrap_or(5);
4203        let max_operations = env::var("OPERATIONS")
4204            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4205            .unwrap_or(10);
4206
4207        let rng = Arc::new(Mutex::new(rng));
4208
4209        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4210        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4211
4212        let fs = FakeFs::new(cx.background());
4213        fs.insert_tree(
4214            "/_collab",
4215            json!({
4216                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4217            }),
4218        )
4219        .await;
4220
4221        let operations = Rc::new(Cell::new(0));
4222        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4223        let mut clients = Vec::new();
4224
4225        let mut next_entity_id = 100000;
4226        let mut host_cx = TestAppContext::new(
4227            cx.foreground_platform(),
4228            cx.platform(),
4229            cx.foreground(),
4230            cx.background(),
4231            cx.font_cache(),
4232            next_entity_id,
4233        );
4234        let host = server.create_client(&mut host_cx, "host").await;
4235        let host_project = host_cx.update(|cx| {
4236            Project::local(
4237                host.client.clone(),
4238                host.user_store.clone(),
4239                Arc::new(LanguageRegistry::new()),
4240                fs.clone(),
4241                cx,
4242            )
4243        });
4244        let host_project_id = host_project
4245            .update(&mut host_cx, |p, _| p.next_remote_id())
4246            .await;
4247
4248        let (collab_worktree, _) = host_project
4249            .update(&mut host_cx, |project, cx| {
4250                project.find_or_create_local_worktree("/_collab", false, cx)
4251            })
4252            .await
4253            .unwrap();
4254        collab_worktree
4255            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4256            .await;
4257        host_project
4258            .update(&mut host_cx, |project, cx| project.share(cx))
4259            .await
4260            .unwrap();
4261
4262        clients.push(cx.foreground().spawn(host.simulate_host(
4263            host_project.clone(),
4264            language_server_config,
4265            operations.clone(),
4266            max_operations,
4267            rng.clone(),
4268            host_cx.clone(),
4269        )));
4270
4271        while operations.get() < max_operations {
4272            cx.background().simulate_random_delay().await;
4273            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4274                operations.set(operations.get() + 1);
4275
4276                let guest_id = clients.len();
4277                log::info!("Adding guest {}", guest_id);
4278                next_entity_id += 100000;
4279                let mut guest_cx = TestAppContext::new(
4280                    cx.foreground_platform(),
4281                    cx.platform(),
4282                    cx.foreground(),
4283                    cx.background(),
4284                    cx.font_cache(),
4285                    next_entity_id,
4286                );
4287                let guest = server
4288                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4289                    .await;
4290                let guest_project = Project::remote(
4291                    host_project_id,
4292                    guest.client.clone(),
4293                    guest.user_store.clone(),
4294                    guest_lang_registry.clone(),
4295                    fs.clone(),
4296                    &mut guest_cx.to_async(),
4297                )
4298                .await
4299                .unwrap();
4300                clients.push(cx.foreground().spawn(guest.simulate_guest(
4301                    guest_id,
4302                    guest_project,
4303                    operations.clone(),
4304                    max_operations,
4305                    rng.clone(),
4306                    guest_cx,
4307                )));
4308
4309                log::info!("Guest {} added", guest_id);
4310            }
4311        }
4312
4313        let clients = futures::future::join_all(clients).await;
4314        cx.foreground().run_until_parked();
4315
4316        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4317            project
4318                .worktrees(cx)
4319                .map(|worktree| {
4320                    let snapshot = worktree.read(cx).snapshot();
4321                    (snapshot.id(), snapshot)
4322                })
4323                .collect::<BTreeMap<_, _>>()
4324        });
4325
4326        for (guest_client, guest_cx) in clients.iter().skip(1) {
4327            let guest_id = guest_client.client.id();
4328            let worktree_snapshots =
4329                guest_client
4330                    .project
4331                    .as_ref()
4332                    .unwrap()
4333                    .read_with(guest_cx, |project, cx| {
4334                        project
4335                            .worktrees(cx)
4336                            .map(|worktree| {
4337                                let worktree = worktree.read(cx);
4338                                (worktree.id(), worktree.snapshot())
4339                            })
4340                            .collect::<BTreeMap<_, _>>()
4341                    });
4342
4343            assert_eq!(
4344                worktree_snapshots.keys().collect::<Vec<_>>(),
4345                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4346                "guest {} has different worktrees than the host",
4347                guest_id
4348            );
4349            for (id, host_snapshot) in &host_worktree_snapshots {
4350                let guest_snapshot = &worktree_snapshots[id];
4351                assert_eq!(
4352                    guest_snapshot.root_name(),
4353                    host_snapshot.root_name(),
4354                    "guest {} has different root name than the host for worktree {}",
4355                    guest_id,
4356                    id
4357                );
4358                assert_eq!(
4359                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4360                    host_snapshot.entries(false).collect::<Vec<_>>(),
4361                    "guest {} has different snapshot than the host for worktree {}",
4362                    guest_id,
4363                    id
4364                );
4365            }
4366
4367            guest_client
4368                .project
4369                .as_ref()
4370                .unwrap()
4371                .read_with(guest_cx, |project, cx| {
4372                    assert!(
4373                        !project.has_buffered_operations(cx),
4374                        "guest {} has buffered operations ",
4375                        guest_id,
4376                    );
4377                });
4378
4379            for guest_buffer in &guest_client.buffers {
4380                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4381                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4382                    project
4383                        .shared_buffer(guest_client.peer_id, buffer_id)
4384                        .expect(&format!(
4385                            "host does not have buffer for guest:{}, peer:{}, id:{}",
4386                            guest_id, guest_client.peer_id, buffer_id
4387                        ))
4388                });
4389                assert_eq!(
4390                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4391                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4392                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4393                    guest_id,
4394                    buffer_id,
4395                    host_buffer
4396                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4397                );
4398            }
4399        }
4400    }
4401
4402    struct TestServer {
            // In-process server harness for these tests: `start` builds it around a fake
            // database and `create_client` attaches clients over in-memory connections.
            //
            // Peer handed to `Server::new`; it is reset when the harness is dropped.
4403        peer: Arc<Peer>,
            // Application state produced by `build_app_state`; tests reach the database
            // through `app_state.db`.
4404        app_state: Arc<AppState>,
            // The server under test.
4405        server: Arc<Server>,
            // Foreground executor; `condition` brackets its waits with
            // `start_waiting` / `finish_waiting` on it.
4406        foreground: Rc<executor::Foreground>,
            // Receiving half of the channel whose sender is given to `Server::new`;
            // `condition` awaits it between predicate checks.
4407        notifications: mpsc::UnboundedReceiver<()>,
            // One sender per connected user, stored when a connection is established.
            // `disconnect_client` removes the entry and sends `Some(())` on it.
4408        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While `true`, the connection override installed by `create_client`
            // fails every attempt with "server is forbidding connections".
4409        forbid_connections: Arc<AtomicBool>,
            // Never read after construction.
            // NOTE(review): presumably held only to keep the test database alive for the
            // harness's lifetime; confirm against `TestDb`.
4410        _test_db: TestDb,
4411    }
4412
4413    impl TestServer {
4414        async fn start(
4415            foreground: Rc<executor::Foreground>,
4416            background: Arc<executor::Background>,
4417        ) -> Self {
4418            let test_db = TestDb::fake(background);
4419            let app_state = Self::build_app_state(&test_db).await;
4420            let peer = Peer::new();
4421            let notifications = mpsc::unbounded();
4422            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4423            Self {
4424                peer,
4425                app_state,
4426                server,
4427                foreground,
4428                notifications: notifications.1,
4429                connection_killers: Default::default(),
4430                forbid_connections: Default::default(),
4431                _test_db: test_db,
4432            }
4433        }
4434
4435        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4436            let http = FakeHttpClient::with_404_response();
4437            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4438            let client_name = name.to_string();
4439            let mut client = Client::new(http.clone());
4440            let server = self.server.clone();
4441            let connection_killers = self.connection_killers.clone();
4442            let forbid_connections = self.forbid_connections.clone();
4443            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4444
4445            Arc::get_mut(&mut client)
4446                .unwrap()
4447                .override_authenticate(move |cx| {
4448                    cx.spawn(|_| async move {
4449                        let access_token = "the-token".to_string();
4450                        Ok(Credentials {
4451                            user_id: user_id.0 as u64,
4452                            access_token,
4453                        })
4454                    })
4455                })
4456                .override_establish_connection(move |credentials, cx| {
4457                    assert_eq!(credentials.user_id, user_id.0 as u64);
4458                    assert_eq!(credentials.access_token, "the-token");
4459
4460                    let server = server.clone();
4461                    let connection_killers = connection_killers.clone();
4462                    let forbid_connections = forbid_connections.clone();
4463                    let client_name = client_name.clone();
4464                    let connection_id_tx = connection_id_tx.clone();
4465                    cx.spawn(move |cx| async move {
4466                        if forbid_connections.load(SeqCst) {
4467                            Err(EstablishConnectionError::other(anyhow!(
4468                                "server is forbidding connections"
4469                            )))
4470                        } else {
4471                            let (client_conn, server_conn, kill_conn) =
4472                                Connection::in_memory(cx.background());
4473                            connection_killers.lock().insert(user_id, kill_conn);
4474                            cx.background()
4475                                .spawn(server.handle_connection(
4476                                    server_conn,
4477                                    client_name,
4478                                    user_id,
4479                                    Some(connection_id_tx),
4480                                    cx.background(),
4481                                ))
4482                                .detach();
4483                            Ok(client_conn)
4484                        }
4485                    })
4486                });
4487
4488            client
4489                .authenticate_and_connect(&cx.to_async())
4490                .await
4491                .unwrap();
4492
4493            Channel::init(&client);
4494            Project::init(&client);
4495
4496            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4497            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4498            let mut authed_user =
4499                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4500            while authed_user.next().await.unwrap().is_none() {}
4501
4502            TestClient {
4503                client,
4504                peer_id,
4505                user_store,
4506                project: Default::default(),
4507                buffers: Default::default(),
4508            }
4509        }
4510
4511        fn disconnect_client(&self, user_id: UserId) {
4512            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4513                let _ = kill_conn.try_send(Some(()));
4514            }
4515        }
4516
4517        fn forbid_connections(&self) {
4518            self.forbid_connections.store(true, SeqCst);
4519        }
4520
4521        fn allow_connections(&self) {
4522            self.forbid_connections.store(false, SeqCst);
4523        }
4524
4525        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4526            let mut config = Config::default();
4527            config.session_secret = "a".repeat(32);
4528            config.database_url = test_db.url.clone();
4529            let github_client = github::AppClient::test();
4530            Arc::new(AppState {
4531                db: test_db.db().clone(),
4532                handlebars: Default::default(),
4533                auth_client: auth::build_client("", ""),
4534                repo_client: github::RepoClient::test(&github_client),
4535                github_client,
4536                config,
4537            })
4538        }
4539
4540        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4541            self.server.store.read()
4542        }
4543
4544        async fn condition<F>(&mut self, mut predicate: F)
4545        where
4546            F: FnMut(&Store) -> bool,
4547        {
4548            async_std::future::timeout(Duration::from_millis(500), async {
4549                while !(predicate)(&*self.server.store.read()) {
4550                    self.foreground.start_waiting();
4551                    self.notifications.next().await;
4552                    self.foreground.finish_waiting();
4553                }
4554            })
4555            .await
4556            .expect("condition timed out");
4557        }
4558    }
4559
4560    impl Drop for TestServer {
            // Resets the shared `Peer` whenever a test's `TestServer` is dropped.
4561        fn drop(&mut self) {
                // NOTE(review): presumably this tears down connections still registered on the
                // peer so they do not outlive the test; confirm against `Peer::reset`.
4562            self.peer.reset();
4563        }
4564    }
4565
4566    struct TestClient {
            // A client produced by `TestServer::create_client`, plus the per-client state the
            // tests inspect.
            //
            // The connected client; also exposed through `Deref`.
4567        client: Arc<Client>,
            // Built from the connection id the server reported for this client's connection.
4568        pub peer_id: PeerId,
            // User store created for this client in `create_client`.
4569        pub user_store: ModelHandle<UserStore>,
            // `None` right after `create_client`; `test_random_collaboration` reads it once
            // the simulations have finished.
            // NOTE(review): presumably set by `simulate_host` / `simulate_guest`; confirm.
4570        project: Option<ModelHandle<Project>>,
            // Empty right after `create_client`; `test_random_collaboration` compares each
            // entry with the host's copy of the same buffer.
            // NOTE(review): presumably filled in during the simulations; confirm.
4571        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4572    }
4573
4574    impl Deref for TestClient {
            // Lets tests use a `TestClient` wherever the underlying `Arc<Client>` is expected,
            // e.g. `client_b.status()` or `client_a.clone()`.
4575        type Target = Arc<Client>;
4576
4577        fn deref(&self) -> &Self::Target {
4578            &self.client
4579        }
4580    }
4581
4582    impl TestClient {
4583        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4584            UserId::from_proto(
4585                self.user_store
4586                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4587            )
4588        }
4589
4590        fn simulate_host(
4591            mut self,
4592            project: ModelHandle<Project>,
4593            mut language_server_config: LanguageServerConfig,
4594            operations: Rc<Cell<usize>>,
4595            max_operations: usize,
4596            rng: Arc<Mutex<StdRng>>,
4597            mut cx: TestAppContext,
4598        ) -> impl Future<Output = (Self, TestAppContext)> {
4599            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4600
4601            // Set up a fake language server.
4602            language_server_config.set_fake_initializer({
4603                let rng = rng.clone();
4604                let files = files.clone();
4605                let project = project.clone();
4606                move |fake_server| {
4607                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4608                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4609                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4610                                range: lsp::Range::new(
4611                                    lsp::Position::new(0, 0),
4612                                    lsp::Position::new(0, 0),
4613                                ),
4614                                new_text: "the-new-text".to_string(),
4615                            })),
4616                            ..Default::default()
4617                        }]))
4618                    });
4619
4620                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4621                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4622                            lsp::CodeAction {
4623                                title: "the-code-action".to_string(),
4624                                ..Default::default()
4625                            },
4626                        )])
4627                    });
4628
4629                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4630                        |params, _| {
4631                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4632                                params.position,
4633                                params.position,
4634                            )))
4635                        },
4636                    );
4637
4638                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4639                        let files = files.clone();
4640                        let rng = rng.clone();
4641                        move |_, _| {
4642                            let files = files.lock();
4643                            let mut rng = rng.lock();
4644                            let count = rng.gen_range::<usize, _>(1..3);
4645                            let files = (0..count)
4646                                .map(|_| files.choose(&mut *rng).unwrap())
4647                                .collect::<Vec<_>>();
4648                            log::info!("LSP: Returning definitions in files {:?}", &files);
4649                            Some(lsp::GotoDefinitionResponse::Array(
4650                                files
4651                                    .into_iter()
4652                                    .map(|file| lsp::Location {
4653                                        uri: lsp::Url::from_file_path(file).unwrap(),
4654                                        range: Default::default(),
4655                                    })
4656                                    .collect(),
4657                            ))
4658                        }
4659                    });
4660
4661                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4662                        let rng = rng.clone();
4663                        let project = project.clone();
4664                        move |params, mut cx| {
4665                            project.update(&mut cx, |project, cx| {
4666                                let path = params
4667                                    .text_document_position_params
4668                                    .text_document
4669                                    .uri
4670                                    .to_file_path()
4671                                    .unwrap();
4672                                let (worktree, relative_path) =
4673                                    project.find_local_worktree(&path, cx)?;
4674                                let project_path =
4675                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4676                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4677
4678                                let mut highlights = Vec::new();
4679                                let highlight_count = rng.lock().gen_range(1..=5);
4680                                let mut prev_end = 0;
4681                                for _ in 0..highlight_count {
4682                                    let range =
4683                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4684                                    let start =
4685                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4686                                    let end =
4687                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4688                                    highlights.push(lsp::DocumentHighlight {
4689                                        range: lsp::Range::new(start, end),
4690                                        kind: Some(lsp::DocumentHighlightKind::READ),
4691                                    });
4692                                    prev_end = range.end;
4693                                }
4694                                Some(highlights)
4695                            })
4696                        }
4697                    });
4698                }
4699            });
4700
4701            project.update(&mut cx, |project, _| {
4702                project.languages().add(Arc::new(Language::new(
4703                    LanguageConfig {
4704                        name: "Rust".into(),
4705                        path_suffixes: vec!["rs".to_string()],
4706                        language_server: Some(language_server_config),
4707                        ..Default::default()
4708                    },
4709                    None,
4710                )));
4711            });
4712
4713            async move {
4714                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4715                while operations.get() < max_operations {
4716                    operations.set(operations.get() + 1);
4717
4718                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4719                    match distribution {
4720                        0..=20 if !files.lock().is_empty() => {
4721                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4722                            let mut path = path.as_path();
4723                            while let Some(parent_path) = path.parent() {
4724                                path = parent_path;
4725                                if rng.lock().gen() {
4726                                    break;
4727                                }
4728                            }
4729
4730                            log::info!("Host: find/create local worktree {:?}", path);
4731                            project
4732                                .update(&mut cx, |project, cx| {
4733                                    project.find_or_create_local_worktree(path, false, cx)
4734                                })
4735                                .await
4736                                .unwrap();
4737                        }
4738                        10..=80 if !files.lock().is_empty() => {
4739                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4740                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4741                                let (worktree, path) = project
4742                                    .update(&mut cx, |project, cx| {
4743                                        project.find_or_create_local_worktree(
4744                                            file.clone(),
4745                                            false,
4746                                            cx,
4747                                        )
4748                                    })
4749                                    .await
4750                                    .unwrap();
4751                                let project_path =
4752                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4753                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4754                                let buffer = project
4755                                    .update(&mut cx, |project, cx| {
4756                                        project.open_buffer(project_path, cx)
4757                                    })
4758                                    .await
4759                                    .unwrap();
4760                                self.buffers.insert(buffer.clone());
4761                                buffer
4762                            } else {
4763                                self.buffers
4764                                    .iter()
4765                                    .choose(&mut *rng.lock())
4766                                    .unwrap()
4767                                    .clone()
4768                            };
4769
4770                            if rng.lock().gen_bool(0.1) {
4771                                cx.update(|cx| {
4772                                    log::info!(
4773                                        "Host: dropping buffer {:?}",
4774                                        buffer.read(cx).file().unwrap().full_path(cx)
4775                                    );
4776                                    self.buffers.remove(&buffer);
4777                                    drop(buffer);
4778                                });
4779                            } else {
4780                                buffer.update(&mut cx, |buffer, cx| {
4781                                    log::info!(
4782                                        "Host: updating buffer {:?}",
4783                                        buffer.file().unwrap().full_path(cx)
4784                                    );
4785                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4786                                });
4787                            }
4788                        }
4789                        _ => loop {
4790                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4791                            let mut path = PathBuf::new();
4792                            path.push("/");
4793                            for _ in 0..path_component_count {
4794                                let letter = rng.lock().gen_range(b'a'..=b'z');
4795                                path.push(std::str::from_utf8(&[letter]).unwrap());
4796                            }
4797                            path.set_extension("rs");
4798                            let parent_path = path.parent().unwrap();
4799
4800                            log::info!("Host: creating file {:?}", path,);
4801
4802                            if fs.create_dir(&parent_path).await.is_ok()
4803                                && fs.create_file(&path, Default::default()).await.is_ok()
4804                            {
4805                                files.lock().push(path);
4806                                break;
4807                            } else {
4808                                log::info!("Host: cannot create file");
4809                            }
4810                        },
4811                    }
4812
4813                    cx.background().simulate_random_delay().await;
4814                }
4815
4816                log::info!("Host done");
4817
4818                self.project = Some(project);
4819                (self, cx)
4820            }
4821        }
4822
4823        pub async fn simulate_guest(
4824            mut self,
4825            guest_id: usize,
4826            project: ModelHandle<Project>,
4827            operations: Rc<Cell<usize>>,
4828            max_operations: usize,
4829            rng: Arc<Mutex<StdRng>>,
4830            mut cx: TestAppContext,
4831        ) -> (Self, TestAppContext) {
4832            while operations.get() < max_operations {
4833                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4834                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4835                        project
4836                            .worktrees(&cx)
4837                            .filter(|worktree| {
4838                                let worktree = worktree.read(cx);
4839                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4840                            })
4841                            .choose(&mut *rng.lock())
4842                    }) {
4843                        worktree
4844                    } else {
4845                        cx.background().simulate_random_delay().await;
4846                        continue;
4847                    };
4848
4849                    operations.set(operations.get() + 1);
4850                    let (worktree_root_name, project_path) =
4851                        worktree.read_with(&cx, |worktree, _| {
4852                            let entry = worktree
4853                                .entries(false)
4854                                .filter(|e| e.is_file())
4855                                .choose(&mut *rng.lock())
4856                                .unwrap();
4857                            (
4858                                worktree.root_name().to_string(),
4859                                (worktree.id(), entry.path.clone()),
4860                            )
4861                        });
4862                    log::info!(
4863                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4864                        guest_id,
4865                        project_path.0,
4866                        worktree_root_name,
4867                        project_path.1
4868                    );
4869                    let buffer = project
4870                        .update(&mut cx, |project, cx| {
4871                            project.open_buffer(project_path.clone(), cx)
4872                        })
4873                        .await
4874                        .unwrap();
4875                    log::info!(
4876                        "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
4877                        guest_id,
4878                        project_path.0,
4879                        worktree_root_name,
4880                        project_path.1,
4881                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4882                    );
4883                    self.buffers.insert(buffer.clone());
4884                    buffer
4885                } else {
4886                    operations.set(operations.get() + 1);
4887
4888                    self.buffers
4889                        .iter()
4890                        .choose(&mut *rng.lock())
4891                        .unwrap()
4892                        .clone()
4893                };
4894
4895                let choice = rng.lock().gen_range(0..100);
4896                match choice {
4897                    0..=9 => {
4898                        cx.update(|cx| {
4899                            log::info!(
4900                                "Guest {}: dropping buffer {:?}",
4901                                guest_id,
4902                                buffer.read(cx).file().unwrap().full_path(cx)
4903                            );
4904                            self.buffers.remove(&buffer);
4905                            drop(buffer);
4906                        });
4907                    }
4908                    10..=19 => {
4909                        let completions = project.update(&mut cx, |project, cx| {
4910                            log::info!(
4911                                "Guest {}: requesting completions for buffer {:?}",
4912                                guest_id,
4913                                buffer.read(cx).file().unwrap().full_path(cx)
4914                            );
4915                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4916                            project.completions(&buffer, offset, cx)
4917                        });
4918                        let completions = cx.background().spawn(async move {
4919                            completions.await.expect("completions request failed");
4920                        });
4921                        if rng.lock().gen_bool(0.3) {
4922                            log::info!("Guest {}: detaching completions request", guest_id);
4923                            completions.detach();
4924                        } else {
4925                            completions.await;
4926                        }
4927                    }
4928                    20..=29 => {
4929                        let code_actions = project.update(&mut cx, |project, cx| {
4930                            log::info!(
4931                                "Guest {}: requesting code actions for buffer {:?}",
4932                                guest_id,
4933                                buffer.read(cx).file().unwrap().full_path(cx)
4934                            );
4935                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4936                            project.code_actions(&buffer, range, cx)
4937                        });
4938                        let code_actions = cx.background().spawn(async move {
4939                            code_actions.await.expect("code actions request failed");
4940                        });
4941                        if rng.lock().gen_bool(0.3) {
4942                            log::info!("Guest {}: detaching code actions request", guest_id);
4943                            code_actions.detach();
4944                        } else {
4945                            code_actions.await;
4946                        }
4947                    }
4948                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4949                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4950                            log::info!(
4951                                "Guest {}: saving buffer {:?}",
4952                                guest_id,
4953                                buffer.file().unwrap().full_path(cx)
4954                            );
4955                            (buffer.version(), buffer.save(cx))
4956                        });
4957                        let save = cx.spawn(|cx| async move {
4958                            let (saved_version, _) = save.await.expect("save request failed");
4959                            buffer.read_with(&cx, |buffer, _| {
4960                                assert!(buffer.version().observed_all(&saved_version));
4961                                assert!(saved_version.observed_all(&requested_version));
4962                            });
4963                        });
4964                        if rng.lock().gen_bool(0.3) {
4965                            log::info!("Guest {}: detaching save request", guest_id);
4966                            save.detach();
4967                        } else {
4968                            save.await;
4969                        }
4970                    }
4971                    40..=44 => {
4972                        let prepare_rename = project.update(&mut cx, |project, cx| {
4973                            log::info!(
4974                                "Guest {}: preparing rename for buffer {:?}",
4975                                guest_id,
4976                                buffer.read(cx).file().unwrap().full_path(cx)
4977                            );
4978                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4979                            project.prepare_rename(buffer, offset, cx)
4980                        });
4981                        let prepare_rename = cx.background().spawn(async move {
4982                            prepare_rename.await.expect("prepare rename request failed");
4983                        });
4984                        if rng.lock().gen_bool(0.3) {
4985                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4986                            prepare_rename.detach();
4987                        } else {
4988                            prepare_rename.await;
4989                        }
4990                    }
4991                    45..=49 => {
4992                        let definitions = project.update(&mut cx, |project, cx| {
4993                            log::info!(
4994                                "Guest {}: requesting definitions for buffer {:?}",
4995                                guest_id,
4996                                buffer.read(cx).file().unwrap().full_path(cx)
4997                            );
4998                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4999                            project.definition(&buffer, offset, cx)
5000                        });
5001                        let definitions = cx.background().spawn(async move {
5002                            definitions.await.expect("definitions request failed")
5003                        });
5004                        if rng.lock().gen_bool(0.3) {
5005                            log::info!("Guest {}: detaching definitions request", guest_id);
5006                            definitions.detach();
5007                        } else {
5008                            self.buffers
5009                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5010                        }
5011                    }
5012                    50..=54 => {
5013                        let highlights = project.update(&mut cx, |project, cx| {
5014                            log::info!(
5015                                "Guest {}: requesting highlights for buffer {:?}",
5016                                guest_id,
5017                                buffer.read(cx).file().unwrap().full_path(cx)
5018                            );
5019                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5020                            project.document_highlights(&buffer, offset, cx)
5021                        });
5022                        let highlights = cx.background().spawn(async move {
5023                            highlights.await.expect("highlights request failed");
5024                        });
5025                        if rng.lock().gen_bool(0.3) {
5026                            log::info!("Guest {}: detaching highlights request", guest_id);
5027                            highlights.detach();
5028                        } else {
5029                            highlights.await;
5030                        }
5031                    }
5032                    _ => {
5033                        buffer.update(&mut cx, |buffer, cx| {
5034                            log::info!(
5035                                "Guest {}: updating buffer {:?}",
5036                                guest_id,
5037                                buffer.file().unwrap().full_path(cx)
5038                            );
5039                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5040                        });
5041                    }
5042                }
5043                cx.background().simulate_random_delay().await;
5044            }
5045
5046            log::info!("Guest {} done", guest_id);
5047
5048            self.project = Some(project);
5049            (self, cx)
5050        }
5051    }
5052
5053    impl Executor for Arc<gpui::executor::Background> {
5054        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5055            self.spawn(future).detach();
5056        }
5057    }
5058
5059    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5060        channel
5061            .messages()
5062            .cursor::<()>()
5063            .map(|m| {
5064                (
5065                    m.sender.github_login.clone(),
5066                    m.body.clone(),
5067                    m.is_pending(),
5068                )
5069            })
5070            .collect()
5071    }
5072
5073    struct EmptyView;
5074
5075    impl gpui::Entity for EmptyView {
5076        type Event = ();
5077    }
5078
5079    impl gpui::View for EmptyView {
5080        fn ui_name() -> &'static str {
5081            "empty view"
5082        }
5083
5084        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5085            gpui::Element::boxed(gpui::elements::Empty)
5086        }
5087    }
5088}