rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
/// Type-erased message handler as stored in `Server::handlers`.
///
/// It is called with the server and a boxed, dynamically typed envelope, and
/// returns a boxed future that resolves to `tide::Result<()>`.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  34
/// RPC server: owns the peer, the in-memory store and the per-message-type
/// handler table used to dispatch incoming messages.
pub struct Server {
    /// RPC peer used to add connections and to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// In-memory state behind a read/write lock.
    /// NOTE(review): presumably what `state()` / `state_mut()` lock; their
    /// definitions are outside this section — confirm.
    store: RwLock<Store>,
    /// Shared application state; handlers reach the database through `app_state.db`.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they were registered for.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When set, `()` is sent on this channel after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
/// Abstraction over a task spawner, used by `Server::handle_connection` to run
/// background message handlers without awaiting them.
pub trait Executor {
    /// Spawns `future` without returning any handle to it.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
  46
/// Unit type naming the production executor.
/// NOTE(review): its `Executor` impl is not visible in this section — confirm where it lives.
pub struct RealExecutor;
  48
/// Number of channel messages fetched per page; a page shorter than this is
/// reported to the client as the last one (`done`).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a channel message body, compared against the trimmed body's
/// `len()` (i.e. bytes, not characters).
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  81            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  82            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  83            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  84            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  85            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  86            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  87            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  88            .add_request_handler(
  89                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  90            )
  91            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
  92            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
  93            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
  94            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
  95            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
  96            .add_message_handler(Server::close_buffer)
  97            .add_request_handler(Server::update_buffer)
  98            .add_message_handler(Server::update_buffer_file)
  99            .add_message_handler(Server::buffer_reloaded)
 100            .add_message_handler(Server::buffer_saved)
 101            .add_request_handler(Server::save_buffer)
 102            .add_request_handler(Server::get_channels)
 103            .add_request_handler(Server::get_users)
 104            .add_request_handler(Server::join_channel)
 105            .add_message_handler(Server::leave_channel)
 106            .add_request_handler(Server::send_channel_message)
 107            .add_request_handler(Server::get_channel_messages);
 108
 109        Arc::new(server)
 110    }
 111
 112    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 113    where
 114        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 115        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 116        M: EnvelopedMessage,
 117    {
 118        let prev_handler = self.handlers.insert(
 119            TypeId::of::<M>(),
 120            Box::new(move |server, envelope| {
 121                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 122                (handler)(server, *envelope).boxed()
 123            }),
 124        );
 125        if prev_handler.is_some() {
 126            panic!("registered a handler for the same message twice");
 127        }
 128        self
 129    }
 130
 131    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 132    where
 133        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 134        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 135        M: RequestMessage,
 136    {
 137        self.add_message_handler(move |server, envelope| {
 138            let receipt = envelope.receipt();
 139            let response = (handler)(server.clone(), envelope);
 140            async move {
 141                match response.await {
 142                    Ok(response) => {
 143                        server.peer.respond(receipt, response)?;
 144                        Ok(())
 145                    }
 146                    Err(error) => {
 147                        server.peer.respond_with_error(
 148                            receipt,
 149                            proto::Error {
 150                                message: error.to_string(),
 151                            },
 152                        )?;
 153                        Err(error)
 154                    }
 155                }
 156            }
 157        })
 158    }
 159
    /// Returns a future that drives a single client connection until it ends.
    ///
    /// The future registers `connection` with the peer, optionally reports the
    /// assigned `ConnectionId` through `send_connection_id`, records the
    /// connection for `user_id`, and then dispatches incoming messages to the
    /// registered handlers until either the I/O future completes or the
    /// incoming stream ends. Afterwards the connection is signed out.
    ///
    /// `addr` is only used in log messages. Handlers for messages flagged as
    /// background are spawned on `executor`; all others are awaited inline.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Failure to deliver the connection id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls in declaration order, so completion of
                // the I/O future is checked before the next incoming message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so its outcome is logged and a
                                // notification is emitted once it has finished.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages do not hold up processing of
                                // the next message; everything else is handled in order.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 234
 235    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 236        self.peer.disconnect(connection_id);
 237        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 238
 239        for (project_id, project) in removed_connection.hosted_projects {
 240            if let Some(share) = project.share {
 241                broadcast(
 242                    connection_id,
 243                    share.guests.keys().copied().collect(),
 244                    |conn_id| {
 245                        self.peer
 246                            .send(conn_id, proto::UnshareProject { project_id })
 247                    },
 248                )?;
 249            }
 250        }
 251
 252        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 253            broadcast(connection_id, peer_ids, |conn_id| {
 254                self.peer.send(
 255                    conn_id,
 256                    proto::RemoveProjectCollaborator {
 257                        project_id,
 258                        peer_id: connection_id.0,
 259                    },
 260                )
 261            })?;
 262        }
 263
 264        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 265        Ok(())
 266    }
 267
 268    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 269        Ok(proto::Ack {})
 270    }
 271
 272    async fn register_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::RegisterProject>,
 275    ) -> tide::Result<proto::RegisterProjectResponse> {
 276        let project_id = {
 277            let mut state = self.state_mut();
 278            let user_id = state.user_id_for_connection(request.sender_id)?;
 279            state.register_project(request.sender_id, user_id)
 280        };
 281        Ok(proto::RegisterProjectResponse { project_id })
 282    }
 283
 284    async fn unregister_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::UnregisterProject>,
 287    ) -> tide::Result<()> {
 288        let project = self
 289            .state_mut()
 290            .unregister_project(request.payload.project_id, request.sender_id)?;
 291        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 292        Ok(())
 293    }
 294
 295    async fn share_project(
 296        mut self: Arc<Server>,
 297        request: TypedEnvelope<proto::ShareProject>,
 298    ) -> tide::Result<proto::Ack> {
 299        self.state_mut()
 300            .share_project(request.payload.project_id, request.sender_id);
 301        Ok(proto::Ack {})
 302    }
 303
 304    async fn unshare_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnshareProject>,
 307    ) -> tide::Result<()> {
 308        let project_id = request.payload.project_id;
 309        let project = self
 310            .state_mut()
 311            .unshare_project(project_id, request.sender_id)?;
 312
 313        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 314            self.peer
 315                .send(conn_id, proto::UnshareProject { project_id })
 316        })?;
 317        self.update_contacts_for_users(&project.authorized_user_ids)?;
 318        Ok(())
 319    }
 320
 321    async fn join_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::JoinProject>,
 324    ) -> tide::Result<proto::JoinProjectResponse> {
 325        let project_id = request.payload.project_id;
 326
 327        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 328        let (response, connection_ids, contact_user_ids) = self
 329            .state_mut()
 330            .join_project(request.sender_id, user_id, project_id)
 331            .and_then(|joined| {
 332                let share = joined.project.share()?;
 333                let peer_count = share.guests.len();
 334                let mut collaborators = Vec::with_capacity(peer_count);
 335                collaborators.push(proto::Collaborator {
 336                    peer_id: joined.project.host_connection_id.0,
 337                    replica_id: 0,
 338                    user_id: joined.project.host_user_id.to_proto(),
 339                });
 340                let worktrees = share
 341                    .worktrees
 342                    .iter()
 343                    .filter_map(|(id, shared_worktree)| {
 344                        let worktree = joined.project.worktrees.get(&id)?;
 345                        Some(proto::Worktree {
 346                            id: *id,
 347                            root_name: worktree.root_name.clone(),
 348                            entries: shared_worktree.entries.values().cloned().collect(),
 349                            diagnostic_summaries: shared_worktree
 350                                .diagnostic_summaries
 351                                .values()
 352                                .cloned()
 353                                .collect(),
 354                            weak: worktree.weak,
 355                        })
 356                    })
 357                    .collect();
 358                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 359                    if *peer_conn_id != request.sender_id {
 360                        collaborators.push(proto::Collaborator {
 361                            peer_id: peer_conn_id.0,
 362                            replica_id: *peer_replica_id as u32,
 363                            user_id: peer_user_id.to_proto(),
 364                        });
 365                    }
 366                }
 367                let response = proto::JoinProjectResponse {
 368                    worktrees,
 369                    replica_id: joined.replica_id as u32,
 370                    collaborators,
 371                };
 372                let connection_ids = joined.project.connection_ids();
 373                let contact_user_ids = joined.project.authorized_user_ids();
 374                Ok((response, connection_ids, contact_user_ids))
 375            })?;
 376
 377        broadcast(request.sender_id, connection_ids, |conn_id| {
 378            self.peer.send(
 379                conn_id,
 380                proto::AddProjectCollaborator {
 381                    project_id,
 382                    collaborator: Some(proto::Collaborator {
 383                        peer_id: request.sender_id.0,
 384                        replica_id: response.replica_id,
 385                        user_id: user_id.to_proto(),
 386                    }),
 387                },
 388            )
 389        })?;
 390        self.update_contacts_for_users(&contact_user_ids)?;
 391        Ok(response)
 392    }
 393
 394    async fn leave_project(
 395        mut self: Arc<Server>,
 396        request: TypedEnvelope<proto::LeaveProject>,
 397    ) -> tide::Result<()> {
 398        let sender_id = request.sender_id;
 399        let project_id = request.payload.project_id;
 400        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 401
 402        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 403            self.peer.send(
 404                conn_id,
 405                proto::RemoveProjectCollaborator {
 406                    project_id,
 407                    peer_id: sender_id.0,
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 412
 413        Ok(())
 414    }
 415
 416    async fn register_worktree(
 417        mut self: Arc<Server>,
 418        request: TypedEnvelope<proto::RegisterWorktree>,
 419    ) -> tide::Result<proto::Ack> {
 420        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 421
 422        let mut contact_user_ids = HashSet::default();
 423        contact_user_ids.insert(host_user_id);
 424        for github_login in &request.payload.authorized_logins {
 425            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 426            contact_user_ids.insert(contact_user_id);
 427        }
 428
 429        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 430        let guest_connection_ids;
 431        {
 432            let mut state = self.state_mut();
 433            guest_connection_ids = state
 434                .read_project(request.payload.project_id, request.sender_id)?
 435                .guest_connection_ids();
 436            state.register_worktree(
 437                request.payload.project_id,
 438                request.payload.worktree_id,
 439                request.sender_id,
 440                Worktree {
 441                    authorized_user_ids: contact_user_ids.clone(),
 442                    root_name: request.payload.root_name.clone(),
 443                    weak: request.payload.weak,
 444                },
 445            )?;
 446        }
 447        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 448            self.peer
 449                .forward_send(request.sender_id, connection_id, request.payload.clone())
 450        })?;
 451        self.update_contacts_for_users(&contact_user_ids)?;
 452        Ok(proto::Ack {})
 453    }
 454
 455    async fn unregister_worktree(
 456        mut self: Arc<Server>,
 457        request: TypedEnvelope<proto::UnregisterWorktree>,
 458    ) -> tide::Result<()> {
 459        let project_id = request.payload.project_id;
 460        let worktree_id = request.payload.worktree_id;
 461        let (worktree, guest_connection_ids) =
 462            self.state_mut()
 463                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 464        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 465            self.peer.send(
 466                conn_id,
 467                proto::UnregisterWorktree {
 468                    project_id,
 469                    worktree_id,
 470                },
 471            )
 472        })?;
 473        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 474        Ok(())
 475    }
 476
 477    async fn update_worktree(
 478        mut self: Arc<Server>,
 479        request: TypedEnvelope<proto::UpdateWorktree>,
 480    ) -> tide::Result<proto::Ack> {
 481        let connection_ids = self.state_mut().update_worktree(
 482            request.sender_id,
 483            request.payload.project_id,
 484            request.payload.worktree_id,
 485            &request.payload.removed_entries,
 486            &request.payload.updated_entries,
 487        )?;
 488
 489        broadcast(request.sender_id, connection_ids, |connection_id| {
 490            self.peer
 491                .forward_send(request.sender_id, connection_id, request.payload.clone())
 492        })?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_diagnostic_summary(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 500    ) -> tide::Result<()> {
 501        let summary = request
 502            .payload
 503            .summary
 504            .clone()
 505            .ok_or_else(|| anyhow!("invalid summary"))?;
 506        let receiver_ids = self.state_mut().update_diagnostic_summary(
 507            request.payload.project_id,
 508            request.payload.worktree_id,
 509            request.sender_id,
 510            summary,
 511        )?;
 512
 513        broadcast(request.sender_id, receiver_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })?;
 517        Ok(())
 518    }
 519
 520    async fn disk_based_diagnostics_updating(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 523    ) -> tide::Result<()> {
 524        let receiver_ids = self
 525            .state()
 526            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 527        broadcast(request.sender_id, receiver_ids, |connection_id| {
 528            self.peer
 529                .forward_send(request.sender_id, connection_id, request.payload.clone())
 530        })?;
 531        Ok(())
 532    }
 533
 534    async fn disk_based_diagnostics_updated(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = self
 539            .state()
 540            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 541        broadcast(request.sender_id, receiver_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        })?;
 545        Ok(())
 546    }
 547
 548    async fn forward_project_request<T>(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<T>,
 551    ) -> tide::Result<T::Response>
 552    where
 553        T: EntityMessage + RequestMessage,
 554    {
 555        let host_connection_id = self
 556            .state()
 557            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 558            .host_connection_id;
 559        Ok(self
 560            .peer
 561            .forward_request(request.sender_id, host_connection_id, request.payload)
 562            .await?)
 563    }
 564
 565    async fn close_buffer(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::CloseBuffer>,
 568    ) -> tide::Result<()> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        self.peer
 574            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 575        Ok(())
 576    }
 577
 578    async fn save_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::SaveBuffer>,
 581    ) -> tide::Result<proto::BufferSaved> {
 582        let host;
 583        let mut guests;
 584        {
 585            let state = self.state();
 586            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 587            host = project.host_connection_id;
 588            guests = project.guest_connection_ids()
 589        }
 590
 591        let response = self
 592            .peer
 593            .forward_request(request.sender_id, host, request.payload.clone())
 594            .await?;
 595
 596        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 597        broadcast(host, guests, |conn_id| {
 598            self.peer.forward_send(host, conn_id, response.clone())
 599        })?;
 600
 601        Ok(response)
 602    }
 603
 604    async fn update_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::UpdateBuffer>,
 607    ) -> tide::Result<proto::Ack> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })?;
 615        Ok(proto::Ack {})
 616    }
 617
 618    async fn update_buffer_file(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBufferFile>,
 621    ) -> tide::Result<()> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(())
 630    }
 631
 632    async fn buffer_reloaded(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::BufferReloaded>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_saved(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferSaved>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn get_channels(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::GetChannels>,
 663    ) -> tide::Result<proto::GetChannelsResponse> {
 664        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 665        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 666        Ok(proto::GetChannelsResponse {
 667            channels: channels
 668                .into_iter()
 669                .map(|chan| proto::Channel {
 670                    id: chan.id.to_proto(),
 671                    name: chan.name,
 672                })
 673                .collect(),
 674        })
 675    }
 676
 677    async fn get_users(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetUsers>,
 680    ) -> tide::Result<proto::GetUsersResponse> {
 681        let user_ids = request
 682            .payload
 683            .user_ids
 684            .into_iter()
 685            .map(UserId::from_proto)
 686            .collect();
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        Ok(proto::GetUsersResponse { users })
 700    }
 701
 702    fn update_contacts_for_users<'a>(
 703        self: &Arc<Server>,
 704        user_ids: impl IntoIterator<Item = &'a UserId>,
 705    ) -> anyhow::Result<()> {
 706        let mut result = Ok(());
 707        let state = self.state();
 708        for user_id in user_ids {
 709            let contacts = state.contacts_for_user(*user_id);
 710            for connection_id in state.connection_ids_for_user(*user_id) {
 711                if let Err(error) = self.peer.send(
 712                    connection_id,
 713                    proto::UpdateContacts {
 714                        contacts: contacts.clone(),
 715                    },
 716                ) {
 717                    result = Err(error);
 718                }
 719            }
 720        }
 721        result
 722    }
 723
 724    async fn join_channel(
 725        mut self: Arc<Self>,
 726        request: TypedEnvelope<proto::JoinChannel>,
 727    ) -> tide::Result<proto::JoinChannelResponse> {
 728        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 729        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 730        if !self
 731            .app_state
 732            .db
 733            .can_user_access_channel(user_id, channel_id)
 734            .await?
 735        {
 736            Err(anyhow!("access denied"))?;
 737        }
 738
 739        self.state_mut().join_channel(request.sender_id, channel_id);
 740        let messages = self
 741            .app_state
 742            .db
 743            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 744            .await?
 745            .into_iter()
 746            .map(|msg| proto::ChannelMessage {
 747                id: msg.id.to_proto(),
 748                body: msg.body,
 749                timestamp: msg.sent_at.unix_timestamp() as u64,
 750                sender_id: msg.sender_id.to_proto(),
 751                nonce: Some(msg.nonce.as_u128().into()),
 752            })
 753            .collect::<Vec<_>>();
 754        Ok(proto::JoinChannelResponse {
 755            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 756            messages,
 757        })
 758    }
 759
 760    async fn leave_channel(
 761        mut self: Arc<Self>,
 762        request: TypedEnvelope<proto::LeaveChannel>,
 763    ) -> tide::Result<()> {
 764        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 765        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 766        if !self
 767            .app_state
 768            .db
 769            .can_user_access_channel(user_id, channel_id)
 770            .await?
 771        {
 772            Err(anyhow!("access denied"))?;
 773        }
 774
 775        self.state_mut()
 776            .leave_channel(request.sender_id, channel_id);
 777
 778        Ok(())
 779    }
 780
 781    async fn send_channel_message(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::SendChannelMessage>,
 784    ) -> tide::Result<proto::SendChannelMessageResponse> {
 785        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 786        let user_id;
 787        let connection_ids;
 788        {
 789            let state = self.state();
 790            user_id = state.user_id_for_connection(request.sender_id)?;
 791            connection_ids = state.channel_connection_ids(channel_id)?;
 792        }
 793
 794        // Validate the message body.
 795        let body = request.payload.body.trim().to_string();
 796        if body.len() > MAX_MESSAGE_LEN {
 797            return Err(anyhow!("message is too long"))?;
 798        }
 799        if body.is_empty() {
 800            return Err(anyhow!("message can't be blank"))?;
 801        }
 802
 803        let timestamp = OffsetDateTime::now_utc();
 804        let nonce = request
 805            .payload
 806            .nonce
 807            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 808
 809        let message_id = self
 810            .app_state
 811            .db
 812            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 813            .await?
 814            .to_proto();
 815        let message = proto::ChannelMessage {
 816            sender_id: user_id.to_proto(),
 817            id: message_id,
 818            body,
 819            timestamp: timestamp.unix_timestamp() as u64,
 820            nonce: Some(nonce),
 821        };
 822        broadcast(request.sender_id, connection_ids, |conn_id| {
 823            self.peer.send(
 824                conn_id,
 825                proto::ChannelMessageSent {
 826                    channel_id: channel_id.to_proto(),
 827                    message: Some(message.clone()),
 828                },
 829            )
 830        })?;
 831        Ok(proto::SendChannelMessageResponse {
 832            message: Some(message),
 833        })
 834    }
 835
 836    async fn get_channel_messages(
 837        self: Arc<Self>,
 838        request: TypedEnvelope<proto::GetChannelMessages>,
 839    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 840        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 841        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 842        if !self
 843            .app_state
 844            .db
 845            .can_user_access_channel(user_id, channel_id)
 846            .await?
 847        {
 848            Err(anyhow!("access denied"))?;
 849        }
 850
 851        let messages = self
 852            .app_state
 853            .db
 854            .get_channel_messages(
 855                channel_id,
 856                MESSAGE_COUNT_PER_PAGE,
 857                Some(MessageId::from_proto(request.payload.before_message_id)),
 858            )
 859            .await?
 860            .into_iter()
 861            .map(|msg| proto::ChannelMessage {
 862                id: msg.id.to_proto(),
 863                body: msg.body,
 864                timestamp: msg.sent_at.unix_timestamp() as u64,
 865                sender_id: msg.sender_id.to_proto(),
 866                nonce: Some(msg.nonce.as_u128().into()),
 867            })
 868            .collect::<Vec<_>>();
 869
 870        Ok(proto::GetChannelMessagesResponse {
 871            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 872            messages,
 873        })
 874    }
 875
 876    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 877        self.store.read()
 878    }
 879
 880    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 881        self.store.write()
 882    }
 883}
 884
 885impl Executor for RealExecutor {
 886    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 887        task::spawn(future);
 888    }
 889}
 890
 891fn broadcast<F>(
 892    sender_id: ConnectionId,
 893    receiver_ids: Vec<ConnectionId>,
 894    mut f: F,
 895) -> anyhow::Result<()>
 896where
 897    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 898{
 899    let mut result = Ok(());
 900    for receiver_id in receiver_ids {
 901        if receiver_id != sender_id {
 902            if let Err(error) = f(receiver_id) {
 903                if result.is_ok() {
 904                    result = Err(error);
 905                }
 906            }
 907        }
 908    }
 909    result
 910}
 911
 912pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 913    let server = Server::new(app.state().clone(), rpc.clone(), None);
 914    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 915        let server = server.clone();
 916        async move {
 917            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 918
 919            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 920            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 921            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 922            let client_protocol_version: Option<u32> = request
 923                .header("X-Zed-Protocol-Version")
 924                .and_then(|v| v.as_str().parse().ok());
 925
 926            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 927                return Ok(Response::new(StatusCode::UpgradeRequired));
 928            }
 929
 930            let header = match request.header("Sec-Websocket-Key") {
 931                Some(h) => h.as_str(),
 932                None => return Err(anyhow!("expected sec-websocket-key"))?,
 933            };
 934
 935            let user_id = process_auth_header(&request).await?;
 936
 937            let mut response = Response::new(StatusCode::SwitchingProtocols);
 938            response.insert_header(UPGRADE, "websocket");
 939            response.insert_header(CONNECTION, "Upgrade");
 940            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 941            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 942            response.insert_header("Sec-Websocket-Version", "13");
 943
 944            let http_res: &mut tide::http::Response = response.as_mut();
 945            let upgrade_receiver = http_res.recv_upgrade().await;
 946            let addr = request.remote().unwrap_or("unknown").to_string();
 947            task::spawn(async move {
 948                if let Some(stream) = upgrade_receiver.await {
 949                    server
 950                        .handle_connection(
 951                            Connection::new(
 952                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 953                            ),
 954                            addr,
 955                            user_id,
 956                            None,
 957                            RealExecutor,
 958                        )
 959                        .await;
 960                }
 961            });
 962
 963            Ok(response)
 964        }
 965    });
 966}
 967
 968fn header_contains_ignore_case<T>(
 969    request: &tide::Request<T>,
 970    header_name: HeaderName,
 971    value: &str,
 972) -> bool {
 973    request
 974        .header(header_name)
 975        .map(|h| {
 976            h.as_str()
 977                .split(',')
 978                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 979        })
 980        .unwrap_or(false)
 981}
 982
 983#[cfg(test)]
 984mod tests {
 985    use super::*;
 986    use crate::{
 987        auth,
 988        db::{tests::TestDb, UserId},
 989        github, AppState, Config,
 990    };
 991    use ::rpc::Peer;
 992    use collections::BTreeMap;
 993    use gpui::{executor, ModelHandle, TestAppContext};
 994    use parking_lot::Mutex;
 995    use postage::{sink::Sink, watch};
 996    use rand::prelude::*;
 997    use rpc::PeerId;
 998    use serde_json::json;
 999    use sqlx::types::time::OffsetDateTime;
1000    use std::{
1001        cell::Cell,
1002        env,
1003        ops::Deref,
1004        path::{Path, PathBuf},
1005        rc::Rc,
1006        sync::{
1007            atomic::{AtomicBool, Ordering::SeqCst},
1008            Arc,
1009        },
1010        time::Duration,
1011    };
1012    use zed::{
1013        client::{
1014            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1015            EstablishConnectionError, UserStore,
1016        },
1017        editor::{
1018            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1019            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1020        },
1021        fs::{FakeFs, Fs as _},
1022        language::{
1023            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1024            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1025        },
1026        lsp,
1027        project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1028        workspace::{Settings, Workspace, WorkspaceParams},
1029    };
1030
1031    #[cfg(test)]
1032    #[ctor::ctor]
1033    fn init_logger() {
1034        if std::env::var("RUST_LOG").is_ok() {
1035            env_logger::init();
1036        }
1037    }
1038
1039    #[gpui::test(iterations = 10)]
1040    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1041        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1042        let lang_registry = Arc::new(LanguageRegistry::new());
1043        let fs = FakeFs::new(cx_a.background());
1044        cx_a.foreground().forbid_parking();
1045
1046        // Connect to a server as 2 clients.
1047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1048        let client_a = server.create_client(cx_a, "user_a").await;
1049        let client_b = server.create_client(cx_b, "user_b").await;
1050
1051        // Share a project as client A
1052        fs.insert_tree(
1053            "/a",
1054            json!({
1055                ".zed.toml": r#"collaborators = ["user_b"]"#,
1056                "a.txt": "a-contents",
1057                "b.txt": "b-contents",
1058            }),
1059        )
1060        .await;
1061        let project_a = cx_a.update(|cx| {
1062            Project::local(
1063                client_a.clone(),
1064                client_a.user_store.clone(),
1065                lang_registry.clone(),
1066                fs.clone(),
1067                cx,
1068            )
1069        });
1070        let (worktree_a, _) = project_a
1071            .update(cx_a, |p, cx| {
1072                p.find_or_create_local_worktree("/a", false, cx)
1073            })
1074            .await
1075            .unwrap();
1076        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1077        worktree_a
1078            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1079            .await;
1080        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1081        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1082
1083        // Join that project as client B
1084        let project_b = Project::remote(
1085            project_id,
1086            client_b.clone(),
1087            client_b.user_store.clone(),
1088            lang_registry.clone(),
1089            fs.clone(),
1090            &mut cx_b.to_async(),
1091        )
1092        .await
1093        .unwrap();
1094
1095        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1096            assert_eq!(
1097                project
1098                    .collaborators()
1099                    .get(&client_a.peer_id)
1100                    .unwrap()
1101                    .user
1102                    .github_login,
1103                "user_a"
1104            );
1105            project.replica_id()
1106        });
1107        project_a
1108            .condition(&cx_a, |tree, _| {
1109                tree.collaborators()
1110                    .get(&client_b.peer_id)
1111                    .map_or(false, |collaborator| {
1112                        collaborator.replica_id == replica_id_b
1113                            && collaborator.user.github_login == "user_b"
1114                    })
1115            })
1116            .await;
1117
1118        // Open the same file as client B and client A.
1119        let buffer_b = project_b
1120            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1121            .await
1122            .unwrap();
1123        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1124        buffer_b.read_with(cx_b, |buf, cx| {
1125            assert_eq!(buf.read(cx).text(), "b-contents")
1126        });
1127        project_a.read_with(cx_a, |project, cx| {
1128            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1129        });
1130        let buffer_a = project_a
1131            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1132            .await
1133            .unwrap();
1134
1135        let editor_b = cx_b.add_view(window_b, |cx| {
1136            Editor::for_buffer(
1137                buffer_b,
1138                None,
1139                watch::channel_with(Settings::test(cx)).1,
1140                cx,
1141            )
1142        });
1143
1144        // TODO
1145        // // Create a selection set as client B and see that selection set as client A.
1146        // buffer_a
1147        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1148        //     .await;
1149
1150        // Edit the buffer as client B and see that edit as client A.
1151        editor_b.update(cx_b, |editor, cx| {
1152            editor.handle_input(&Input("ok, ".into()), cx)
1153        });
1154        buffer_a
1155            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1156            .await;
1157
1158        // TODO
1159        // // Remove the selection set as client B, see those selections disappear as client A.
1160        cx_b.update(move |_| drop(editor_b));
1161        // buffer_a
1162        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1163        //     .await;
1164
1165        // Dropping the client B's project removes client B from client A's collaborators.
1166        cx_b.update(move |_| drop(project_b));
1167        project_a
1168            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1169            .await;
1170    }
1171
1172    #[gpui::test(iterations = 10)]
1173    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1174        let lang_registry = Arc::new(LanguageRegistry::new());
1175        let fs = FakeFs::new(cx_a.background());
1176        cx_a.foreground().forbid_parking();
1177
1178        // Connect to a server as 2 clients.
1179        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1180        let client_a = server.create_client(cx_a, "user_a").await;
1181        let client_b = server.create_client(cx_b, "user_b").await;
1182
1183        // Share a project as client A
1184        fs.insert_tree(
1185            "/a",
1186            json!({
1187                ".zed.toml": r#"collaborators = ["user_b"]"#,
1188                "a.txt": "a-contents",
1189                "b.txt": "b-contents",
1190            }),
1191        )
1192        .await;
1193        let project_a = cx_a.update(|cx| {
1194            Project::local(
1195                client_a.clone(),
1196                client_a.user_store.clone(),
1197                lang_registry.clone(),
1198                fs.clone(),
1199                cx,
1200            )
1201        });
1202        let (worktree_a, _) = project_a
1203            .update(cx_a, |p, cx| {
1204                p.find_or_create_local_worktree("/a", false, cx)
1205            })
1206            .await
1207            .unwrap();
1208        worktree_a
1209            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1210            .await;
1211        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1212        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1213        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1214        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1215
1216        // Join that project as client B
1217        let project_b = Project::remote(
1218            project_id,
1219            client_b.clone(),
1220            client_b.user_store.clone(),
1221            lang_registry.clone(),
1222            fs.clone(),
1223            &mut cx_b.to_async(),
1224        )
1225        .await
1226        .unwrap();
1227        project_b
1228            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1229            .await
1230            .unwrap();
1231
1232        // Unshare the project as client A
1233        project_a
1234            .update(cx_a, |project, cx| project.unshare(cx))
1235            .await
1236            .unwrap();
1237        project_b
1238            .condition(cx_b, |project, _| project.is_read_only())
1239            .await;
1240        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1241        drop(project_b);
1242
1243        // Share the project again and ensure guests can still join.
1244        project_a
1245            .update(cx_a, |project, cx| project.share(cx))
1246            .await
1247            .unwrap();
1248        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1249
1250        let project_c = Project::remote(
1251            project_id,
1252            client_b.clone(),
1253            client_b.user_store.clone(),
1254            lang_registry.clone(),
1255            fs.clone(),
1256            &mut cx_b.to_async(),
1257        )
1258        .await
1259        .unwrap();
1260        project_c
1261            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1262            .await
1263            .unwrap();
1264    }
1265
1266    #[gpui::test(iterations = 10)]
1267    async fn test_propagate_saves_and_fs_changes(
1268        cx_a: &mut TestAppContext,
1269        cx_b: &mut TestAppContext,
1270        cx_c: &mut TestAppContext,
1271    ) {
1272        let lang_registry = Arc::new(LanguageRegistry::new());
1273        let fs = FakeFs::new(cx_a.background());
1274        cx_a.foreground().forbid_parking();
1275
1276        // Connect to a server as 3 clients.
1277        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1278        let client_a = server.create_client(cx_a, "user_a").await;
1279        let client_b = server.create_client(cx_b, "user_b").await;
1280        let client_c = server.create_client(cx_c, "user_c").await;
1281
1282        // Share a worktree as client A.
1283        fs.insert_tree(
1284            "/a",
1285            json!({
1286                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1287                "file1": "",
1288                "file2": ""
1289            }),
1290        )
1291        .await;
1292        let project_a = cx_a.update(|cx| {
1293            Project::local(
1294                client_a.clone(),
1295                client_a.user_store.clone(),
1296                lang_registry.clone(),
1297                fs.clone(),
1298                cx,
1299            )
1300        });
1301        let (worktree_a, _) = project_a
1302            .update(cx_a, |p, cx| {
1303                p.find_or_create_local_worktree("/a", false, cx)
1304            })
1305            .await
1306            .unwrap();
1307        worktree_a
1308            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1309            .await;
1310        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1311        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1312        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1313
1314        // Join that worktree as clients B and C.
1315        let project_b = Project::remote(
1316            project_id,
1317            client_b.clone(),
1318            client_b.user_store.clone(),
1319            lang_registry.clone(),
1320            fs.clone(),
1321            &mut cx_b.to_async(),
1322        )
1323        .await
1324        .unwrap();
1325        let project_c = Project::remote(
1326            project_id,
1327            client_c.clone(),
1328            client_c.user_store.clone(),
1329            lang_registry.clone(),
1330            fs.clone(),
1331            &mut cx_c.to_async(),
1332        )
1333        .await
1334        .unwrap();
1335        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1336        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1337
1338        // Open and edit a buffer as both guests B and C.
1339        let buffer_b = project_b
1340            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1341            .await
1342            .unwrap();
1343        let buffer_c = project_c
1344            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1345            .await
1346            .unwrap();
1347        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1348        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1349
1350        // Open and edit that buffer as the host.
1351        let buffer_a = project_a
1352            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1353            .await
1354            .unwrap();
1355
1356        buffer_a
1357            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1358            .await;
1359        buffer_a.update(cx_a, |buf, cx| {
1360            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1361        });
1362
1363        // Wait for edits to propagate
1364        buffer_a
1365            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1366            .await;
1367        buffer_b
1368            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1369            .await;
1370        buffer_c
1371            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1372            .await;
1373
1374        // Edit the buffer as the host and concurrently save as guest B.
1375        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1376        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1377        save_b.await.unwrap();
1378        assert_eq!(
1379            fs.load("/a/file1".as_ref()).await.unwrap(),
1380            "hi-a, i-am-c, i-am-b, i-am-a"
1381        );
1382        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1383        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1384        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1385
1386        // Make changes on host's file system, see those changes on guest worktrees.
1387        fs.rename(
1388            "/a/file1".as_ref(),
1389            "/a/file1-renamed".as_ref(),
1390            Default::default(),
1391        )
1392        .await
1393        .unwrap();
1394
1395        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1396            .await
1397            .unwrap();
1398        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1399
1400        worktree_a
1401            .condition(&cx_a, |tree, _| {
1402                tree.paths()
1403                    .map(|p| p.to_string_lossy())
1404                    .collect::<Vec<_>>()
1405                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1406            })
1407            .await;
1408        worktree_b
1409            .condition(&cx_b, |tree, _| {
1410                tree.paths()
1411                    .map(|p| p.to_string_lossy())
1412                    .collect::<Vec<_>>()
1413                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1414            })
1415            .await;
1416        worktree_c
1417            .condition(&cx_c, |tree, _| {
1418                tree.paths()
1419                    .map(|p| p.to_string_lossy())
1420                    .collect::<Vec<_>>()
1421                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1422            })
1423            .await;
1424
1425        // Ensure buffer files are updated as well.
1426        buffer_a
1427            .condition(&cx_a, |buf, _| {
1428                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1429            })
1430            .await;
1431        buffer_b
1432            .condition(&cx_b, |buf, _| {
1433                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1434            })
1435            .await;
1436        buffer_c
1437            .condition(&cx_c, |buf, _| {
1438                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1439            })
1440            .await;
1441    }
1442
1443    #[gpui::test(iterations = 10)]
1444    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1445        cx_a.foreground().forbid_parking();
1446        let lang_registry = Arc::new(LanguageRegistry::new());
1447        let fs = FakeFs::new(cx_a.background());
1448
1449        // Connect to a server as 2 clients.
1450        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1451        let client_a = server.create_client(cx_a, "user_a").await;
1452        let client_b = server.create_client(cx_b, "user_b").await;
1453
1454        // Share a project as client A
1455        fs.insert_tree(
1456            "/dir",
1457            json!({
1458                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1459                "a.txt": "a-contents",
1460            }),
1461        )
1462        .await;
1463
1464        let project_a = cx_a.update(|cx| {
1465            Project::local(
1466                client_a.clone(),
1467                client_a.user_store.clone(),
1468                lang_registry.clone(),
1469                fs.clone(),
1470                cx,
1471            )
1472        });
1473        let (worktree_a, _) = project_a
1474            .update(cx_a, |p, cx| {
1475                p.find_or_create_local_worktree("/dir", false, cx)
1476            })
1477            .await
1478            .unwrap();
1479        worktree_a
1480            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1481            .await;
1482        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1483        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1484        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1485
1486        // Join that project as client B
1487        let project_b = Project::remote(
1488            project_id,
1489            client_b.clone(),
1490            client_b.user_store.clone(),
1491            lang_registry.clone(),
1492            fs.clone(),
1493            &mut cx_b.to_async(),
1494        )
1495        .await
1496        .unwrap();
1497
1498        // Open a buffer as client B
1499        let buffer_b = project_b
1500            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1501            .await
1502            .unwrap();
1503
1504        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1505        buffer_b.read_with(cx_b, |buf, _| {
1506            assert!(buf.is_dirty());
1507            assert!(!buf.has_conflict());
1508        });
1509
1510        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1511        buffer_b
1512            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1513            .await;
1514        buffer_b.read_with(cx_b, |buf, _| {
1515            assert!(!buf.has_conflict());
1516        });
1517
1518        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1519        buffer_b.read_with(cx_b, |buf, _| {
1520            assert!(buf.is_dirty());
1521            assert!(!buf.has_conflict());
1522        });
1523    }
1524
1525    #[gpui::test(iterations = 10)]
1526    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1527        cx_a.foreground().forbid_parking();
1528        let lang_registry = Arc::new(LanguageRegistry::new());
1529        let fs = FakeFs::new(cx_a.background());
1530
1531        // Connect to a server as 2 clients.
1532        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1533        let client_a = server.create_client(cx_a, "user_a").await;
1534        let client_b = server.create_client(cx_b, "user_b").await;
1535
1536        // Share a project as client A
1537        fs.insert_tree(
1538            "/dir",
1539            json!({
1540                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1541                "a.txt": "a-contents",
1542            }),
1543        )
1544        .await;
1545
1546        let project_a = cx_a.update(|cx| {
1547            Project::local(
1548                client_a.clone(),
1549                client_a.user_store.clone(),
1550                lang_registry.clone(),
1551                fs.clone(),
1552                cx,
1553            )
1554        });
1555        let (worktree_a, _) = project_a
1556            .update(cx_a, |p, cx| {
1557                p.find_or_create_local_worktree("/dir", false, cx)
1558            })
1559            .await
1560            .unwrap();
1561        worktree_a
1562            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1563            .await;
1564        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1565        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1566        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1567
1568        // Join that project as client B
1569        let project_b = Project::remote(
1570            project_id,
1571            client_b.clone(),
1572            client_b.user_store.clone(),
1573            lang_registry.clone(),
1574            fs.clone(),
1575            &mut cx_b.to_async(),
1576        )
1577        .await
1578        .unwrap();
1579        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1580
1581        // Open a buffer as client B
1582        let buffer_b = project_b
1583            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1584            .await
1585            .unwrap();
1586        buffer_b.read_with(cx_b, |buf, _| {
1587            assert!(!buf.is_dirty());
1588            assert!(!buf.has_conflict());
1589        });
1590
1591        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1592            .await
1593            .unwrap();
1594        buffer_b
1595            .condition(&cx_b, |buf, _| {
1596                buf.text() == "new contents" && !buf.is_dirty()
1597            })
1598            .await;
1599        buffer_b.read_with(cx_b, |buf, _| {
1600            assert!(!buf.has_conflict());
1601        });
1602    }
1603
1604    #[gpui::test(iterations = 10)]
1605    async fn test_editing_while_guest_opens_buffer(
1606        cx_a: &mut TestAppContext,
1607        cx_b: &mut TestAppContext,
1608    ) {
1609        cx_a.foreground().forbid_parking();
1610        let lang_registry = Arc::new(LanguageRegistry::new());
1611        let fs = FakeFs::new(cx_a.background());
1612
1613        // Connect to a server as 2 clients.
1614        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1615        let client_a = server.create_client(cx_a, "user_a").await;
1616        let client_b = server.create_client(cx_b, "user_b").await;
1617
1618        // Share a project as client A
1619        fs.insert_tree(
1620            "/dir",
1621            json!({
1622                ".zed.toml": r#"collaborators = ["user_b"]"#,
1623                "a.txt": "a-contents",
1624            }),
1625        )
1626        .await;
1627        let project_a = cx_a.update(|cx| {
1628            Project::local(
1629                client_a.clone(),
1630                client_a.user_store.clone(),
1631                lang_registry.clone(),
1632                fs.clone(),
1633                cx,
1634            )
1635        });
1636        let (worktree_a, _) = project_a
1637            .update(cx_a, |p, cx| {
1638                p.find_or_create_local_worktree("/dir", false, cx)
1639            })
1640            .await
1641            .unwrap();
1642        worktree_a
1643            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1644            .await;
1645        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1646        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1647        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1648
1649        // Join that project as client B
1650        let project_b = Project::remote(
1651            project_id,
1652            client_b.clone(),
1653            client_b.user_store.clone(),
1654            lang_registry.clone(),
1655            fs.clone(),
1656            &mut cx_b.to_async(),
1657        )
1658        .await
1659        .unwrap();
1660
1661        // Open a buffer as client A
1662        let buffer_a = project_a
1663            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1664            .await
1665            .unwrap();
1666
1667        // Start opening the same buffer as client B
1668        let buffer_b = cx_b
1669            .background()
1670            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1671
1672        // Edit the buffer as client A while client B is still opening it.
1673        cx_b.background().simulate_random_delay().await;
1674        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1675        cx_b.background().simulate_random_delay().await;
1676        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1677
1678        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1679        let buffer_b = buffer_b.await.unwrap();
1680        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1681    }
1682
1683    #[gpui::test(iterations = 10)]
1684    async fn test_leaving_worktree_while_opening_buffer(
1685        cx_a: &mut TestAppContext,
1686        cx_b: &mut TestAppContext,
1687    ) {
1688        cx_a.foreground().forbid_parking();
1689        let lang_registry = Arc::new(LanguageRegistry::new());
1690        let fs = FakeFs::new(cx_a.background());
1691
1692        // Connect to a server as 2 clients.
1693        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1694        let client_a = server.create_client(cx_a, "user_a").await;
1695        let client_b = server.create_client(cx_b, "user_b").await;
1696
1697        // Share a project as client A
1698        fs.insert_tree(
1699            "/dir",
1700            json!({
1701                ".zed.toml": r#"collaborators = ["user_b"]"#,
1702                "a.txt": "a-contents",
1703            }),
1704        )
1705        .await;
1706        let project_a = cx_a.update(|cx| {
1707            Project::local(
1708                client_a.clone(),
1709                client_a.user_store.clone(),
1710                lang_registry.clone(),
1711                fs.clone(),
1712                cx,
1713            )
1714        });
1715        let (worktree_a, _) = project_a
1716            .update(cx_a, |p, cx| {
1717                p.find_or_create_local_worktree("/dir", false, cx)
1718            })
1719            .await
1720            .unwrap();
1721        worktree_a
1722            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1723            .await;
1724        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1725        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1726        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1727
1728        // Join that project as client B
1729        let project_b = Project::remote(
1730            project_id,
1731            client_b.clone(),
1732            client_b.user_store.clone(),
1733            lang_registry.clone(),
1734            fs.clone(),
1735            &mut cx_b.to_async(),
1736        )
1737        .await
1738        .unwrap();
1739
1740        // See that a guest has joined as client A.
1741        project_a
1742            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1743            .await;
1744
1745        // Begin opening a buffer as client B, but leave the project before the open completes.
1746        let buffer_b = cx_b
1747            .background()
1748            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1749        cx_b.update(|_| drop(project_b));
1750        drop(buffer_b);
1751
1752        // See that the guest has left.
1753        project_a
1754            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1755            .await;
1756    }
1757
1758    #[gpui::test(iterations = 10)]
1759    async fn test_peer_disconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1760        cx_a.foreground().forbid_parking();
1761        let lang_registry = Arc::new(LanguageRegistry::new());
1762        let fs = FakeFs::new(cx_a.background());
1763
1764        // Connect to a server as 2 clients.
1765        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1766        let client_a = server.create_client(cx_a, "user_a").await;
1767        let client_b = server.create_client(cx_b, "user_b").await;
1768
1769        // Share a project as client A
1770        fs.insert_tree(
1771            "/a",
1772            json!({
1773                ".zed.toml": r#"collaborators = ["user_b"]"#,
1774                "a.txt": "a-contents",
1775                "b.txt": "b-contents",
1776            }),
1777        )
1778        .await;
1779        let project_a = cx_a.update(|cx| {
1780            Project::local(
1781                client_a.clone(),
1782                client_a.user_store.clone(),
1783                lang_registry.clone(),
1784                fs.clone(),
1785                cx,
1786            )
1787        });
1788        let (worktree_a, _) = project_a
1789            .update(cx_a, |p, cx| {
1790                p.find_or_create_local_worktree("/a", false, cx)
1791            })
1792            .await
1793            .unwrap();
1794        worktree_a
1795            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1796            .await;
1797        let project_id = project_a
1798            .update(cx_a, |project, _| project.next_remote_id())
1799            .await;
1800        project_a
1801            .update(cx_a, |project, cx| project.share(cx))
1802            .await
1803            .unwrap();
1804
1805        // Join that project as client B
1806        let _project_b = Project::remote(
1807            project_id,
1808            client_b.clone(),
1809            client_b.user_store.clone(),
1810            lang_registry.clone(),
1811            fs.clone(),
1812            &mut cx_b.to_async(),
1813        )
1814        .await
1815        .unwrap();
1816
1817        // See that a guest has joined as client A.
1818        project_a
1819            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1820            .await;
1821
1822        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1823        client_b.disconnect(&cx_b.to_async()).unwrap();
1824        project_a
1825            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1826            .await;
1827    }
1828
1829    #[gpui::test(iterations = 10)]
1830    async fn test_collaborating_with_diagnostics(
1831        cx_a: &mut TestAppContext,
1832        cx_b: &mut TestAppContext,
1833    ) {
1834        cx_a.foreground().forbid_parking();
1835        let mut lang_registry = Arc::new(LanguageRegistry::new());
1836        let fs = FakeFs::new(cx_a.background());
1837
1838        // Set up a fake language server.
1839        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1840        Arc::get_mut(&mut lang_registry)
1841            .unwrap()
1842            .add(Arc::new(Language::new(
1843                LanguageConfig {
1844                    name: "Rust".into(),
1845                    path_suffixes: vec!["rs".to_string()],
1846                    language_server: Some(language_server_config),
1847                    ..Default::default()
1848                },
1849                Some(tree_sitter_rust::language()),
1850            )));
1851
1852        // Connect to a server as 2 clients.
1853        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1854        let client_a = server.create_client(cx_a, "user_a").await;
1855        let client_b = server.create_client(cx_b, "user_b").await;
1856
1857        // Share a project as client A
1858        fs.insert_tree(
1859            "/a",
1860            json!({
1861                ".zed.toml": r#"collaborators = ["user_b"]"#,
1862                "a.rs": "let one = two",
1863                "other.rs": "",
1864            }),
1865        )
1866        .await;
1867        let project_a = cx_a.update(|cx| {
1868            Project::local(
1869                client_a.clone(),
1870                client_a.user_store.clone(),
1871                lang_registry.clone(),
1872                fs.clone(),
1873                cx,
1874            )
1875        });
1876        let (worktree_a, _) = project_a
1877            .update(cx_a, |p, cx| {
1878                p.find_or_create_local_worktree("/a", false, cx)
1879            })
1880            .await
1881            .unwrap();
1882        worktree_a
1883            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1884            .await;
1885        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1886        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1887        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1888
1889        // Cause the language server to start.
1890        let _ = cx_a
1891            .background()
1892            .spawn(project_a.update(cx_a, |project, cx| {
1893                project.open_buffer(
1894                    ProjectPath {
1895                        worktree_id,
1896                        path: Path::new("other.rs").into(),
1897                    },
1898                    cx,
1899                )
1900            }))
1901            .await
1902            .unwrap();
1903
1904        // Simulate a language server reporting errors for a file.
1905        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1906        fake_language_server
1907            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1908                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1909                version: None,
1910                diagnostics: vec![lsp::Diagnostic {
1911                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1912                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1913                    message: "message 1".to_string(),
1914                    ..Default::default()
1915                }],
1916            })
1917            .await;
1918
1919        // Wait for server to see the diagnostics update.
1920        server
1921            .condition(|store| {
1922                let worktree = store
1923                    .project(project_id)
1924                    .unwrap()
1925                    .share
1926                    .as_ref()
1927                    .unwrap()
1928                    .worktrees
1929                    .get(&worktree_id.to_proto())
1930                    .unwrap();
1931
1932                !worktree.diagnostic_summaries.is_empty()
1933            })
1934            .await;
1935
1936        // Join the worktree as client B.
1937        let project_b = Project::remote(
1938            project_id,
1939            client_b.clone(),
1940            client_b.user_store.clone(),
1941            lang_registry.clone(),
1942            fs.clone(),
1943            &mut cx_b.to_async(),
1944        )
1945        .await
1946        .unwrap();
1947
1948        project_b.read_with(cx_b, |project, cx| {
1949            assert_eq!(
1950                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1951                &[(
1952                    ProjectPath {
1953                        worktree_id,
1954                        path: Arc::from(Path::new("a.rs")),
1955                    },
1956                    DiagnosticSummary {
1957                        error_count: 1,
1958                        warning_count: 0,
1959                        ..Default::default()
1960                    },
1961                )]
1962            )
1963        });
1964
1965        // Simulate a language server reporting more errors for a file.
1966        fake_language_server
1967            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1968                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1969                version: None,
1970                diagnostics: vec![
1971                    lsp::Diagnostic {
1972                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1973                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1974                        message: "message 1".to_string(),
1975                        ..Default::default()
1976                    },
1977                    lsp::Diagnostic {
1978                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1979                        range: lsp::Range::new(
1980                            lsp::Position::new(0, 10),
1981                            lsp::Position::new(0, 13),
1982                        ),
1983                        message: "message 2".to_string(),
1984                        ..Default::default()
1985                    },
1986                ],
1987            })
1988            .await;
1989
1990        // Client b gets the updated summaries
1991        project_b
1992            .condition(&cx_b, |project, cx| {
1993                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1994                    == &[(
1995                        ProjectPath {
1996                            worktree_id,
1997                            path: Arc::from(Path::new("a.rs")),
1998                        },
1999                        DiagnosticSummary {
2000                            error_count: 1,
2001                            warning_count: 1,
2002                            ..Default::default()
2003                        },
2004                    )]
2005            })
2006            .await;
2007
2008        // Open the file with the errors on client B. They should be present.
2009        let buffer_b = cx_b
2010            .background()
2011            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2012            .await
2013            .unwrap();
2014
2015        buffer_b.read_with(cx_b, |buffer, _| {
2016            assert_eq!(
2017                buffer
2018                    .snapshot()
2019                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2020                    .map(|entry| entry)
2021                    .collect::<Vec<_>>(),
2022                &[
2023                    DiagnosticEntry {
2024                        range: Point::new(0, 4)..Point::new(0, 7),
2025                        diagnostic: Diagnostic {
2026                            group_id: 0,
2027                            message: "message 1".to_string(),
2028                            severity: lsp::DiagnosticSeverity::ERROR,
2029                            is_primary: true,
2030                            ..Default::default()
2031                        }
2032                    },
2033                    DiagnosticEntry {
2034                        range: Point::new(0, 10)..Point::new(0, 13),
2035                        diagnostic: Diagnostic {
2036                            group_id: 1,
2037                            severity: lsp::DiagnosticSeverity::WARNING,
2038                            message: "message 2".to_string(),
2039                            is_primary: true,
2040                            ..Default::default()
2041                        }
2042                    }
2043                ]
2044            );
2045        });
2046    }
2047
2048    #[gpui::test(iterations = 10)]
2049    async fn test_collaborating_with_completion(
2050        cx_a: &mut TestAppContext,
2051        cx_b: &mut TestAppContext,
2052    ) {
2053        cx_a.foreground().forbid_parking();
2054        let mut lang_registry = Arc::new(LanguageRegistry::new());
2055        let fs = FakeFs::new(cx_a.background());
2056
2057        // Set up a fake language server.
2058        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2059        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2060            completion_provider: Some(lsp::CompletionOptions {
2061                trigger_characters: Some(vec![".".to_string()]),
2062                ..Default::default()
2063            }),
2064            ..Default::default()
2065        });
2066        Arc::get_mut(&mut lang_registry)
2067            .unwrap()
2068            .add(Arc::new(Language::new(
2069                LanguageConfig {
2070                    name: "Rust".into(),
2071                    path_suffixes: vec!["rs".to_string()],
2072                    language_server: Some(language_server_config),
2073                    ..Default::default()
2074                },
2075                Some(tree_sitter_rust::language()),
2076            )));
2077
2078        // Connect to a server as 2 clients.
2079        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2080        let client_a = server.create_client(cx_a, "user_a").await;
2081        let client_b = server.create_client(cx_b, "user_b").await;
2082
2083        // Share a project as client A
2084        fs.insert_tree(
2085            "/a",
2086            json!({
2087                ".zed.toml": r#"collaborators = ["user_b"]"#,
2088                "main.rs": "fn main() { a }",
2089                "other.rs": "",
2090            }),
2091        )
2092        .await;
2093        let project_a = cx_a.update(|cx| {
2094            Project::local(
2095                client_a.clone(),
2096                client_a.user_store.clone(),
2097                lang_registry.clone(),
2098                fs.clone(),
2099                cx,
2100            )
2101        });
2102        let (worktree_a, _) = project_a
2103            .update(cx_a, |p, cx| {
2104                p.find_or_create_local_worktree("/a", false, cx)
2105            })
2106            .await
2107            .unwrap();
2108        worktree_a
2109            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2110            .await;
2111        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2112        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2113        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2114
2115        // Join the worktree as client B.
2116        let project_b = Project::remote(
2117            project_id,
2118            client_b.clone(),
2119            client_b.user_store.clone(),
2120            lang_registry.clone(),
2121            fs.clone(),
2122            &mut cx_b.to_async(),
2123        )
2124        .await
2125        .unwrap();
2126
2127        // Open a file in an editor as the guest.
2128        let buffer_b = project_b
2129            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2130            .await
2131            .unwrap();
2132        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2133        let editor_b = cx_b.add_view(window_b, |cx| {
2134            Editor::for_buffer(
2135                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2136                Some(project_b.clone()),
2137                watch::channel_with(Settings::test(cx)).1,
2138                cx,
2139            )
2140        });
2141
2142        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2143        buffer_b
2144            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2145            .await;
2146
2147        // Type a completion trigger character as the guest.
2148        editor_b.update(cx_b, |editor, cx| {
2149            editor.select_ranges([13..13], None, cx);
2150            editor.handle_input(&Input(".".into()), cx);
2151            cx.focus(&editor_b);
2152        });
2153
2154        // Receive a completion request as the host's language server.
2155        // Return some completions from the host's language server.
2156        cx_a.foreground().start_waiting();
2157        fake_language_server
2158            .handle_request::<lsp::request::Completion, _>(|params, _| {
2159                assert_eq!(
2160                    params.text_document_position.text_document.uri,
2161                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2162                );
2163                assert_eq!(
2164                    params.text_document_position.position,
2165                    lsp::Position::new(0, 14),
2166                );
2167
2168                Some(lsp::CompletionResponse::Array(vec![
2169                    lsp::CompletionItem {
2170                        label: "first_method(…)".into(),
2171                        detail: Some("fn(&mut self, B) -> C".into()),
2172                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2173                            new_text: "first_method($1)".to_string(),
2174                            range: lsp::Range::new(
2175                                lsp::Position::new(0, 14),
2176                                lsp::Position::new(0, 14),
2177                            ),
2178                        })),
2179                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2180                        ..Default::default()
2181                    },
2182                    lsp::CompletionItem {
2183                        label: "second_method(…)".into(),
2184                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2185                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2186                            new_text: "second_method()".to_string(),
2187                            range: lsp::Range::new(
2188                                lsp::Position::new(0, 14),
2189                                lsp::Position::new(0, 14),
2190                            ),
2191                        })),
2192                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2193                        ..Default::default()
2194                    },
2195                ]))
2196            })
2197            .next()
2198            .await
2199            .unwrap();
2200        cx_a.foreground().finish_waiting();
2201
2202        // Open the buffer on the host.
2203        let buffer_a = project_a
2204            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2205            .await
2206            .unwrap();
2207        buffer_a
2208            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2209            .await;
2210
2211        // Confirm a completion on the guest.
2212        editor_b
2213            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2214            .await;
2215        editor_b.update(cx_b, |editor, cx| {
2216            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2217            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2218        });
2219
2220        // Return a resolved completion from the host's language server.
2221        // The resolved completion has an additional text edit.
2222        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2223            |params, _| {
2224                assert_eq!(params.label, "first_method(…)");
2225                lsp::CompletionItem {
2226                    label: "first_method(…)".into(),
2227                    detail: Some("fn(&mut self, B) -> C".into()),
2228                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2229                        new_text: "first_method($1)".to_string(),
2230                        range: lsp::Range::new(
2231                            lsp::Position::new(0, 14),
2232                            lsp::Position::new(0, 14),
2233                        ),
2234                    })),
2235                    additional_text_edits: Some(vec![lsp::TextEdit {
2236                        new_text: "use d::SomeTrait;\n".to_string(),
2237                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2238                    }]),
2239                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2240                    ..Default::default()
2241                }
2242            },
2243        );
2244
2245        // The additional edit is applied.
2246        buffer_a
2247            .condition(&cx_a, |buffer, _| {
2248                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2249            })
2250            .await;
2251        buffer_b
2252            .condition(&cx_b, |buffer, _| {
2253                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2254            })
2255            .await;
2256    }
2257
2258    #[gpui::test(iterations = 10)]
2259    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2260        cx_a.foreground().forbid_parking();
2261        let mut lang_registry = Arc::new(LanguageRegistry::new());
2262        let fs = FakeFs::new(cx_a.background());
2263
2264        // Set up a fake language server.
2265        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2266        Arc::get_mut(&mut lang_registry)
2267            .unwrap()
2268            .add(Arc::new(Language::new(
2269                LanguageConfig {
2270                    name: "Rust".into(),
2271                    path_suffixes: vec!["rs".to_string()],
2272                    language_server: Some(language_server_config),
2273                    ..Default::default()
2274                },
2275                Some(tree_sitter_rust::language()),
2276            )));
2277
2278        // Connect to a server as 2 clients.
2279        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2280        let client_a = server.create_client(cx_a, "user_a").await;
2281        let client_b = server.create_client(cx_b, "user_b").await;
2282
2283        // Share a project as client A
2284        fs.insert_tree(
2285            "/a",
2286            json!({
2287                ".zed.toml": r#"collaborators = ["user_b"]"#,
2288                "a.rs": "let one = two",
2289            }),
2290        )
2291        .await;
2292        let project_a = cx_a.update(|cx| {
2293            Project::local(
2294                client_a.clone(),
2295                client_a.user_store.clone(),
2296                lang_registry.clone(),
2297                fs.clone(),
2298                cx,
2299            )
2300        });
2301        let (worktree_a, _) = project_a
2302            .update(cx_a, |p, cx| {
2303                p.find_or_create_local_worktree("/a", false, cx)
2304            })
2305            .await
2306            .unwrap();
2307        worktree_a
2308            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2309            .await;
2310        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2311        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2312        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2313
2314        // Join the worktree as client B.
2315        let project_b = Project::remote(
2316            project_id,
2317            client_b.clone(),
2318            client_b.user_store.clone(),
2319            lang_registry.clone(),
2320            fs.clone(),
2321            &mut cx_b.to_async(),
2322        )
2323        .await
2324        .unwrap();
2325
2326        let buffer_b = cx_b
2327            .background()
2328            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2329            .await
2330            .unwrap();
2331
2332        let format = project_b.update(cx_b, |project, cx| {
2333            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2334        });
2335
2336        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2337        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2338            Some(vec![
2339                lsp::TextEdit {
2340                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2341                    new_text: "h".to_string(),
2342                },
2343                lsp::TextEdit {
2344                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2345                    new_text: "y".to_string(),
2346                },
2347            ])
2348        });
2349
2350        format.await.unwrap();
2351        assert_eq!(
2352            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2353            "let honey = two"
2354        );
2355    }
2356
2357    #[gpui::test(iterations = 10)]
2358    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2359        cx_a.foreground().forbid_parking();
2360        let mut lang_registry = Arc::new(LanguageRegistry::new());
2361        let fs = FakeFs::new(cx_a.background());
2362        fs.insert_tree(
2363            "/root-1",
2364            json!({
2365                ".zed.toml": r#"collaborators = ["user_b"]"#,
2366                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2367            }),
2368        )
2369        .await;
2370        fs.insert_tree(
2371            "/root-2",
2372            json!({
2373                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2374            }),
2375        )
2376        .await;
2377
2378        // Set up a fake language server.
2379        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2380        Arc::get_mut(&mut lang_registry)
2381            .unwrap()
2382            .add(Arc::new(Language::new(
2383                LanguageConfig {
2384                    name: "Rust".into(),
2385                    path_suffixes: vec!["rs".to_string()],
2386                    language_server: Some(language_server_config),
2387                    ..Default::default()
2388                },
2389                Some(tree_sitter_rust::language()),
2390            )));
2391
2392        // Connect to a server as 2 clients.
2393        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2394        let client_a = server.create_client(cx_a, "user_a").await;
2395        let client_b = server.create_client(cx_b, "user_b").await;
2396
2397        // Share a project as client A
2398        let project_a = cx_a.update(|cx| {
2399            Project::local(
2400                client_a.clone(),
2401                client_a.user_store.clone(),
2402                lang_registry.clone(),
2403                fs.clone(),
2404                cx,
2405            )
2406        });
2407        let (worktree_a, _) = project_a
2408            .update(cx_a, |p, cx| {
2409                p.find_or_create_local_worktree("/root-1", false, cx)
2410            })
2411            .await
2412            .unwrap();
2413        worktree_a
2414            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2415            .await;
2416        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2417        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2418        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2419
2420        // Join the worktree as client B.
2421        let project_b = Project::remote(
2422            project_id,
2423            client_b.clone(),
2424            client_b.user_store.clone(),
2425            lang_registry.clone(),
2426            fs.clone(),
2427            &mut cx_b.to_async(),
2428        )
2429        .await
2430        .unwrap();
2431
2432        // Open the file on client B.
2433        let buffer_b = cx_b
2434            .background()
2435            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2436            .await
2437            .unwrap();
2438
2439        // Request the definition of a symbol as the guest.
2440        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2441
2442        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2443        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2444            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2445                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2446                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2447            )))
2448        });
2449
2450        let definitions_1 = definitions_1.await.unwrap();
2451        cx_b.read(|cx| {
2452            assert_eq!(definitions_1.len(), 1);
2453            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2454            let target_buffer = definitions_1[0].buffer.read(cx);
2455            assert_eq!(
2456                target_buffer.text(),
2457                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2458            );
2459            assert_eq!(
2460                definitions_1[0].range.to_point(target_buffer),
2461                Point::new(0, 6)..Point::new(0, 9)
2462            );
2463        });
2464
2465        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2466        // the previous call to `definition`.
2467        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2468        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2469            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2470                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2471                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2472            )))
2473        });
2474
2475        let definitions_2 = definitions_2.await.unwrap();
2476        cx_b.read(|cx| {
2477            assert_eq!(definitions_2.len(), 1);
2478            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2479            let target_buffer = definitions_2[0].buffer.read(cx);
2480            assert_eq!(
2481                target_buffer.text(),
2482                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2483            );
2484            assert_eq!(
2485                definitions_2[0].range.to_point(target_buffer),
2486                Point::new(1, 6)..Point::new(1, 11)
2487            );
2488        });
2489        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2490    }
2491
2492    #[gpui::test(iterations = 10)]
2493    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2494        cx_a.foreground().forbid_parking();
2495        let mut lang_registry = Arc::new(LanguageRegistry::new());
2496        let fs = FakeFs::new(cx_a.background());
2497        fs.insert_tree(
2498            "/root-1",
2499            json!({
2500                ".zed.toml": r#"collaborators = ["user_b"]"#,
2501                "one.rs": "const ONE: usize = 1;",
2502                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2503            }),
2504        )
2505        .await;
2506        fs.insert_tree(
2507            "/root-2",
2508            json!({
2509                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2510            }),
2511        )
2512        .await;
2513
2514        // Set up a fake language server.
2515        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2516        Arc::get_mut(&mut lang_registry)
2517            .unwrap()
2518            .add(Arc::new(Language::new(
2519                LanguageConfig {
2520                    name: "Rust".into(),
2521                    path_suffixes: vec!["rs".to_string()],
2522                    language_server: Some(language_server_config),
2523                    ..Default::default()
2524                },
2525                Some(tree_sitter_rust::language()),
2526            )));
2527
2528        // Connect to a server as 2 clients.
2529        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2530        let client_a = server.create_client(cx_a, "user_a").await;
2531        let client_b = server.create_client(cx_b, "user_b").await;
2532
2533        // Share a project as client A
2534        let project_a = cx_a.update(|cx| {
2535            Project::local(
2536                client_a.clone(),
2537                client_a.user_store.clone(),
2538                lang_registry.clone(),
2539                fs.clone(),
2540                cx,
2541            )
2542        });
2543        let (worktree_a, _) = project_a
2544            .update(cx_a, |p, cx| {
2545                p.find_or_create_local_worktree("/root-1", false, cx)
2546            })
2547            .await
2548            .unwrap();
2549        worktree_a
2550            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2551            .await;
2552        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2553        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2554        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2555
2556        // Join the worktree as client B.
2557        let project_b = Project::remote(
2558            project_id,
2559            client_b.clone(),
2560            client_b.user_store.clone(),
2561            lang_registry.clone(),
2562            fs.clone(),
2563            &mut cx_b.to_async(),
2564        )
2565        .await
2566        .unwrap();
2567
2568        // Open the file on client B.
2569        let buffer_b = cx_b
2570            .background()
2571            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2572            .await
2573            .unwrap();
2574
2575        // Request references to a symbol as the guest.
2576        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2577
2578        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2579        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2580            assert_eq!(
2581                params.text_document_position.text_document.uri.as_str(),
2582                "file:///root-1/one.rs"
2583            );
2584            Some(vec![
2585                lsp::Location {
2586                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2587                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2588                },
2589                lsp::Location {
2590                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2591                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2592                },
2593                lsp::Location {
2594                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2595                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2596                },
2597            ])
2598        });
2599
2600        let references = references.await.unwrap();
2601        cx_b.read(|cx| {
2602            assert_eq!(references.len(), 3);
2603            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2604
2605            let two_buffer = references[0].buffer.read(cx);
2606            let three_buffer = references[2].buffer.read(cx);
2607            assert_eq!(
2608                two_buffer.file().unwrap().path().as_ref(),
2609                Path::new("two.rs")
2610            );
2611            assert_eq!(references[1].buffer, references[0].buffer);
2612            assert_eq!(
2613                three_buffer.file().unwrap().full_path(cx),
2614                Path::new("three.rs")
2615            );
2616
2617            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2618            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2619            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2620        });
2621    }
2622
2623    #[gpui::test(iterations = 10)]
2624    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2625        cx_a.foreground().forbid_parking();
2626        let lang_registry = Arc::new(LanguageRegistry::new());
2627        let fs = FakeFs::new(cx_a.background());
2628        fs.insert_tree(
2629            "/root-1",
2630            json!({
2631                ".zed.toml": r#"collaborators = ["user_b"]"#,
2632                "a": "hello world",
2633                "b": "goodnight moon",
2634                "c": "a world of goo",
2635                "d": "world champion of clown world",
2636            }),
2637        )
2638        .await;
2639        fs.insert_tree(
2640            "/root-2",
2641            json!({
2642                "e": "disney world is fun",
2643            }),
2644        )
2645        .await;
2646
2647        // Connect to a server as 2 clients.
2648        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2649        let client_a = server.create_client(cx_a, "user_a").await;
2650        let client_b = server.create_client(cx_b, "user_b").await;
2651
2652        // Share a project as client A
2653        let project_a = cx_a.update(|cx| {
2654            Project::local(
2655                client_a.clone(),
2656                client_a.user_store.clone(),
2657                lang_registry.clone(),
2658                fs.clone(),
2659                cx,
2660            )
2661        });
2662        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2663
2664        let (worktree_1, _) = project_a
2665            .update(cx_a, |p, cx| {
2666                p.find_or_create_local_worktree("/root-1", false, cx)
2667            })
2668            .await
2669            .unwrap();
2670        worktree_1
2671            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2672            .await;
2673        let (worktree_2, _) = project_a
2674            .update(cx_a, |p, cx| {
2675                p.find_or_create_local_worktree("/root-2", false, cx)
2676            })
2677            .await
2678            .unwrap();
2679        worktree_2
2680            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2681            .await;
2682
2683        eprintln!("sharing");
2684
2685        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2686
2687        // Join the worktree as client B.
2688        let project_b = Project::remote(
2689            project_id,
2690            client_b.clone(),
2691            client_b.user_store.clone(),
2692            lang_registry.clone(),
2693            fs.clone(),
2694            &mut cx_b.to_async(),
2695        )
2696        .await
2697        .unwrap();
2698
2699        let results = project_b
2700            .update(cx_b, |project, cx| {
2701                project.search(SearchQuery::text("world", false, false), cx)
2702            })
2703            .await
2704            .unwrap();
2705
2706        let mut ranges_by_path = results
2707            .into_iter()
2708            .map(|(buffer, ranges)| {
2709                buffer.read_with(cx_b, |buffer, cx| {
2710                    let path = buffer.file().unwrap().full_path(cx);
2711                    let offset_ranges = ranges
2712                        .into_iter()
2713                        .map(|range| range.to_offset(buffer))
2714                        .collect::<Vec<_>>();
2715                    (path, offset_ranges)
2716                })
2717            })
2718            .collect::<Vec<_>>();
2719        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2720
2721        assert_eq!(
2722            ranges_by_path,
2723            &[
2724                (PathBuf::from("root-1/a"), vec![6..11]),
2725                (PathBuf::from("root-1/c"), vec![2..7]),
2726                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2727                (PathBuf::from("root-2/e"), vec![7..12]),
2728            ]
2729        );
2730    }
2731
2732    #[gpui::test(iterations = 10)]
2733    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2734        cx_a.foreground().forbid_parking();
2735        let lang_registry = Arc::new(LanguageRegistry::new());
2736        let fs = FakeFs::new(cx_a.background());
2737        fs.insert_tree(
2738            "/root-1",
2739            json!({
2740                ".zed.toml": r#"collaborators = ["user_b"]"#,
2741                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2742            }),
2743        )
2744        .await;
2745
2746        // Set up a fake language server.
2747        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2748        lang_registry.add(Arc::new(Language::new(
2749            LanguageConfig {
2750                name: "Rust".into(),
2751                path_suffixes: vec!["rs".to_string()],
2752                language_server: Some(language_server_config),
2753                ..Default::default()
2754            },
2755            Some(tree_sitter_rust::language()),
2756        )));
2757
2758        // Connect to a server as 2 clients.
2759        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2760        let client_a = server.create_client(cx_a, "user_a").await;
2761        let client_b = server.create_client(cx_b, "user_b").await;
2762
2763        // Share a project as client A
2764        let project_a = cx_a.update(|cx| {
2765            Project::local(
2766                client_a.clone(),
2767                client_a.user_store.clone(),
2768                lang_registry.clone(),
2769                fs.clone(),
2770                cx,
2771            )
2772        });
2773        let (worktree_a, _) = project_a
2774            .update(cx_a, |p, cx| {
2775                p.find_or_create_local_worktree("/root-1", false, cx)
2776            })
2777            .await
2778            .unwrap();
2779        worktree_a
2780            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2781            .await;
2782        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2783        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2784        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2785
2786        // Join the worktree as client B.
2787        let project_b = Project::remote(
2788            project_id,
2789            client_b.clone(),
2790            client_b.user_store.clone(),
2791            lang_registry.clone(),
2792            fs.clone(),
2793            &mut cx_b.to_async(),
2794        )
2795        .await
2796        .unwrap();
2797
2798        // Open the file on client B.
2799        let buffer_b = cx_b
2800            .background()
2801            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2802            .await
2803            .unwrap();
2804
2805        // Request document highlights as the guest.
2806        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2807
2808        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2809        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2810            |params, _| {
2811                assert_eq!(
2812                    params
2813                        .text_document_position_params
2814                        .text_document
2815                        .uri
2816                        .as_str(),
2817                    "file:///root-1/main.rs"
2818                );
2819                assert_eq!(
2820                    params.text_document_position_params.position,
2821                    lsp::Position::new(0, 34)
2822                );
2823                Some(vec![
2824                    lsp::DocumentHighlight {
2825                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2826                        range: lsp::Range::new(
2827                            lsp::Position::new(0, 10),
2828                            lsp::Position::new(0, 16),
2829                        ),
2830                    },
2831                    lsp::DocumentHighlight {
2832                        kind: Some(lsp::DocumentHighlightKind::READ),
2833                        range: lsp::Range::new(
2834                            lsp::Position::new(0, 32),
2835                            lsp::Position::new(0, 38),
2836                        ),
2837                    },
2838                    lsp::DocumentHighlight {
2839                        kind: Some(lsp::DocumentHighlightKind::READ),
2840                        range: lsp::Range::new(
2841                            lsp::Position::new(0, 41),
2842                            lsp::Position::new(0, 47),
2843                        ),
2844                    },
2845                ])
2846            },
2847        );
2848
2849        let highlights = highlights.await.unwrap();
2850        buffer_b.read_with(cx_b, |buffer, _| {
2851            let snapshot = buffer.snapshot();
2852
2853            let highlights = highlights
2854                .into_iter()
2855                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2856                .collect::<Vec<_>>();
2857            assert_eq!(
2858                highlights,
2859                &[
2860                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2861                    (lsp::DocumentHighlightKind::READ, 32..38),
2862                    (lsp::DocumentHighlightKind::READ, 41..47)
2863                ]
2864            )
2865        });
2866    }
2867
2868    #[gpui::test(iterations = 10)]
2869    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2870        cx_a.foreground().forbid_parking();
2871        let mut lang_registry = Arc::new(LanguageRegistry::new());
2872        let fs = FakeFs::new(cx_a.background());
2873        fs.insert_tree(
2874            "/code",
2875            json!({
2876                "crate-1": {
2877                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2878                    "one.rs": "const ONE: usize = 1;",
2879                },
2880                "crate-2": {
2881                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2882                },
2883                "private": {
2884                    "passwords.txt": "the-password",
2885                }
2886            }),
2887        )
2888        .await;
2889
2890        // Set up a fake language server.
2891        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2892        Arc::get_mut(&mut lang_registry)
2893            .unwrap()
2894            .add(Arc::new(Language::new(
2895                LanguageConfig {
2896                    name: "Rust".into(),
2897                    path_suffixes: vec!["rs".to_string()],
2898                    language_server: Some(language_server_config),
2899                    ..Default::default()
2900                },
2901                Some(tree_sitter_rust::language()),
2902            )));
2903
2904        // Connect to a server as 2 clients.
2905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2906        let client_a = server.create_client(cx_a, "user_a").await;
2907        let client_b = server.create_client(cx_b, "user_b").await;
2908
2909        // Share a project as client A
2910        let project_a = cx_a.update(|cx| {
2911            Project::local(
2912                client_a.clone(),
2913                client_a.user_store.clone(),
2914                lang_registry.clone(),
2915                fs.clone(),
2916                cx,
2917            )
2918        });
2919        let (worktree_a, _) = project_a
2920            .update(cx_a, |p, cx| {
2921                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2922            })
2923            .await
2924            .unwrap();
2925        worktree_a
2926            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2927            .await;
2928        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2929        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2930        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2931
2932        // Join the worktree as client B.
2933        let project_b = Project::remote(
2934            project_id,
2935            client_b.clone(),
2936            client_b.user_store.clone(),
2937            lang_registry.clone(),
2938            fs.clone(),
2939            &mut cx_b.to_async(),
2940        )
2941        .await
2942        .unwrap();
2943
2944        // Cause the language server to start.
2945        let _buffer = cx_b
2946            .background()
2947            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2948            .await
2949            .unwrap();
2950
2951        // Request the definition of a symbol as the guest.
2952        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2953        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2954        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2955            #[allow(deprecated)]
2956            Some(vec![lsp::SymbolInformation {
2957                name: "TWO".into(),
2958                location: lsp::Location {
2959                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2960                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2961                },
2962                kind: lsp::SymbolKind::CONSTANT,
2963                tags: None,
2964                container_name: None,
2965                deprecated: None,
2966            }])
2967        });
2968
2969        let symbols = symbols.await.unwrap();
2970        assert_eq!(symbols.len(), 1);
2971        assert_eq!(symbols[0].name, "TWO");
2972
2973        // Open one of the returned symbols.
2974        let buffer_b_2 = project_b
2975            .update(cx_b, |project, cx| {
2976                project.open_buffer_for_symbol(&symbols[0], cx)
2977            })
2978            .await
2979            .unwrap();
2980        buffer_b_2.read_with(cx_b, |buffer, _| {
2981            assert_eq!(
2982                buffer.file().unwrap().path().as_ref(),
2983                Path::new("../crate-2/two.rs")
2984            );
2985        });
2986
2987        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2988        let mut fake_symbol = symbols[0].clone();
2989        fake_symbol.path = Path::new("/code/secrets").into();
2990        let error = project_b
2991            .update(cx_b, |project, cx| {
2992                project.open_buffer_for_symbol(&fake_symbol, cx)
2993            })
2994            .await
2995            .unwrap_err();
2996        assert!(error.to_string().contains("invalid symbol signature"));
2997    }
2998
2999    #[gpui::test(iterations = 10)]
3000    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3001        cx_a: &mut TestAppContext,
3002        cx_b: &mut TestAppContext,
3003        mut rng: StdRng,
3004    ) {
3005        cx_a.foreground().forbid_parking();
3006        let mut lang_registry = Arc::new(LanguageRegistry::new());
3007        let fs = FakeFs::new(cx_a.background());
3008        fs.insert_tree(
3009            "/root",
3010            json!({
3011                ".zed.toml": r#"collaborators = ["user_b"]"#,
3012                "a.rs": "const ONE: usize = b::TWO;",
3013                "b.rs": "const TWO: usize = 2",
3014            }),
3015        )
3016        .await;
3017
3018        // Set up a fake language server.
3019        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3020
3021        Arc::get_mut(&mut lang_registry)
3022            .unwrap()
3023            .add(Arc::new(Language::new(
3024                LanguageConfig {
3025                    name: "Rust".into(),
3026                    path_suffixes: vec!["rs".to_string()],
3027                    language_server: Some(language_server_config),
3028                    ..Default::default()
3029                },
3030                Some(tree_sitter_rust::language()),
3031            )));
3032
3033        // Connect to a server as 2 clients.
3034        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3035        let client_a = server.create_client(cx_a, "user_a").await;
3036        let client_b = server.create_client(cx_b, "user_b").await;
3037
3038        // Share a project as client A
3039        let project_a = cx_a.update(|cx| {
3040            Project::local(
3041                client_a.clone(),
3042                client_a.user_store.clone(),
3043                lang_registry.clone(),
3044                fs.clone(),
3045                cx,
3046            )
3047        });
3048
3049        let (worktree_a, _) = project_a
3050            .update(cx_a, |p, cx| {
3051                p.find_or_create_local_worktree("/root", false, cx)
3052            })
3053            .await
3054            .unwrap();
3055        worktree_a
3056            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3057            .await;
3058        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3059        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3060        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3061
3062        // Join the worktree as client B.
3063        let project_b = Project::remote(
3064            project_id,
3065            client_b.clone(),
3066            client_b.user_store.clone(),
3067            lang_registry.clone(),
3068            fs.clone(),
3069            &mut cx_b.to_async(),
3070        )
3071        .await
3072        .unwrap();
3073
3074        let buffer_b1 = cx_b
3075            .background()
3076            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3077            .await
3078            .unwrap();
3079
3080        let definitions;
3081        let buffer_b2;
3082        if rng.gen() {
3083            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3084            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3085        } else {
3086            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3087            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3088        }
3089
3090        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3091        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3092            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3093                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3094                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3095            )))
3096        });
3097
3098        let buffer_b2 = buffer_b2.await.unwrap();
3099        let definitions = definitions.await.unwrap();
3100        assert_eq!(definitions.len(), 1);
3101        assert_eq!(definitions[0].buffer, buffer_b2);
3102    }
3103
3104    #[gpui::test(iterations = 10)]
3105    async fn test_collaborating_with_code_actions(
3106        cx_a: &mut TestAppContext,
3107        cx_b: &mut TestAppContext,
3108    ) {
3109        cx_a.foreground().forbid_parking();
3110        let mut lang_registry = Arc::new(LanguageRegistry::new());
3111        let fs = FakeFs::new(cx_a.background());
3112        let mut path_openers_b = Vec::new();
3113        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3114
3115        // Set up a fake language server.
3116        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3117        Arc::get_mut(&mut lang_registry)
3118            .unwrap()
3119            .add(Arc::new(Language::new(
3120                LanguageConfig {
3121                    name: "Rust".into(),
3122                    path_suffixes: vec!["rs".to_string()],
3123                    language_server: Some(language_server_config),
3124                    ..Default::default()
3125                },
3126                Some(tree_sitter_rust::language()),
3127            )));
3128
3129        // Connect to a server as 2 clients.
3130        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3131        let client_a = server.create_client(cx_a, "user_a").await;
3132        let client_b = server.create_client(cx_b, "user_b").await;
3133
3134        // Share a project as client A
3135        fs.insert_tree(
3136            "/a",
3137            json!({
3138                ".zed.toml": r#"collaborators = ["user_b"]"#,
3139                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3140                "other.rs": "pub fn foo() -> usize { 4 }",
3141            }),
3142        )
3143        .await;
3144        let project_a = cx_a.update(|cx| {
3145            Project::local(
3146                client_a.clone(),
3147                client_a.user_store.clone(),
3148                lang_registry.clone(),
3149                fs.clone(),
3150                cx,
3151            )
3152        });
3153        let (worktree_a, _) = project_a
3154            .update(cx_a, |p, cx| {
3155                p.find_or_create_local_worktree("/a", false, cx)
3156            })
3157            .await
3158            .unwrap();
3159        worktree_a
3160            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3161            .await;
3162        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3163        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3164        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3165
3166        // Join the worktree as client B.
3167        let project_b = Project::remote(
3168            project_id,
3169            client_b.clone(),
3170            client_b.user_store.clone(),
3171            lang_registry.clone(),
3172            fs.clone(),
3173            &mut cx_b.to_async(),
3174        )
3175        .await
3176        .unwrap();
3177        let mut params = cx_b.update(WorkspaceParams::test);
3178        params.languages = lang_registry.clone();
3179        params.client = client_b.client.clone();
3180        params.user_store = client_b.user_store.clone();
3181        params.project = project_b;
3182        params.path_openers = path_openers_b.into();
3183
3184        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3185        let editor_b = workspace_b
3186            .update(cx_b, |workspace, cx| {
3187                workspace.open_path((worktree_id, "main.rs").into(), cx)
3188            })
3189            .await
3190            .unwrap()
3191            .downcast::<Editor>()
3192            .unwrap();
3193
3194        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3195        fake_language_server
3196            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3197                assert_eq!(
3198                    params.text_document.uri,
3199                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3200                );
3201                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3202                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3203                None
3204            })
3205            .next()
3206            .await;
3207
3208        // Move cursor to a location that contains code actions.
3209        editor_b.update(cx_b, |editor, cx| {
3210            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3211            cx.focus(&editor_b);
3212        });
3213
3214        fake_language_server
3215            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3216                assert_eq!(
3217                    params.text_document.uri,
3218                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3219                );
3220                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3221                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3222
3223                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3224                    lsp::CodeAction {
3225                        title: "Inline into all callers".to_string(),
3226                        edit: Some(lsp::WorkspaceEdit {
3227                            changes: Some(
3228                                [
3229                                    (
3230                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3231                                        vec![lsp::TextEdit::new(
3232                                            lsp::Range::new(
3233                                                lsp::Position::new(1, 22),
3234                                                lsp::Position::new(1, 34),
3235                                            ),
3236                                            "4".to_string(),
3237                                        )],
3238                                    ),
3239                                    (
3240                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3241                                        vec![lsp::TextEdit::new(
3242                                            lsp::Range::new(
3243                                                lsp::Position::new(0, 0),
3244                                                lsp::Position::new(0, 27),
3245                                            ),
3246                                            "".to_string(),
3247                                        )],
3248                                    ),
3249                                ]
3250                                .into_iter()
3251                                .collect(),
3252                            ),
3253                            ..Default::default()
3254                        }),
3255                        data: Some(json!({
3256                            "codeActionParams": {
3257                                "range": {
3258                                    "start": {"line": 1, "column": 31},
3259                                    "end": {"line": 1, "column": 31},
3260                                }
3261                            }
3262                        })),
3263                        ..Default::default()
3264                    },
3265                )])
3266            })
3267            .next()
3268            .await;
3269
3270        // Toggle code actions and wait for them to display.
3271        editor_b.update(cx_b, |editor, cx| {
3272            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3273        });
3274        editor_b
3275            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3276            .await;
3277
3278        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3279
3280        // Confirming the code action will trigger a resolve request.
3281        let confirm_action = workspace_b
3282            .update(cx_b, |workspace, cx| {
3283                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3284            })
3285            .unwrap();
3286        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3287            lsp::CodeAction {
3288                title: "Inline into all callers".to_string(),
3289                edit: Some(lsp::WorkspaceEdit {
3290                    changes: Some(
3291                        [
3292                            (
3293                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3294                                vec![lsp::TextEdit::new(
3295                                    lsp::Range::new(
3296                                        lsp::Position::new(1, 22),
3297                                        lsp::Position::new(1, 34),
3298                                    ),
3299                                    "4".to_string(),
3300                                )],
3301                            ),
3302                            (
3303                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3304                                vec![lsp::TextEdit::new(
3305                                    lsp::Range::new(
3306                                        lsp::Position::new(0, 0),
3307                                        lsp::Position::new(0, 27),
3308                                    ),
3309                                    "".to_string(),
3310                                )],
3311                            ),
3312                        ]
3313                        .into_iter()
3314                        .collect(),
3315                    ),
3316                    ..Default::default()
3317                }),
3318                ..Default::default()
3319            }
3320        });
3321
3322        // After the action is confirmed, an editor containing both modified files is opened.
3323        confirm_action.await.unwrap();
3324        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3325            workspace
3326                .active_item(cx)
3327                .unwrap()
3328                .downcast::<Editor>()
3329                .unwrap()
3330        });
3331        code_action_editor.update(cx_b, |editor, cx| {
3332            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3333            editor.undo(&Undo, cx);
3334            assert_eq!(
3335                editor.text(cx),
3336                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3337            );
3338            editor.redo(&Redo, cx);
3339            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3340        });
3341    }
3342
3343    #[gpui::test(iterations = 10)]
3344    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3345        cx_a.foreground().forbid_parking();
3346        let mut lang_registry = Arc::new(LanguageRegistry::new());
3347        let fs = FakeFs::new(cx_a.background());
3348        let mut path_openers_b = Vec::new();
3349        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3350
3351        // Set up a fake language server.
3352        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3353        Arc::get_mut(&mut lang_registry)
3354            .unwrap()
3355            .add(Arc::new(Language::new(
3356                LanguageConfig {
3357                    name: "Rust".into(),
3358                    path_suffixes: vec!["rs".to_string()],
3359                    language_server: Some(language_server_config),
3360                    ..Default::default()
3361                },
3362                Some(tree_sitter_rust::language()),
3363            )));
3364
3365        // Connect to a server as 2 clients.
3366        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3367        let client_a = server.create_client(cx_a, "user_a").await;
3368        let client_b = server.create_client(cx_b, "user_b").await;
3369
3370        // Share a project as client A
3371        fs.insert_tree(
3372            "/dir",
3373            json!({
3374                ".zed.toml": r#"collaborators = ["user_b"]"#,
3375                "one.rs": "const ONE: usize = 1;",
3376                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3377            }),
3378        )
3379        .await;
3380        let project_a = cx_a.update(|cx| {
3381            Project::local(
3382                client_a.clone(),
3383                client_a.user_store.clone(),
3384                lang_registry.clone(),
3385                fs.clone(),
3386                cx,
3387            )
3388        });
3389        let (worktree_a, _) = project_a
3390            .update(cx_a, |p, cx| {
3391                p.find_or_create_local_worktree("/dir", false, cx)
3392            })
3393            .await
3394            .unwrap();
3395        worktree_a
3396            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3397            .await;
3398        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3399        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3400        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3401
3402        // Join the worktree as client B.
3403        let project_b = Project::remote(
3404            project_id,
3405            client_b.clone(),
3406            client_b.user_store.clone(),
3407            lang_registry.clone(),
3408            fs.clone(),
3409            &mut cx_b.to_async(),
3410        )
3411        .await
3412        .unwrap();
3413        let mut params = cx_b.update(WorkspaceParams::test);
3414        params.languages = lang_registry.clone();
3415        params.client = client_b.client.clone();
3416        params.user_store = client_b.user_store.clone();
3417        params.project = project_b;
3418        params.path_openers = path_openers_b.into();
3419
3420        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3421        let editor_b = workspace_b
3422            .update(cx_b, |workspace, cx| {
3423                workspace.open_path((worktree_id, "one.rs").into(), cx)
3424            })
3425            .await
3426            .unwrap()
3427            .downcast::<Editor>()
3428            .unwrap();
3429        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3430
3431        // Move cursor to a location that can be renamed.
3432        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3433            editor.select_ranges([7..7], None, cx);
3434            editor.rename(&Rename, cx).unwrap()
3435        });
3436
3437        fake_language_server
3438            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3439                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3440                assert_eq!(params.position, lsp::Position::new(0, 7));
3441                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3442                    lsp::Position::new(0, 6),
3443                    lsp::Position::new(0, 9),
3444                )))
3445            })
3446            .next()
3447            .await
3448            .unwrap();
3449        prepare_rename.await.unwrap();
3450        editor_b.update(cx_b, |editor, cx| {
3451            let rename = editor.pending_rename().unwrap();
3452            let buffer = editor.buffer().read(cx).snapshot(cx);
3453            assert_eq!(
3454                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3455                6..9
3456            );
3457            rename.editor.update(cx, |rename_editor, cx| {
3458                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3459                    rename_buffer.edit([0..3], "THREE", cx);
3460                });
3461            });
3462        });
3463
3464        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3465            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3466        });
3467        fake_language_server
3468            .handle_request::<lsp::request::Rename, _>(|params, _| {
3469                assert_eq!(
3470                    params.text_document_position.text_document.uri.as_str(),
3471                    "file:///dir/one.rs"
3472                );
3473                assert_eq!(
3474                    params.text_document_position.position,
3475                    lsp::Position::new(0, 6)
3476                );
3477                assert_eq!(params.new_name, "THREE");
3478                Some(lsp::WorkspaceEdit {
3479                    changes: Some(
3480                        [
3481                            (
3482                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3483                                vec![lsp::TextEdit::new(
3484                                    lsp::Range::new(
3485                                        lsp::Position::new(0, 6),
3486                                        lsp::Position::new(0, 9),
3487                                    ),
3488                                    "THREE".to_string(),
3489                                )],
3490                            ),
3491                            (
3492                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3493                                vec![
3494                                    lsp::TextEdit::new(
3495                                        lsp::Range::new(
3496                                            lsp::Position::new(0, 24),
3497                                            lsp::Position::new(0, 27),
3498                                        ),
3499                                        "THREE".to_string(),
3500                                    ),
3501                                    lsp::TextEdit::new(
3502                                        lsp::Range::new(
3503                                            lsp::Position::new(0, 35),
3504                                            lsp::Position::new(0, 38),
3505                                        ),
3506                                        "THREE".to_string(),
3507                                    ),
3508                                ],
3509                            ),
3510                        ]
3511                        .into_iter()
3512                        .collect(),
3513                    ),
3514                    ..Default::default()
3515                })
3516            })
3517            .next()
3518            .await
3519            .unwrap();
3520        confirm_rename.await.unwrap();
3521
3522        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3523            workspace
3524                .active_item(cx)
3525                .unwrap()
3526                .downcast::<Editor>()
3527                .unwrap()
3528        });
3529        rename_editor.update(cx_b, |editor, cx| {
3530            assert_eq!(
3531                editor.text(cx),
3532                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3533            );
3534            editor.undo(&Undo, cx);
3535            assert_eq!(
3536                editor.text(cx),
3537                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3538            );
3539            editor.redo(&Redo, cx);
3540            assert_eq!(
3541                editor.text(cx),
3542                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3543            );
3544        });
3545
3546        // Ensure temporary rename edits cannot be undone/redone.
3547        editor_b.update(cx_b, |editor, cx| {
3548            editor.undo(&Undo, cx);
3549            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3550            editor.undo(&Undo, cx);
3551            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3552            editor.redo(&Redo, cx);
3553            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3554        })
3555    }
3556
3557    #[gpui::test(iterations = 10)]
3558    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3559        cx_a.foreground().forbid_parking();
3560
3561        // Connect to a server as 2 clients.
3562        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3563        let client_a = server.create_client(cx_a, "user_a").await;
3564        let client_b = server.create_client(cx_b, "user_b").await;
3565
3566        // Create an org that includes these 2 users.
3567        let db = &server.app_state.db;
3568        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3569        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3570            .await
3571            .unwrap();
3572        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3573            .await
3574            .unwrap();
3575
3576        // Create a channel that includes all the users.
3577        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3578        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3579            .await
3580            .unwrap();
3581        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3582            .await
3583            .unwrap();
3584        db.create_channel_message(
3585            channel_id,
3586            client_b.current_user_id(&cx_b),
3587            "hello A, it's B.",
3588            OffsetDateTime::now_utc(),
3589            1,
3590        )
3591        .await
3592        .unwrap();
3593
3594        let channels_a = cx_a
3595            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3596        channels_a
3597            .condition(cx_a, |list, _| list.available_channels().is_some())
3598            .await;
3599        channels_a.read_with(cx_a, |list, _| {
3600            assert_eq!(
3601                list.available_channels().unwrap(),
3602                &[ChannelDetails {
3603                    id: channel_id.to_proto(),
3604                    name: "test-channel".to_string()
3605                }]
3606            )
3607        });
3608        let channel_a = channels_a.update(cx_a, |this, cx| {
3609            this.get_channel(channel_id.to_proto(), cx).unwrap()
3610        });
3611        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3612        channel_a
3613            .condition(&cx_a, |channel, _| {
3614                channel_messages(channel)
3615                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3616            })
3617            .await;
3618
3619        let channels_b = cx_b
3620            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3621        channels_b
3622            .condition(cx_b, |list, _| list.available_channels().is_some())
3623            .await;
3624        channels_b.read_with(cx_b, |list, _| {
3625            assert_eq!(
3626                list.available_channels().unwrap(),
3627                &[ChannelDetails {
3628                    id: channel_id.to_proto(),
3629                    name: "test-channel".to_string()
3630                }]
3631            )
3632        });
3633
3634        let channel_b = channels_b.update(cx_b, |this, cx| {
3635            this.get_channel(channel_id.to_proto(), cx).unwrap()
3636        });
3637        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3638        channel_b
3639            .condition(&cx_b, |channel, _| {
3640                channel_messages(channel)
3641                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3642            })
3643            .await;
3644
3645        channel_a
3646            .update(cx_a, |channel, cx| {
3647                channel
3648                    .send_message("oh, hi B.".to_string(), cx)
3649                    .unwrap()
3650                    .detach();
3651                let task = channel.send_message("sup".to_string(), cx).unwrap();
3652                assert_eq!(
3653                    channel_messages(channel),
3654                    &[
3655                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3656                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3657                        ("user_a".to_string(), "sup".to_string(), true)
3658                    ]
3659                );
3660                task
3661            })
3662            .await
3663            .unwrap();
3664
3665        channel_b
3666            .condition(&cx_b, |channel, _| {
3667                channel_messages(channel)
3668                    == [
3669                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3670                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3671                        ("user_a".to_string(), "sup".to_string(), false),
3672                    ]
3673            })
3674            .await;
3675
3676        assert_eq!(
3677            server
3678                .state()
3679                .await
3680                .channel(channel_id)
3681                .unwrap()
3682                .connection_ids
3683                .len(),
3684            2
3685        );
3686        cx_b.update(|_| drop(channel_b));
3687        server
3688            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3689            .await;
3690
3691        cx_a.update(|_| drop(channel_a));
3692        server
3693            .condition(|state| state.channel(channel_id).is_none())
3694            .await;
3695    }
3696
3697    #[gpui::test(iterations = 10)]
3698    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3699        cx_a.foreground().forbid_parking();
3700
3701        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3702        let client_a = server.create_client(cx_a, "user_a").await;
3703
3704        let db = &server.app_state.db;
3705        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3706        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3707        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3708            .await
3709            .unwrap();
3710        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3711            .await
3712            .unwrap();
3713
3714        let channels_a = cx_a
3715            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3716        channels_a
3717            .condition(cx_a, |list, _| list.available_channels().is_some())
3718            .await;
3719        let channel_a = channels_a.update(cx_a, |this, cx| {
3720            this.get_channel(channel_id.to_proto(), cx).unwrap()
3721        });
3722
3723        // Messages aren't allowed to be too long.
3724        channel_a
3725            .update(cx_a, |channel, cx| {
3726                let long_body = "this is long.\n".repeat(1024);
3727                channel.send_message(long_body, cx).unwrap()
3728            })
3729            .await
3730            .unwrap_err();
3731
3732        // Messages aren't allowed to be blank.
3733        channel_a.update(cx_a, |channel, cx| {
3734            channel.send_message(String::new(), cx).unwrap_err()
3735        });
3736
3737        // Leading and trailing whitespace are trimmed.
3738        channel_a
3739            .update(cx_a, |channel, cx| {
3740                channel
3741                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3742                    .unwrap()
3743            })
3744            .await
3745            .unwrap();
3746        assert_eq!(
3747            db.get_channel_messages(channel_id, 10, None)
3748                .await
3749                .unwrap()
3750                .iter()
3751                .map(|m| &m.body)
3752                .collect::<Vec<_>>(),
3753            &["surrounded by whitespace"]
3754        );
3755    }
3756
3757    #[gpui::test(iterations = 10)]
3758    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3759        cx_a.foreground().forbid_parking();
3760
3761        // Connect to a server as 2 clients.
3762        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3763        let client_a = server.create_client(cx_a, "user_a").await;
3764        let client_b = server.create_client(cx_b, "user_b").await;
3765        let mut status_b = client_b.status();
3766
3767        // Create an org that includes these 2 users.
3768        let db = &server.app_state.db;
3769        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3770        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3771            .await
3772            .unwrap();
3773        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3774            .await
3775            .unwrap();
3776
3777        // Create a channel that includes all the users.
3778        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3779        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3780            .await
3781            .unwrap();
3782        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3783            .await
3784            .unwrap();
3785        db.create_channel_message(
3786            channel_id,
3787            client_b.current_user_id(&cx_b),
3788            "hello A, it's B.",
3789            OffsetDateTime::now_utc(),
3790            2,
3791        )
3792        .await
3793        .unwrap();
3794
3795        let channels_a = cx_a
3796            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3797        channels_a
3798            .condition(cx_a, |list, _| list.available_channels().is_some())
3799            .await;
3800
3801        channels_a.read_with(cx_a, |list, _| {
3802            assert_eq!(
3803                list.available_channels().unwrap(),
3804                &[ChannelDetails {
3805                    id: channel_id.to_proto(),
3806                    name: "test-channel".to_string()
3807                }]
3808            )
3809        });
3810        let channel_a = channels_a.update(cx_a, |this, cx| {
3811            this.get_channel(channel_id.to_proto(), cx).unwrap()
3812        });
3813        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3814        channel_a
3815            .condition(&cx_a, |channel, _| {
3816                channel_messages(channel)
3817                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3818            })
3819            .await;
3820
3821        let channels_b = cx_b
3822            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3823        channels_b
3824            .condition(cx_b, |list, _| list.available_channels().is_some())
3825            .await;
3826        channels_b.read_with(cx_b, |list, _| {
3827            assert_eq!(
3828                list.available_channels().unwrap(),
3829                &[ChannelDetails {
3830                    id: channel_id.to_proto(),
3831                    name: "test-channel".to_string()
3832                }]
3833            )
3834        });
3835
3836        let channel_b = channels_b.update(cx_b, |this, cx| {
3837            this.get_channel(channel_id.to_proto(), cx).unwrap()
3838        });
3839        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3840        channel_b
3841            .condition(&cx_b, |channel, _| {
3842                channel_messages(channel)
3843                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3844            })
3845            .await;
3846
3847        // Disconnect client B, ensuring we can still access its cached channel data.
3848        server.forbid_connections();
3849        server.disconnect_client(client_b.current_user_id(&cx_b));
3850        while !matches!(
3851            status_b.next().await,
3852            Some(client::Status::ReconnectionError { .. })
3853        ) {}
3854
3855        channels_b.read_with(cx_b, |channels, _| {
3856            assert_eq!(
3857                channels.available_channels().unwrap(),
3858                [ChannelDetails {
3859                    id: channel_id.to_proto(),
3860                    name: "test-channel".to_string()
3861                }]
3862            )
3863        });
3864        channel_b.read_with(cx_b, |channel, _| {
3865            assert_eq!(
3866                channel_messages(channel),
3867                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3868            )
3869        });
3870
3871        // Send a message from client B while it is disconnected.
3872        channel_b
3873            .update(cx_b, |channel, cx| {
3874                let task = channel
3875                    .send_message("can you see this?".to_string(), cx)
3876                    .unwrap();
3877                assert_eq!(
3878                    channel_messages(channel),
3879                    &[
3880                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3881                        ("user_b".to_string(), "can you see this?".to_string(), true)
3882                    ]
3883                );
3884                task
3885            })
3886            .await
3887            .unwrap_err();
3888
3889        // Send a message from client A while B is disconnected.
3890        channel_a
3891            .update(cx_a, |channel, cx| {
3892                channel
3893                    .send_message("oh, hi B.".to_string(), cx)
3894                    .unwrap()
3895                    .detach();
3896                let task = channel.send_message("sup".to_string(), cx).unwrap();
3897                assert_eq!(
3898                    channel_messages(channel),
3899                    &[
3900                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3901                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3902                        ("user_a".to_string(), "sup".to_string(), true)
3903                    ]
3904                );
3905                task
3906            })
3907            .await
3908            .unwrap();
3909
3910        // Give client B a chance to reconnect.
3911        server.allow_connections();
3912        cx_b.foreground().advance_clock(Duration::from_secs(10));
3913
3914        // Verify that B sees the new messages upon reconnection, as well as the message client B
3915        // sent while offline.
3916        channel_b
3917            .condition(&cx_b, |channel, _| {
3918                channel_messages(channel)
3919                    == [
3920                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3921                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3922                        ("user_a".to_string(), "sup".to_string(), false),
3923                        ("user_b".to_string(), "can you see this?".to_string(), false),
3924                    ]
3925            })
3926            .await;
3927
3928        // Ensure client A and B can communicate normally after reconnection.
3929        channel_a
3930            .update(cx_a, |channel, cx| {
3931                channel.send_message("you online?".to_string(), cx).unwrap()
3932            })
3933            .await
3934            .unwrap();
3935        channel_b
3936            .condition(&cx_b, |channel, _| {
3937                channel_messages(channel)
3938                    == [
3939                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3940                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3941                        ("user_a".to_string(), "sup".to_string(), false),
3942                        ("user_b".to_string(), "can you see this?".to_string(), false),
3943                        ("user_a".to_string(), "you online?".to_string(), false),
3944                    ]
3945            })
3946            .await;
3947
3948        channel_b
3949            .update(cx_b, |channel, cx| {
3950                channel.send_message("yep".to_string(), cx).unwrap()
3951            })
3952            .await
3953            .unwrap();
3954        channel_a
3955            .condition(&cx_a, |channel, _| {
3956                channel_messages(channel)
3957                    == [
3958                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3959                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3960                        ("user_a".to_string(), "sup".to_string(), false),
3961                        ("user_b".to_string(), "can you see this?".to_string(), false),
3962                        ("user_a".to_string(), "you online?".to_string(), false),
3963                        ("user_b".to_string(), "yep".to_string(), false),
3964                    ]
3965            })
3966            .await;
3967    }
3968
3969    #[gpui::test(iterations = 10)]
3970    async fn test_contacts(
3971        cx_a: &mut TestAppContext,
3972        cx_b: &mut TestAppContext,
3973        cx_c: &mut TestAppContext,
3974    ) {
3975        cx_a.foreground().forbid_parking();
3976        let lang_registry = Arc::new(LanguageRegistry::new());
3977        let fs = FakeFs::new(cx_a.background());
3978
3979        // Connect to a server as 3 clients.
3980        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3981        let client_a = server.create_client(cx_a, "user_a").await;
3982        let client_b = server.create_client(cx_b, "user_b").await;
3983        let client_c = server.create_client(cx_c, "user_c").await;
3984
3985        // Share a worktree as client A.
3986        fs.insert_tree(
3987            "/a",
3988            json!({
3989                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3990            }),
3991        )
3992        .await;
3993
3994        let project_a = cx_a.update(|cx| {
3995            Project::local(
3996                client_a.clone(),
3997                client_a.user_store.clone(),
3998                lang_registry.clone(),
3999                fs.clone(),
4000                cx,
4001            )
4002        });
4003        let (worktree_a, _) = project_a
4004            .update(cx_a, |p, cx| {
4005                p.find_or_create_local_worktree("/a", false, cx)
4006            })
4007            .await
4008            .unwrap();
4009        worktree_a
4010            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4011            .await;
4012
4013        client_a
4014            .user_store
4015            .condition(&cx_a, |user_store, _| {
4016                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4017            })
4018            .await;
4019        client_b
4020            .user_store
4021            .condition(&cx_b, |user_store, _| {
4022                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4023            })
4024            .await;
4025        client_c
4026            .user_store
4027            .condition(&cx_c, |user_store, _| {
4028                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4029            })
4030            .await;
4031
4032        let project_id = project_a
4033            .update(cx_a, |project, _| project.next_remote_id())
4034            .await;
4035        project_a
4036            .update(cx_a, |project, cx| project.share(cx))
4037            .await
4038            .unwrap();
4039
4040        let _project_b = Project::remote(
4041            project_id,
4042            client_b.clone(),
4043            client_b.user_store.clone(),
4044            lang_registry.clone(),
4045            fs.clone(),
4046            &mut cx_b.to_async(),
4047        )
4048        .await
4049        .unwrap();
4050
4051        client_a
4052            .user_store
4053            .condition(&cx_a, |user_store, _| {
4054                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4055            })
4056            .await;
4057        client_b
4058            .user_store
4059            .condition(&cx_b, |user_store, _| {
4060                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4061            })
4062            .await;
4063        client_c
4064            .user_store
4065            .condition(&cx_c, |user_store, _| {
4066                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4067            })
4068            .await;
4069
4070        project_a
4071            .condition(&cx_a, |project, _| {
4072                project.collaborators().contains_key(&client_b.peer_id)
4073            })
4074            .await;
4075
4076        cx_a.update(move |_| drop(project_a));
4077        client_a
4078            .user_store
4079            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4080            .await;
4081        client_b
4082            .user_store
4083            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4084            .await;
4085        client_c
4086            .user_store
4087            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4088            .await;
4089
4090        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4091            user_store
4092                .contacts()
4093                .iter()
4094                .map(|contact| {
4095                    let worktrees = contact
4096                        .projects
4097                        .iter()
4098                        .map(|p| {
4099                            (
4100                                p.worktree_root_names[0].as_str(),
4101                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4102                            )
4103                        })
4104                        .collect();
4105                    (contact.user.github_login.as_str(), worktrees)
4106                })
4107                .collect()
4108        }
4109    }
4110
4111    #[gpui::test(iterations = 100)]
4112    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4113        cx.foreground().forbid_parking();
4114        let max_peers = env::var("MAX_PEERS")
4115            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4116            .unwrap_or(5);
4117        let max_operations = env::var("OPERATIONS")
4118            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4119            .unwrap_or(10);
4120
4121        let rng = Arc::new(Mutex::new(rng));
4122
4123        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4124        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4125
4126        let fs = FakeFs::new(cx.background());
4127        fs.insert_tree(
4128            "/_collab",
4129            json!({
4130                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4131            }),
4132        )
4133        .await;
4134
4135        let operations = Rc::new(Cell::new(0));
4136        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4137        let mut clients = Vec::new();
4138
4139        let mut next_entity_id = 100000;
4140        let mut host_cx = TestAppContext::new(
4141            cx.foreground_platform(),
4142            cx.platform(),
4143            cx.foreground(),
4144            cx.background(),
4145            cx.font_cache(),
4146            cx.leak_detector(),
4147            next_entity_id,
4148        );
4149        let host = server.create_client(&mut host_cx, "host").await;
4150        let host_project = host_cx.update(|cx| {
4151            Project::local(
4152                host.client.clone(),
4153                host.user_store.clone(),
4154                Arc::new(LanguageRegistry::new()),
4155                fs.clone(),
4156                cx,
4157            )
4158        });
4159        let host_project_id = host_project
4160            .update(&mut host_cx, |p, _| p.next_remote_id())
4161            .await;
4162
4163        let (collab_worktree, _) = host_project
4164            .update(&mut host_cx, |project, cx| {
4165                project.find_or_create_local_worktree("/_collab", false, cx)
4166            })
4167            .await
4168            .unwrap();
4169        collab_worktree
4170            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4171            .await;
4172        host_project
4173            .update(&mut host_cx, |project, cx| project.share(cx))
4174            .await
4175            .unwrap();
4176
4177        clients.push(cx.foreground().spawn(host.simulate_host(
4178            host_project.clone(),
4179            language_server_config,
4180            operations.clone(),
4181            max_operations,
4182            rng.clone(),
4183            host_cx,
4184        )));
4185
4186        while operations.get() < max_operations {
4187            cx.background().simulate_random_delay().await;
4188            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4189                operations.set(operations.get() + 1);
4190
4191                let guest_id = clients.len();
4192                log::info!("Adding guest {}", guest_id);
4193                next_entity_id += 100000;
4194                let mut guest_cx = TestAppContext::new(
4195                    cx.foreground_platform(),
4196                    cx.platform(),
4197                    cx.foreground(),
4198                    cx.background(),
4199                    cx.font_cache(),
4200                    cx.leak_detector(),
4201                    next_entity_id,
4202                );
4203                let guest = server
4204                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4205                    .await;
4206                let guest_project = Project::remote(
4207                    host_project_id,
4208                    guest.client.clone(),
4209                    guest.user_store.clone(),
4210                    guest_lang_registry.clone(),
4211                    FakeFs::new(cx.background()),
4212                    &mut guest_cx.to_async(),
4213                )
4214                .await
4215                .unwrap();
4216                clients.push(cx.foreground().spawn(guest.simulate_guest(
4217                    guest_id,
4218                    guest_project,
4219                    operations.clone(),
4220                    max_operations,
4221                    rng.clone(),
4222                    guest_cx,
4223                )));
4224
4225                log::info!("Guest {} added", guest_id);
4226            }
4227        }
4228
4229        let mut clients = futures::future::join_all(clients).await;
4230        cx.foreground().run_until_parked();
4231
4232        let (_, host_cx) = clients.remove(0);
4233        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4234            project
4235                .worktrees(cx)
4236                .map(|worktree| {
4237                    let snapshot = worktree.read(cx).snapshot();
4238                    (snapshot.id(), snapshot)
4239                })
4240                .collect::<BTreeMap<_, _>>()
4241        });
4242
4243        for (guest_client, guest_cx) in clients.iter() {
4244            let guest_id = guest_client.client.id();
4245            let worktree_snapshots =
4246                guest_client
4247                    .project
4248                    .as_ref()
4249                    .unwrap()
4250                    .read_with(guest_cx, |project, cx| {
4251                        project
4252                            .worktrees(cx)
4253                            .map(|worktree| {
4254                                let worktree = worktree.read(cx);
4255                                (worktree.id(), worktree.snapshot())
4256                            })
4257                            .collect::<BTreeMap<_, _>>()
4258                    });
4259
4260            assert_eq!(
4261                worktree_snapshots.keys().collect::<Vec<_>>(),
4262                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4263                "guest {} has different worktrees than the host",
4264                guest_id
4265            );
4266            for (id, host_snapshot) in &host_worktree_snapshots {
4267                let guest_snapshot = &worktree_snapshots[id];
4268                assert_eq!(
4269                    guest_snapshot.root_name(),
4270                    host_snapshot.root_name(),
4271                    "guest {} has different root name than the host for worktree {}",
4272                    guest_id,
4273                    id
4274                );
4275                assert_eq!(
4276                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4277                    host_snapshot.entries(false).collect::<Vec<_>>(),
4278                    "guest {} has different snapshot than the host for worktree {}",
4279                    guest_id,
4280                    id
4281                );
4282            }
4283
4284            guest_client
4285                .project
4286                .as_ref()
4287                .unwrap()
4288                .read_with(guest_cx, |project, cx| {
4289                    assert!(
4290                        !project.has_deferred_operations(cx),
4291                        "guest {} has deferred operations",
4292                        guest_id,
4293                    );
4294                });
4295
4296            for guest_buffer in &guest_client.buffers {
4297                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4298                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4299                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4300                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4301                        guest_id, guest_client.peer_id, buffer_id
4302                    ))
4303                });
4304                assert_eq!(
4305                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4306                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4307                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4308                    guest_id,
4309                    buffer_id,
4310                    host_buffer
4311                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4312                );
4313            }
4314        }
4315    }
4316
    /// In-process server used by the tests in this module, together with the hooks the tests
    /// use to sever individual client connections and to refuse new ones.
    struct TestServer {
        /// Peer handed to `Server::new`; `reset()` is called on it when the `TestServer` drops.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`; its `db` is used to create test users.
        app_state: Arc<AppState>,
        /// The server under test; client connections are served through `handle_connection`.
        server: Arc<Server>,
        /// Foreground executor, told when `condition` starts and finishes waiting.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is passed to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Kill switch for each user's most recent in-memory connection, keyed by user id.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, the connection override installed by `create_client` refuses to connect.
        forbid_connections: Arc<AtomicBool>,
        /// Test database handle. It is stored but never read here; presumably it is held so
        /// the database outlives the server (confirm against `TestDb`).
        _test_db: TestDb,
    }
4327
4328    impl TestServer {
4329        async fn start(
4330            foreground: Rc<executor::Foreground>,
4331            background: Arc<executor::Background>,
4332        ) -> Self {
4333            let test_db = TestDb::fake(background);
4334            let app_state = Self::build_app_state(&test_db).await;
4335            let peer = Peer::new();
4336            let notifications = mpsc::unbounded();
4337            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4338            Self {
4339                peer,
4340                app_state,
4341                server,
4342                foreground,
4343                notifications: notifications.1,
4344                connection_killers: Default::default(),
4345                forbid_connections: Default::default(),
4346                _test_db: test_db,
4347            }
4348        }
4349
4350        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4351            let http = FakeHttpClient::with_404_response();
4352            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4353            let client_name = name.to_string();
4354            let mut client = Client::new(http.clone());
4355            let server = self.server.clone();
4356            let connection_killers = self.connection_killers.clone();
4357            let forbid_connections = self.forbid_connections.clone();
4358            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4359
4360            Arc::get_mut(&mut client)
4361                .unwrap()
4362                .override_authenticate(move |cx| {
4363                    cx.spawn(|_| async move {
4364                        let access_token = "the-token".to_string();
4365                        Ok(Credentials {
4366                            user_id: user_id.0 as u64,
4367                            access_token,
4368                        })
4369                    })
4370                })
4371                .override_establish_connection(move |credentials, cx| {
4372                    assert_eq!(credentials.user_id, user_id.0 as u64);
4373                    assert_eq!(credentials.access_token, "the-token");
4374
4375                    let server = server.clone();
4376                    let connection_killers = connection_killers.clone();
4377                    let forbid_connections = forbid_connections.clone();
4378                    let client_name = client_name.clone();
4379                    let connection_id_tx = connection_id_tx.clone();
4380                    cx.spawn(move |cx| async move {
4381                        if forbid_connections.load(SeqCst) {
4382                            Err(EstablishConnectionError::other(anyhow!(
4383                                "server is forbidding connections"
4384                            )))
4385                        } else {
4386                            let (client_conn, server_conn, kill_conn) =
4387                                Connection::in_memory(cx.background());
4388                            connection_killers.lock().insert(user_id, kill_conn);
4389                            cx.background()
4390                                .spawn(server.handle_connection(
4391                                    server_conn,
4392                                    client_name,
4393                                    user_id,
4394                                    Some(connection_id_tx),
4395                                    cx.background(),
4396                                ))
4397                                .detach();
4398                            Ok(client_conn)
4399                        }
4400                    })
4401                });
4402
4403            client
4404                .authenticate_and_connect(&cx.to_async())
4405                .await
4406                .unwrap();
4407
4408            Channel::init(&client);
4409            Project::init(&client);
4410
4411            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4412            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4413            let mut authed_user =
4414                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4415            while authed_user.next().await.unwrap().is_none() {}
4416
4417            TestClient {
4418                client,
4419                peer_id,
4420                user_store,
4421                project: Default::default(),
4422                buffers: Default::default(),
4423            }
4424        }
4425
4426        fn disconnect_client(&self, user_id: UserId) {
4427            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4428                let _ = kill_conn.try_send(Some(()));
4429            }
4430        }
4431
        /// Sets the flag that makes the connection override installed by `create_client`
        /// fail with "server is forbidding connections". Existing connections are untouched.
        fn forbid_connections(&self) {
            self.forbid_connections.store(true, SeqCst);
        }
4435
        /// Clears the flag set by `forbid_connections`, so later connection attempts made
        /// through the override installed by `create_client` are accepted again.
        fn allow_connections(&self) {
            self.forbid_connections.store(false, SeqCst);
        }
4439
4440        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4441            let mut config = Config::default();
4442            config.session_secret = "a".repeat(32);
4443            config.database_url = test_db.url.clone();
4444            let github_client = github::AppClient::test();
4445            Arc::new(AppState {
4446                db: test_db.db().clone(),
4447                handlebars: Default::default(),
4448                auth_client: auth::build_client("", ""),
4449                repo_client: github::RepoClient::test(&github_client),
4450                github_client,
4451                config,
4452            })
4453        }
4454
4455        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4456            self.server.store.read()
4457        }
4458
4459        async fn condition<F>(&mut self, mut predicate: F)
4460        where
4461            F: FnMut(&Store) -> bool,
4462        {
4463            async_std::future::timeout(Duration::from_millis(500), async {
4464                while !(predicate)(&*self.server.store.read()) {
4465                    self.foreground.start_waiting();
4466                    self.notifications.next().await;
4467                    self.foreground.finish_waiting();
4468                }
4469            })
4470            .await
4471            .expect("condition timed out");
4472        }
4473    }
4474
    /// Resets the shared `Peer` when the test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably this tears down the peer's remaining connections so
            // nothing outlives the test; confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
4480
    /// A connected client as produced by `TestServer::create_client`, plus the per-client
    /// state the randomized collaboration test inspects afterwards.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created for this client in `create_client`.
        pub user_store: ModelHandle<UserStore>,
        /// Project associated with this client, if any; starts out as `None`.
        project: Option<ModelHandle<Project>>,
        /// Buffers held by this client; starts out empty. The randomized test compares each
        /// one against the host's copy.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4488
    /// Lets a `TestClient` be used directly wherever the wrapped `Arc<Client>` is needed,
    /// e.g. `client_a.clone()` yields a clone of the inner `Arc<Client>`.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4496
4497    impl TestClient {
4498        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4499            UserId::from_proto(
4500                self.user_store
4501                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4502            )
4503        }
4504
4505        fn simulate_host(
4506            mut self,
4507            project: ModelHandle<Project>,
4508            mut language_server_config: LanguageServerConfig,
4509            operations: Rc<Cell<usize>>,
4510            max_operations: usize,
4511            rng: Arc<Mutex<StdRng>>,
4512            mut cx: TestAppContext,
4513        ) -> impl Future<Output = (Self, TestAppContext)> {
4514            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4515
4516            // Set up a fake language server.
4517            language_server_config.set_fake_initializer({
4518                let rng = rng.clone();
4519                let files = files.clone();
4520                let project = project.clone();
4521                move |fake_server| {
4522                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4523                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4524                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4525                                range: lsp::Range::new(
4526                                    lsp::Position::new(0, 0),
4527                                    lsp::Position::new(0, 0),
4528                                ),
4529                                new_text: "the-new-text".to_string(),
4530                            })),
4531                            ..Default::default()
4532                        }]))
4533                    });
4534
4535                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4536                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4537                            lsp::CodeAction {
4538                                title: "the-code-action".to_string(),
4539                                ..Default::default()
4540                            },
4541                        )])
4542                    });
4543
4544                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4545                        |params, _| {
4546                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4547                                params.position,
4548                                params.position,
4549                            )))
4550                        },
4551                    );
4552
4553                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4554                        let files = files.clone();
4555                        let rng = rng.clone();
4556                        move |_, _| {
4557                            let files = files.lock();
4558                            let mut rng = rng.lock();
4559                            let count = rng.gen_range::<usize, _>(1..3);
4560                            let files = (0..count)
4561                                .map(|_| files.choose(&mut *rng).unwrap())
4562                                .collect::<Vec<_>>();
4563                            log::info!("LSP: Returning definitions in files {:?}", &files);
4564                            Some(lsp::GotoDefinitionResponse::Array(
4565                                files
4566                                    .into_iter()
4567                                    .map(|file| lsp::Location {
4568                                        uri: lsp::Url::from_file_path(file).unwrap(),
4569                                        range: Default::default(),
4570                                    })
4571                                    .collect(),
4572                            ))
4573                        }
4574                    });
4575
4576                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4577                        let rng = rng.clone();
4578                        let project = project.clone();
4579                        move |params, mut cx| {
4580                            project.update(&mut cx, |project, cx| {
4581                                let path = params
4582                                    .text_document_position_params
4583                                    .text_document
4584                                    .uri
4585                                    .to_file_path()
4586                                    .unwrap();
4587                                let (worktree, relative_path) =
4588                                    project.find_local_worktree(&path, cx)?;
4589                                let project_path =
4590                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4591                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4592
4593                                let mut highlights = Vec::new();
4594                                let highlight_count = rng.lock().gen_range(1..=5);
4595                                let mut prev_end = 0;
4596                                for _ in 0..highlight_count {
4597                                    let range =
4598                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4599                                    let start =
4600                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4601                                    let end =
4602                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4603                                    highlights.push(lsp::DocumentHighlight {
4604                                        range: lsp::Range::new(start, end),
4605                                        kind: Some(lsp::DocumentHighlightKind::READ),
4606                                    });
4607                                    prev_end = range.end;
4608                                }
4609                                Some(highlights)
4610                            })
4611                        }
4612                    });
4613                }
4614            });
4615
4616            project.update(&mut cx, |project, _| {
4617                project.languages().add(Arc::new(Language::new(
4618                    LanguageConfig {
4619                        name: "Rust".into(),
4620                        path_suffixes: vec!["rs".to_string()],
4621                        language_server: Some(language_server_config),
4622                        ..Default::default()
4623                    },
4624                    None,
4625                )));
4626            });
4627
4628            async move {
4629                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4630                while operations.get() < max_operations {
4631                    operations.set(operations.get() + 1);
4632
4633                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4634                    match distribution {
4635                        0..=20 if !files.lock().is_empty() => {
4636                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4637                            let mut path = path.as_path();
4638                            while let Some(parent_path) = path.parent() {
4639                                path = parent_path;
4640                                if rng.lock().gen() {
4641                                    break;
4642                                }
4643                            }
4644
4645                            log::info!("Host: find/create local worktree {:?}", path);
4646                            project
4647                                .update(&mut cx, |project, cx| {
4648                                    project.find_or_create_local_worktree(path, false, cx)
4649                                })
4650                                .await
4651                                .unwrap();
4652                        }
4653                        10..=80 if !files.lock().is_empty() => {
4654                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4655                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4656                                let (worktree, path) = project
4657                                    .update(&mut cx, |project, cx| {
4658                                        project.find_or_create_local_worktree(
4659                                            file.clone(),
4660                                            false,
4661                                            cx,
4662                                        )
4663                                    })
4664                                    .await
4665                                    .unwrap();
4666                                let project_path =
4667                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4668                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4669                                let buffer = project
4670                                    .update(&mut cx, |project, cx| {
4671                                        project.open_buffer(project_path, cx)
4672                                    })
4673                                    .await
4674                                    .unwrap();
4675                                self.buffers.insert(buffer.clone());
4676                                buffer
4677                            } else {
4678                                self.buffers
4679                                    .iter()
4680                                    .choose(&mut *rng.lock())
4681                                    .unwrap()
4682                                    .clone()
4683                            };
4684
4685                            if rng.lock().gen_bool(0.1) {
4686                                cx.update(|cx| {
4687                                    log::info!(
4688                                        "Host: dropping buffer {:?}",
4689                                        buffer.read(cx).file().unwrap().full_path(cx)
4690                                    );
4691                                    self.buffers.remove(&buffer);
4692                                    drop(buffer);
4693                                });
4694                            } else {
4695                                buffer.update(&mut cx, |buffer, cx| {
4696                                    log::info!(
4697                                        "Host: updating buffer {:?} ({})",
4698                                        buffer.file().unwrap().full_path(cx),
4699                                        buffer.remote_id()
4700                                    );
4701                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4702                                });
4703                            }
4704                        }
4705                        _ => loop {
4706                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4707                            let mut path = PathBuf::new();
4708                            path.push("/");
4709                            for _ in 0..path_component_count {
4710                                let letter = rng.lock().gen_range(b'a'..=b'z');
4711                                path.push(std::str::from_utf8(&[letter]).unwrap());
4712                            }
4713                            path.set_extension("rs");
4714                            let parent_path = path.parent().unwrap();
4715
4716                            log::info!("Host: creating file {:?}", path,);
4717
4718                            if fs.create_dir(&parent_path).await.is_ok()
4719                                && fs.create_file(&path, Default::default()).await.is_ok()
4720                            {
4721                                files.lock().push(path);
4722                                break;
4723                            } else {
4724                                log::info!("Host: cannot create file");
4725                            }
4726                        },
4727                    }
4728
4729                    cx.background().simulate_random_delay().await;
4730                }
4731
4732                log::info!("Host done");
4733
4734                self.project = Some(project);
4735                (self, cx)
4736            }
4737        }
4738
4739        pub async fn simulate_guest(
4740            mut self,
4741            guest_id: usize,
4742            project: ModelHandle<Project>,
4743            operations: Rc<Cell<usize>>,
4744            max_operations: usize,
4745            rng: Arc<Mutex<StdRng>>,
4746            mut cx: TestAppContext,
4747        ) -> (Self, TestAppContext) {
                // Drives one simulated guest. While the `operations` counter is below
                // `max_operations`, each iteration obtains a buffer (a newly opened one
                // or one already held in `self.buffers`) and performs one randomly
                // selected action on it. `guest_id` is only used to label log lines.
                // On exit, `project` is stored in `self.project` and the client is
                // returned together with the test context.
                //
                // Every random decision is drawn from the `rng` mutex, in statement
                // order, so reordering statements changes which draw each decision gets.
4748            while operations.get() < max_operations {
                    // Open a new buffer when none is held yet; otherwise a random boolean
                    // decides between opening a new buffer and reusing a held one.
4749                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                        // Randomly choose among the worktrees that are not weak and that
                        // contain at least one file entry.
4750                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4751                        project
4752                            .worktrees(&cx)
4753                            .filter(|worktree| {
4754                                let worktree = worktree.read(cx);
4755                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4756                            })
4757                            .choose(&mut *rng.lock())
4758                    }) {
4759                        worktree
4760                    } else {
                            // No eligible worktree: wait for a simulated random delay and
                            // retry. This path does not increment `operations`.
4761                        cx.background().simulate_random_delay().await;
4762                        continue;
4763                    };
4764
                        // Count this iteration as one operation.
4765                    operations.set(operations.get() + 1);
                        // Pick a random file entry of the chosen worktree. The `unwrap`
                        // relies on the worktree having been selected above because it
                        // had at least one file entry.
4766                    let (worktree_root_name, project_path) =
4767                        worktree.read_with(&cx, |worktree, _| {
4768                            let entry = worktree
4769                                .entries(false)
4770                                .filter(|e| e.is_file())
4771                                .choose(&mut *rng.lock())
4772                                .unwrap();
4773                            (
4774                                worktree.root_name().to_string(),
4775                                (worktree.id(), entry.path.clone()),
4776                            )
4777                        });
4778                    log::info!(
4779                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4780                        guest_id,
4781                        project_path.0,
4782                        worktree_root_name,
4783                        project_path.1
4784                    );
                        // Open the buffer through the project; a failed open panics here.
4785                    let buffer = project
4786                        .update(&mut cx, |project, cx| {
4787                            project.open_buffer(project_path.clone(), cx)
4788                        })
4789                        .await
4790                        .unwrap();
4791                    log::info!(
4792                        "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
4793                        guest_id,
4794                        project_path.0,
4795                        worktree_root_name,
4796                        project_path.1,
4797                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4798                    );
                        // Remember the handle so later iterations can reuse this buffer.
4799                    self.buffers.insert(buffer.clone());
4800                    buffer
4801                } else {
                        // Reuse path: count the operation, then pick a held buffer at random.
4802                    operations.set(operations.get() + 1);
4803
4804                    self.buffers
4805                        .iter()
4806                        .choose(&mut *rng.lock())
4807                        .unwrap()
4808                        .clone()
4809                };
4810
                    // Select the action from a draw in 0..100; the arm ranges below set
                    // how many of those 100 values map to each action.
4811                let choice = rng.lock().gen_range(0..100);
4812                match choice {
                        // 0..=9: forget the buffer (remove it from `self.buffers`) and drop
                        // this iteration's handle to it.
4813                    0..=9 => {
4814                        cx.update(|cx| {
4815                            log::info!(
4816                                "Guest {}: dropping buffer {:?}",
4817                                guest_id,
4818                                buffer.read(cx).file().unwrap().full_path(cx)
4819                            );
4820                            self.buffers.remove(&buffer);
4821                            drop(buffer);
4822                        });
4823                    }
                        // 10..=19: request completions at a random offset in 0..=len.
4824                    10..=19 => {
4825                        let completions = project.update(&mut cx, |project, cx| {
4826                            log::info!(
4827                                "Guest {}: requesting completions for buffer {:?}",
4828                                guest_id,
4829                                buffer.read(cx).file().unwrap().full_path(cx)
4830                            );
4831                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4832                            project.completions(&buffer, offset, cx)
4833                        });
                            // Run the request on the background executor; an error panics
                            // inside the spawned task.
4834                        let completions = cx.background().spawn(async move {
4835                            completions.await.expect("completions request failed");
4836                        });
                            // `gen_bool(0.3)`: when true the task is detached instead of
                            // awaited, so the loop moves on without waiting for it.
4837                        if rng.lock().gen_bool(0.3) {
4838                            log::info!("Guest {}: detaching completions request", guest_id);
4839                            completions.detach();
4840                        } else {
4841                            completions.await;
4842                        }
4843                    }
                        // 20..=29: request code actions for a random byte range of the buffer.
4844                    20..=29 => {
4845                        let code_actions = project.update(&mut cx, |project, cx| {
4846                            log::info!(
4847                                "Guest {}: requesting code actions for buffer {:?}",
4848                                guest_id,
4849                                buffer.read(cx).file().unwrap().full_path(cx)
4850                            );
4851                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4852                            project.code_actions(&buffer, range, cx)
4853                        });
4854                        let code_actions = cx.background().spawn(async move {
4855                            code_actions.await.expect("code actions request failed");
4856                        });
                            // Same detach-or-await decision as for completions.
4857                        if rng.lock().gen_bool(0.3) {
4858                            log::info!("Guest {}: detaching code actions request", guest_id);
4859                            code_actions.detach();
4860                        } else {
4861                            code_actions.await;
4862                        }
4863                    }
                        // 30..=39, only when the buffer is dirty: save it. If the guard is
                        // false, no later range covers these values, so the `_` arm
                        // (random edit) runs instead.
4864                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                            // Capture the buffer version at the moment the save is requested.
4865                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4866                            log::info!(
4867                                "Guest {}: saving buffer {:?}",
4868                                guest_id,
4869                                buffer.file().unwrap().full_path(cx)
4870                            );
4871                            (buffer.version(), buffer.save(cx))
4872                        });
                            // After the save resolves, check that the buffer's current
                            // version has observed the saved version, and that the saved
                            // version has observed the version that was requested.
4873                        let save = cx.spawn(|cx| async move {
4874                            let (saved_version, _) = save.await.expect("save request failed");
4875                            buffer.read_with(&cx, |buffer, _| {
4876                                assert!(buffer.version().observed_all(&saved_version));
4877                                assert!(saved_version.observed_all(&requested_version));
4878                            });
4879                        });
4880                        if rng.lock().gen_bool(0.3) {
4881                            log::info!("Guest {}: detaching save request", guest_id);
4882                            save.detach();
4883                        } else {
4884                            save.await;
4885                        }
4886                    }
                        // 40..=44: prepare a rename at a random offset. The buffer handle
                        // is passed by value here, unlike the `&buffer` used by other arms.
4887                    40..=44 => {
4888                        let prepare_rename = project.update(&mut cx, |project, cx| {
4889                            log::info!(
4890                                "Guest {}: preparing rename for buffer {:?}",
4891                                guest_id,
4892                                buffer.read(cx).file().unwrap().full_path(cx)
4893                            );
4894                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4895                            project.prepare_rename(buffer, offset, cx)
4896                        });
4897                        let prepare_rename = cx.background().spawn(async move {
4898                            prepare_rename.await.expect("prepare rename request failed");
4899                        });
4900                        if rng.lock().gen_bool(0.3) {
4901                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4902                            prepare_rename.detach();
4903                        } else {
4904                            prepare_rename.await;
4905                        }
4906                    }
                        // 45..=49: request definitions at a random offset.
4907                    45..=49 => {
4908                        let definitions = project.update(&mut cx, |project, cx| {
4909                            log::info!(
4910                                "Guest {}: requesting definitions for buffer {:?}",
4911                                guest_id,
4912                                buffer.read(cx).file().unwrap().full_path(cx)
4913                            );
4914                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4915                            project.definition(&buffer, offset, cx)
4916                        });
                            // Unlike the arms above, this task yields the request's result.
4917                        let definitions = cx.background().spawn(async move {
4918                            definitions.await.expect("definitions request failed")
4919                        });
4920                        if rng.lock().gen_bool(0.3) {
4921                            log::info!("Guest {}: detaching definitions request", guest_id);
4922                            definitions.detach();
4923                        } else {
                                // Keep the buffer of every returned location so that later
                                // iterations may pick it.
4924                            self.buffers
4925                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
4926                        }
4927                    }
                        // 50..=54: request document highlights at a random offset.
4928                    50..=54 => {
4929                        let highlights = project.update(&mut cx, |project, cx| {
4930                            log::info!(
4931                                "Guest {}: requesting highlights for buffer {:?}",
4932                                guest_id,
4933                                buffer.read(cx).file().unwrap().full_path(cx)
4934                            );
4935                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4936                            project.document_highlights(&buffer, offset, cx)
4937                        });
4938                        let highlights = cx.background().spawn(async move {
4939                            highlights.await.expect("highlights request failed");
4940                        });
4941                        if rng.lock().gen_bool(0.3) {
4942                            log::info!("Guest {}: detaching highlights request", guest_id);
4943                            highlights.detach();
4944                        } else {
4945                            highlights.await;
4946                        }
4947                    }
                        // 55..=59: project-wide text search for one random lowercase letter.
                        // The selected buffer itself is not used by this arm.
4948                    55..=59 => {
4949                        let search = project.update(&mut cx, |project, cx| {
4950                            let query = rng.lock().gen_range('a'..='z');
4951                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
4952                            project.search(SearchQuery::text(query, false, false), cx)
4953                        });
4954                        let search = cx
4955                            .background()
4956                            .spawn(async move { search.await.expect("search request failed") });
4957                        if rng.lock().gen_bool(0.3) {
4958                            log::info!("Guest {}: detaching search request", guest_id);
4959                            search.detach();
4960                        } else {
                                // The keys of the search result are added to the held buffers.
4961                            self.buffers.extend(search.await.into_keys());
4962                        }
4963                    }
                        // Remaining values (and 30..=39 when the buffer is not dirty):
                        // apply random edits to the buffer.
4964                    _ => {
4965                        buffer.update(&mut cx, |buffer, cx| {
4966                            log::info!(
4967                                "Guest {}: updating buffer {:?}",
4968                                guest_id,
4969                                buffer.file().unwrap().full_path(cx)
4970                            );
4971                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4972                        });
4973                    }
4974                }
                    // Yield for a simulated random delay before the next iteration.
4975                cx.background().simulate_random_delay().await;
4976            }
4977
4978            log::info!("Guest {} done", guest_id);
4979
                // Hand the project handle back to the caller through the client.
4980            self.project = Some(project);
4981            (self, cx)
4982        }
4983    }
4984
4985    impl Drop for TestClient {
            // Calls `tear_down` on the wrapped `client` whenever a `TestClient` is
            // dropped, so callers do not have to invoke it themselves.
4986        fn drop(&mut self) {
4987            self.client.tear_down();
4988        }
4989    }
4990
4991    impl Executor for Arc<gpui::executor::Background> {
            // Implements `spawn_detached` by passing `future` to this executor's
            // `spawn` and detaching the result, so nothing is returned to the caller.
4992        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4993            self.spawn(future).detach();
4994        }
4995    }
4996
4997    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
            // Test helper: walks `channel.messages()` with a cursor and returns one
            // tuple per message, in the order the cursor yields them:
            // (sender's GitHub login, message body, result of `is_pending()`).
            // The login and body are cloned so the result owns its data.
4998        channel
4999            .messages()
5000            .cursor::<()>()
5001            .map(|m| {
5002                (
5003                    m.sender.github_login.clone(),
5004                    m.body.clone(),
5005                    m.is_pending(),
5006                )
5007            })
5008            .collect()
5009    }
5010
5011    struct EmptyView; // Field-less view type for tests; it renders gpui's `Empty` element.
5012
5013    impl gpui::Entity for EmptyView {
            // The associated event type is the unit type `()`.
5014        type Event = ();
5015    }
5016
5017    impl gpui::View for EmptyView {
            // Fixed name string returned for this view type.
5018        fn ui_name() -> &'static str {
5019            "empty view"
5020        }
5021
            // Renders a boxed `gpui::elements::Empty` element; the render context is
            // not used.
5022        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5023            gpui::Element::boxed(gpui::elements::Empty)
5024        }
5025    }
5026}