rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
    /// Type-erased message handler as stored in `Server::handlers`.
    ///
    /// It receives the server and a boxed, dynamically typed envelope and returns a
    /// boxed future resolving to `tide::Result<()>`.
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
    /// RPC server state shared (behind an `Arc`) by every connection task.
  35pub struct Server {
        /// RPC peer used to send, forward and respond to messages.
  36    peer: Arc<Peer>,
        /// Lock-protected store of connection/project/channel state.
        // NOTE(review): presumably accessed through `state()` / `state_mut()`,
        // which are defined outside this view — confirm.
  37    store: RwLock<Store>,
        /// Application-wide state; the handlers here use its `db` handle.
  38    app_state: Arc<AppState>,
        /// Registered handlers, keyed by the `TypeId` of the message payload type.
  39    handlers: HashMap<TypeId, MessageHandler>,
        /// Optional channel that receives `()` after each message finishes handling.
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
    /// Abstraction over a task spawner; `handle_connection` uses it to run
    /// background message handlers without awaiting them.
  43pub trait Executor {
        /// Starts `future` without returning a handle to it.
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
    /// Unit marker type for the production executor.
    // NOTE(review): its `Executor` impl is not visible in this part of the file.
  47pub struct RealExecutor;
  48
    /// Number of channel messages requested per page; `join_channel` also compares the
    /// returned count against it to decide whether the history is exhausted.
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
    /// Upper bound on a trimmed chat message body, compared against `String::len`
    /// (i.e. measured in bytes).
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  81            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  82            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  83            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  84            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  85            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  86            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  87            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  88            .add_request_handler(
  89                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  90            )
  91            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
  92            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
  93            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
  94            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
  95            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
  96            .add_message_handler(Server::close_buffer)
  97            .add_request_handler(Server::update_buffer)
  98            .add_message_handler(Server::update_buffer_file)
  99            .add_message_handler(Server::buffer_reloaded)
 100            .add_message_handler(Server::buffer_saved)
 101            .add_request_handler(Server::save_buffer)
 102            .add_request_handler(Server::get_channels)
 103            .add_request_handler(Server::get_users)
 104            .add_request_handler(Server::join_channel)
 105            .add_message_handler(Server::leave_channel)
 106            .add_request_handler(Server::send_channel_message)
 107            .add_request_handler(Server::get_channel_messages);
 108
 109        Arc::new(server)
 110    }
 111
 112    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 113    where
 114        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 115        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 116        M: EnvelopedMessage,
 117    {
 118        let prev_handler = self.handlers.insert(
 119            TypeId::of::<M>(),
 120            Box::new(move |server, envelope| {
 121                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 122                (handler)(server, *envelope).boxed()
 123            }),
 124        );
 125        if prev_handler.is_some() {
 126            panic!("registered a handler for the same message twice");
 127        }
 128        self
 129    }
 130
 131    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 132    where
 133        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 134        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 135        M: RequestMessage,
 136    {
 137        self.add_message_handler(move |server, envelope| {
 138            let receipt = envelope.receipt();
 139            let response = (handler)(server.clone(), envelope);
 140            async move {
 141                match response.await {
 142                    Ok(response) => {
 143                        server.peer.respond(receipt, response)?;
 144                        Ok(())
 145                    }
 146                    Err(error) => {
 147                        server.peer.respond_with_error(
 148                            receipt,
 149                            proto::Error {
 150                                message: error.to_string(),
 151                            },
 152                        )?;
 153                        Err(error)
 154                    }
 155                }
 156            }
 157        })
 158    }
 159
        /// Returns a future that drives one client connection from registration to sign-out.
        ///
        /// The future registers `connection` with the peer, reports the assigned
        /// connection id on `send_connection_id` (when provided), records the
        /// connection for `user_id`, then dispatches incoming messages to the
        /// registered handlers until the I/O future completes or the incoming stream
        /// ends. Finally it signs the connection out.
        ///
        /// `addr` is only used in log output. `executor` runs handlers for messages
        /// that report `is_background()`.
 160    pub fn handle_connection<E: Executor>(
 161        self: &Arc<Self>,
 162        connection: Connection,
 163        addr: String,
 164        user_id: UserId,
 165        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 166        executor: E,
 167    ) -> impl Future<Output = ()> {
 168        let mut this = self.clone();
 169        async move {
 170            let (connection_id, handle_io, mut incoming_rx) =
 171                this.peer.add_connection(connection).await;
 172
                // Failure to report the id is deliberately ignored.
 173            if let Some(send_connection_id) = send_connection_id.as_mut() {
 174                let _ = send_connection_id.send(connection_id).await;
 175            }
 176
 177            this.state_mut().add_connection(connection_id, user_id);
                // A failed contacts update is logged but does not abort the connection.
 178            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 179                log::error!("error updating contacts for {:?}: {}", user_id, err);
 180            }
 181
 182            let handle_io = handle_io.fuse();
 183            futures::pin_mut!(handle_io);
 184            loop {
 185                let next_message = incoming_rx.next().fuse();
 186                futures::pin_mut!(next_message);
                    // `select_biased!` polls its arms in order, so completion of the
                    // I/O future takes priority over a pending message.
 187                futures::select_biased! {
 188                    result = handle_io => {
 189                        if let Err(err) = result {
 190                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 191                        }
 192                        break;
 193                    }
 194                    message = next_message => {
 195                        if let Some(message) = message {
 196                            let start_time = Instant::now();
 197                            let type_name = message.payload_type_name();
 198                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 199                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 200                                let notifications = this.notifications.clone();
 201                                let is_background = message.is_background();
 202                                let handle_message = (handler)(this.clone(), message);
                                    // Wrap the handler so its outcome is logged and a
                                    // notification is emitted whether it succeeded or not.
 203                                let handle_message = async move {
 204                                    if let Err(err) = handle_message.await {
 205                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 206                                    } else {
 207                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 208                                    }
 209                                    if let Some(mut notifications) = notifications {
 210                                        let _ = notifications.send(()).await;
 211                                    }
 212                                };
                                    // Background messages are spawned and not awaited;
                                    // all others finish before the next message is read.
 213                                if is_background {
 214                                    executor.spawn_detached(handle_message);
 215                                } else {
 216                                    handle_message.await;
 217                                }
 218                            } else {
 219                                log::warn!("unhandled message: {}", type_name);
 220                            }
 221                        } else {
                                // The incoming stream ended.
 222                            log::info!("rpc connection closed {:?}", addr);
 223                            break;
 224                        }
 225                    }
 226                }
 227            }
 228
 229            if let Err(err) = this.sign_out(connection_id).await {
 230                log::error!("error signing out connection {:?} - {:?}", addr, err);
 231            }
 232        }
 233    }
 234
 235    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 236        self.peer.disconnect(connection_id);
 237        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 238
 239        for (project_id, project) in removed_connection.hosted_projects {
 240            if let Some(share) = project.share {
 241                broadcast(
 242                    connection_id,
 243                    share.guests.keys().copied().collect(),
 244                    |conn_id| {
 245                        self.peer
 246                            .send(conn_id, proto::UnshareProject { project_id })
 247                    },
 248                )?;
 249            }
 250        }
 251
 252        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 253            broadcast(connection_id, peer_ids, |conn_id| {
 254                self.peer.send(
 255                    conn_id,
 256                    proto::RemoveProjectCollaborator {
 257                        project_id,
 258                        peer_id: connection_id.0,
 259                    },
 260                )
 261            })?;
 262        }
 263
 264        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 265        Ok(())
 266    }
 267
 268    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 269        Ok(proto::Ack {})
 270    }
 271
 272    async fn register_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::RegisterProject>,
 275    ) -> tide::Result<proto::RegisterProjectResponse> {
 276        let project_id = {
 277            let mut state = self.state_mut();
 278            let user_id = state.user_id_for_connection(request.sender_id)?;
 279            state.register_project(request.sender_id, user_id)
 280        };
 281        Ok(proto::RegisterProjectResponse { project_id })
 282    }
 283
 284    async fn unregister_project(
 285        mut self: Arc<Server>,
 286        request: TypedEnvelope<proto::UnregisterProject>,
 287    ) -> tide::Result<()> {
 288        let project = self
 289            .state_mut()
 290            .unregister_project(request.payload.project_id, request.sender_id)?;
 291        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 292        Ok(())
 293    }
 294
 295    async fn share_project(
 296        mut self: Arc<Server>,
 297        request: TypedEnvelope<proto::ShareProject>,
 298    ) -> tide::Result<proto::Ack> {
 299        self.state_mut()
 300            .share_project(request.payload.project_id, request.sender_id);
 301        Ok(proto::Ack {})
 302    }
 303
 304    async fn unshare_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnshareProject>,
 307    ) -> tide::Result<()> {
 308        let project_id = request.payload.project_id;
 309        let project = self
 310            .state_mut()
 311            .unshare_project(project_id, request.sender_id)?;
 312
 313        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 314            self.peer
 315                .send(conn_id, proto::UnshareProject { project_id })
 316        })?;
 317        self.update_contacts_for_users(&project.authorized_user_ids)?;
 318        Ok(())
 319    }
 320
 321    async fn join_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::JoinProject>,
 324    ) -> tide::Result<proto::JoinProjectResponse> {
 325        let project_id = request.payload.project_id;
 326
 327        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 328        let (response, connection_ids, contact_user_ids) = self
 329            .state_mut()
 330            .join_project(request.sender_id, user_id, project_id)
 331            .and_then(|joined| {
 332                let share = joined.project.share()?;
 333                let peer_count = share.guests.len();
 334                let mut collaborators = Vec::with_capacity(peer_count);
 335                collaborators.push(proto::Collaborator {
 336                    peer_id: joined.project.host_connection_id.0,
 337                    replica_id: 0,
 338                    user_id: joined.project.host_user_id.to_proto(),
 339                });
 340                let worktrees = share
 341                    .worktrees
 342                    .iter()
 343                    .filter_map(|(id, shared_worktree)| {
 344                        let worktree = joined.project.worktrees.get(&id)?;
 345                        Some(proto::Worktree {
 346                            id: *id,
 347                            root_name: worktree.root_name.clone(),
 348                            entries: shared_worktree.entries.values().cloned().collect(),
 349                            diagnostic_summaries: shared_worktree
 350                                .diagnostic_summaries
 351                                .values()
 352                                .cloned()
 353                                .collect(),
 354                            weak: worktree.weak,
 355                        })
 356                    })
 357                    .collect();
 358                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 359                    if *peer_conn_id != request.sender_id {
 360                        collaborators.push(proto::Collaborator {
 361                            peer_id: peer_conn_id.0,
 362                            replica_id: *peer_replica_id as u32,
 363                            user_id: peer_user_id.to_proto(),
 364                        });
 365                    }
 366                }
 367                let response = proto::JoinProjectResponse {
 368                    worktrees,
 369                    replica_id: joined.replica_id as u32,
 370                    collaborators,
 371                };
 372                let connection_ids = joined.project.connection_ids();
 373                let contact_user_ids = joined.project.authorized_user_ids();
 374                Ok((response, connection_ids, contact_user_ids))
 375            })?;
 376
 377        broadcast(request.sender_id, connection_ids, |conn_id| {
 378            self.peer.send(
 379                conn_id,
 380                proto::AddProjectCollaborator {
 381                    project_id,
 382                    collaborator: Some(proto::Collaborator {
 383                        peer_id: request.sender_id.0,
 384                        replica_id: response.replica_id,
 385                        user_id: user_id.to_proto(),
 386                    }),
 387                },
 388            )
 389        })?;
 390        self.update_contacts_for_users(&contact_user_ids)?;
 391        Ok(response)
 392    }
 393
 394    async fn leave_project(
 395        mut self: Arc<Server>,
 396        request: TypedEnvelope<proto::LeaveProject>,
 397    ) -> tide::Result<()> {
 398        let sender_id = request.sender_id;
 399        let project_id = request.payload.project_id;
 400        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 401
 402        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 403            self.peer.send(
 404                conn_id,
 405                proto::RemoveProjectCollaborator {
 406                    project_id,
 407                    peer_id: sender_id.0,
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 412
 413        Ok(())
 414    }
 415
 416    async fn register_worktree(
 417        mut self: Arc<Server>,
 418        request: TypedEnvelope<proto::RegisterWorktree>,
 419    ) -> tide::Result<proto::Ack> {
 420        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 421
 422        let mut contact_user_ids = HashSet::default();
 423        contact_user_ids.insert(host_user_id);
 424        for github_login in &request.payload.authorized_logins {
 425            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 426            contact_user_ids.insert(contact_user_id);
 427        }
 428
 429        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 430        let guest_connection_ids;
 431        {
 432            let mut state = self.state_mut();
 433            guest_connection_ids = state
 434                .read_project(request.payload.project_id, request.sender_id)?
 435                .guest_connection_ids();
 436            state.register_worktree(
 437                request.payload.project_id,
 438                request.payload.worktree_id,
 439                request.sender_id,
 440                Worktree {
 441                    authorized_user_ids: contact_user_ids.clone(),
 442                    root_name: request.payload.root_name.clone(),
 443                    weak: request.payload.weak,
 444                },
 445            )?;
 446        }
 447        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 448            self.peer
 449                .forward_send(request.sender_id, connection_id, request.payload.clone())
 450        })?;
 451        self.update_contacts_for_users(&contact_user_ids)?;
 452        Ok(proto::Ack {})
 453    }
 454
 455    async fn unregister_worktree(
 456        mut self: Arc<Server>,
 457        request: TypedEnvelope<proto::UnregisterWorktree>,
 458    ) -> tide::Result<()> {
 459        let project_id = request.payload.project_id;
 460        let worktree_id = request.payload.worktree_id;
 461        let (worktree, guest_connection_ids) =
 462            self.state_mut()
 463                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 464        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 465            self.peer.send(
 466                conn_id,
 467                proto::UnregisterWorktree {
 468                    project_id,
 469                    worktree_id,
 470                },
 471            )
 472        })?;
 473        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 474        Ok(())
 475    }
 476
 477    async fn update_worktree(
 478        mut self: Arc<Server>,
 479        request: TypedEnvelope<proto::UpdateWorktree>,
 480    ) -> tide::Result<proto::Ack> {
 481        let connection_ids = self.state_mut().update_worktree(
 482            request.sender_id,
 483            request.payload.project_id,
 484            request.payload.worktree_id,
 485            &request.payload.removed_entries,
 486            &request.payload.updated_entries,
 487        )?;
 488
 489        broadcast(request.sender_id, connection_ids, |connection_id| {
 490            self.peer
 491                .forward_send(request.sender_id, connection_id, request.payload.clone())
 492        })?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_diagnostic_summary(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 500    ) -> tide::Result<()> {
 501        let summary = request
 502            .payload
 503            .summary
 504            .clone()
 505            .ok_or_else(|| anyhow!("invalid summary"))?;
 506        let receiver_ids = self.state_mut().update_diagnostic_summary(
 507            request.payload.project_id,
 508            request.payload.worktree_id,
 509            request.sender_id,
 510            summary,
 511        )?;
 512
 513        broadcast(request.sender_id, receiver_ids, |connection_id| {
 514            self.peer
 515                .forward_send(request.sender_id, connection_id, request.payload.clone())
 516        })?;
 517        Ok(())
 518    }
 519
 520    async fn disk_based_diagnostics_updating(
 521        self: Arc<Server>,
 522        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 523    ) -> tide::Result<()> {
 524        let receiver_ids = self
 525            .state()
 526            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 527        broadcast(request.sender_id, receiver_ids, |connection_id| {
 528            self.peer
 529                .forward_send(request.sender_id, connection_id, request.payload.clone())
 530        })?;
 531        Ok(())
 532    }
 533
 534    async fn disk_based_diagnostics_updated(
 535        self: Arc<Server>,
 536        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 537    ) -> tide::Result<()> {
 538        let receiver_ids = self
 539            .state()
 540            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 541        broadcast(request.sender_id, receiver_ids, |connection_id| {
 542            self.peer
 543                .forward_send(request.sender_id, connection_id, request.payload.clone())
 544        })?;
 545        Ok(())
 546    }
 547
 548    async fn forward_project_request<T>(
 549        self: Arc<Server>,
 550        request: TypedEnvelope<T>,
 551    ) -> tide::Result<T::Response>
 552    where
 553        T: EntityMessage + RequestMessage,
 554    {
 555        let host_connection_id = self
 556            .state()
 557            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 558            .host_connection_id;
 559        Ok(self
 560            .peer
 561            .forward_request(request.sender_id, host_connection_id, request.payload)
 562            .await?)
 563    }
 564
 565    async fn close_buffer(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::CloseBuffer>,
 568    ) -> tide::Result<()> {
 569        let host_connection_id = self
 570            .state()
 571            .read_project(request.payload.project_id, request.sender_id)?
 572            .host_connection_id;
 573        self.peer
 574            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 575        Ok(())
 576    }
 577
 578    async fn save_buffer(
 579        self: Arc<Server>,
 580        request: TypedEnvelope<proto::SaveBuffer>,
 581    ) -> tide::Result<proto::BufferSaved> {
 582        let host;
 583        let mut guests;
 584        {
 585            let state = self.state();
 586            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 587            host = project.host_connection_id;
 588            guests = project.guest_connection_ids()
 589        }
 590
 591        let response = self
 592            .peer
 593            .forward_request(request.sender_id, host, request.payload.clone())
 594            .await?;
 595
 596        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 597        broadcast(host, guests, |conn_id| {
 598            self.peer.forward_send(host, conn_id, response.clone())
 599        })?;
 600
 601        Ok(response)
 602    }
 603
 604    async fn update_buffer(
 605        self: Arc<Server>,
 606        request: TypedEnvelope<proto::UpdateBuffer>,
 607    ) -> tide::Result<proto::Ack> {
 608        let receiver_ids = self
 609            .state()
 610            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 611        broadcast(request.sender_id, receiver_ids, |connection_id| {
 612            self.peer
 613                .forward_send(request.sender_id, connection_id, request.payload.clone())
 614        })?;
 615        Ok(proto::Ack {})
 616    }
 617
 618    async fn update_buffer_file(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBufferFile>,
 621    ) -> tide::Result<()> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(())
 630    }
 631
 632    async fn buffer_reloaded(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::BufferReloaded>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_saved(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferSaved>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn get_channels(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::GetChannels>,
 663    ) -> tide::Result<proto::GetChannelsResponse> {
 664        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 665        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 666        Ok(proto::GetChannelsResponse {
 667            channels: channels
 668                .into_iter()
 669                .map(|chan| proto::Channel {
 670                    id: chan.id.to_proto(),
 671                    name: chan.name,
 672                })
 673                .collect(),
 674        })
 675    }
 676
 677    async fn get_users(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetUsers>,
 680    ) -> tide::Result<proto::GetUsersResponse> {
 681        let user_ids = request
 682            .payload
 683            .user_ids
 684            .into_iter()
 685            .map(UserId::from_proto)
 686            .collect();
 687        let users = self
 688            .app_state
 689            .db
 690            .get_users_by_ids(user_ids)
 691            .await?
 692            .into_iter()
 693            .map(|user| proto::User {
 694                id: user.id.to_proto(),
 695                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 696                github_login: user.github_login,
 697            })
 698            .collect();
 699        Ok(proto::GetUsersResponse { users })
 700    }
 701
 702    fn update_contacts_for_users<'a>(
 703        self: &Arc<Server>,
 704        user_ids: impl IntoIterator<Item = &'a UserId>,
 705    ) -> anyhow::Result<()> {
 706        let mut result = Ok(());
 707        let state = self.state();
 708        for user_id in user_ids {
 709            let contacts = state.contacts_for_user(*user_id);
 710            for connection_id in state.connection_ids_for_user(*user_id) {
 711                if let Err(error) = self.peer.send(
 712                    connection_id,
 713                    proto::UpdateContacts {
 714                        contacts: contacts.clone(),
 715                    },
 716                ) {
 717                    result = Err(error);
 718                }
 719            }
 720        }
 721        result
 722    }
 723
 724    async fn join_channel(
 725        mut self: Arc<Self>,
 726        request: TypedEnvelope<proto::JoinChannel>,
 727    ) -> tide::Result<proto::JoinChannelResponse> {
 728        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 729        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 730        if !self
 731            .app_state
 732            .db
 733            .can_user_access_channel(user_id, channel_id)
 734            .await?
 735        {
 736            Err(anyhow!("access denied"))?;
 737        }
 738
 739        self.state_mut().join_channel(request.sender_id, channel_id);
 740        let messages = self
 741            .app_state
 742            .db
 743            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 744            .await?
 745            .into_iter()
 746            .map(|msg| proto::ChannelMessage {
 747                id: msg.id.to_proto(),
 748                body: msg.body,
 749                timestamp: msg.sent_at.unix_timestamp() as u64,
 750                sender_id: msg.sender_id.to_proto(),
 751                nonce: Some(msg.nonce.as_u128().into()),
 752            })
 753            .collect::<Vec<_>>();
 754        Ok(proto::JoinChannelResponse {
 755            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 756            messages,
 757        })
 758    }
 759
 760    async fn leave_channel(
 761        mut self: Arc<Self>,
 762        request: TypedEnvelope<proto::LeaveChannel>,
 763    ) -> tide::Result<()> {
 764        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 765        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 766        if !self
 767            .app_state
 768            .db
 769            .can_user_access_channel(user_id, channel_id)
 770            .await?
 771        {
 772            Err(anyhow!("access denied"))?;
 773        }
 774
 775        self.state_mut()
 776            .leave_channel(request.sender_id, channel_id);
 777
 778        Ok(())
 779    }
 780
 781    async fn send_channel_message(
 782        self: Arc<Self>,
 783        request: TypedEnvelope<proto::SendChannelMessage>,
 784    ) -> tide::Result<proto::SendChannelMessageResponse> {
 785        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 786        let user_id;
 787        let connection_ids;
 788        {
 789            let state = self.state();
 790            user_id = state.user_id_for_connection(request.sender_id)?;
 791            connection_ids = state.channel_connection_ids(channel_id)?;
 792        }
 793
 794        // Validate the message body.
 795        let body = request.payload.body.trim().to_string();
 796        if body.len() > MAX_MESSAGE_LEN {
 797            return Err(anyhow!("message is too long"))?;
 798        }
 799        if body.is_empty() {
 800            return Err(anyhow!("message can't be blank"))?;
 801        }
 802
 803        let timestamp = OffsetDateTime::now_utc();
 804        let nonce = request
 805            .payload
 806            .nonce
 807            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 808
 809        let message_id = self
 810            .app_state
 811            .db
 812            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 813            .await?
 814            .to_proto();
 815        let message = proto::ChannelMessage {
 816            sender_id: user_id.to_proto(),
 817            id: message_id,
 818            body,
 819            timestamp: timestamp.unix_timestamp() as u64,
 820            nonce: Some(nonce),
 821        };
 822        broadcast(request.sender_id, connection_ids, |conn_id| {
 823            self.peer.send(
 824                conn_id,
 825                proto::ChannelMessageSent {
 826                    channel_id: channel_id.to_proto(),
 827                    message: Some(message.clone()),
 828                },
 829            )
 830        })?;
 831        Ok(proto::SendChannelMessageResponse {
 832            message: Some(message),
 833        })
 834    }
 835
 836    async fn get_channel_messages(
 837        self: Arc<Self>,
 838        request: TypedEnvelope<proto::GetChannelMessages>,
 839    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 840        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 841        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 842        if !self
 843            .app_state
 844            .db
 845            .can_user_access_channel(user_id, channel_id)
 846            .await?
 847        {
 848            Err(anyhow!("access denied"))?;
 849        }
 850
 851        let messages = self
 852            .app_state
 853            .db
 854            .get_channel_messages(
 855                channel_id,
 856                MESSAGE_COUNT_PER_PAGE,
 857                Some(MessageId::from_proto(request.payload.before_message_id)),
 858            )
 859            .await?
 860            .into_iter()
 861            .map(|msg| proto::ChannelMessage {
 862                id: msg.id.to_proto(),
 863                body: msg.body,
 864                timestamp: msg.sent_at.unix_timestamp() as u64,
 865                sender_id: msg.sender_id.to_proto(),
 866                nonce: Some(msg.nonce.as_u128().into()),
 867            })
 868            .collect::<Vec<_>>();
 869
 870        Ok(proto::GetChannelMessagesResponse {
 871            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 872            messages,
 873        })
 874    }
 875
 876    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 877        self.store.read()
 878    }
 879
 880    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 881        self.store.write()
 882    }
 883}
 884
 885impl Executor for RealExecutor {
 886    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 887        task::spawn(future);
 888    }
 889}
 890
 891fn broadcast<F>(
 892    sender_id: ConnectionId,
 893    receiver_ids: Vec<ConnectionId>,
 894    mut f: F,
 895) -> anyhow::Result<()>
 896where
 897    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 898{
 899    let mut result = Ok(());
 900    for receiver_id in receiver_ids {
 901        if receiver_id != sender_id {
 902            if let Err(error) = f(receiver_id) {
 903                if result.is_ok() {
 904                    result = Err(error);
 905                }
 906            }
 907        }
 908    }
 909    result
 910}
 911
/// Registers the `/rpc` GET endpoint on `app`, which upgrades HTTP requests to
/// WebSocket connections handled by a [`Server`].
///
/// A single `Server` is created up front from the app state and the `rpc` peer
/// and is cloned into every request. A request is answered with
/// `UpgradeRequired` unless it asks for a websocket upgrade and its
/// `X-Zed-Protocol-Version` header equals `rpc::PROTOCOL_VERSION`. A missing
/// `Sec-Websocket-Key` header or a failing `process_auth_header` call makes the
/// handler return an error. Otherwise the handler replies with
/// `SwitchingProtocols` and spawns a task that runs `handle_connection` on the
/// upgraded stream.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID that the WebSocket handshake (RFC 6455) appends to the
            // client's key before hashing.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header yields `None`, which never
            // equals `Some(rpc::PROTOCOL_VERSION)` below.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            // Authenticate before committing to the protocol switch.
            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Sec-Websocket-Accept = base64(SHA-1(client key + GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            // Falls back to "unknown" when the remote address is not available.
            let addr = request.remote().unwrap_or("unknown").to_string();
            // The connection is served on its own task so the handler can return
            // the response; nothing is done if the receiver yields `None`.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                            RealExecutor,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
 967
 968fn header_contains_ignore_case<T>(
 969    request: &tide::Request<T>,
 970    header_name: HeaderName,
 971    value: &str,
 972) -> bool {
 973    request
 974        .header(header_name)
 975        .map(|h| {
 976            h.as_str()
 977                .split(',')
 978                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 979        })
 980        .unwrap_or(false)
 981}
 982
 983#[cfg(test)]
 984mod tests {
 985    use super::*;
 986    use crate::{
 987        auth,
 988        db::{tests::TestDb, UserId},
 989        github, AppState, Config,
 990    };
 991    use ::rpc::Peer;
 992    use client::{
 993        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 994        EstablishConnectionError, UserStore,
 995    };
 996    use collections::BTreeMap;
 997    use editor::{
 998        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
 999        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1000    };
1001    use gpui::{executor, ModelHandle, TestAppContext};
1002    use language::{
1003        tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1004        LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1005    };
1006    use lsp;
1007    use parking_lot::Mutex;
1008    use postage::{sink::Sink, watch};
1009    use project::{
1010        fs::{FakeFs, Fs as _},
1011        search::SearchQuery,
1012        DiagnosticSummary, Project, ProjectPath,
1013    };
1014    use rand::prelude::*;
1015    use rpc::PeerId;
1016    use serde_json::json;
1017    use sqlx::types::time::OffsetDateTime;
1018    use std::{
1019        cell::Cell,
1020        env,
1021        ops::Deref,
1022        path::{Path, PathBuf},
1023        rc::Rc,
1024        sync::{
1025            atomic::{AtomicBool, Ordering::SeqCst},
1026            Arc,
1027        },
1028        time::Duration,
1029    };
1030    use workspace::{Settings, Workspace, WorkspaceParams};
1031
1032    #[cfg(test)]
1033    #[ctor::ctor]
1034    fn init_logger() {
1035        if std::env::var("RUST_LOG").is_ok() {
1036            env_logger::init();
1037        }
1038    }
1039
    // Client A shares a local project, client B joins it as a remote project,
    // an edit typed by B in an editor shows up in A's buffer, and dropping B's
    // project removes B from A's collaborators.
    // NOTE(review): `iterations = 10` presumably reruns the test several times
    // with different scheduling — confirm against the `gpui::test` macro.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // The `.zed.toml` file names `user_b` as a collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator right after joining.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A lists B as a collaborator with B's replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host must already have the buffer open after the guest opened it.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(
                buffer_b,
                None,
                watch::channel_with(Settings::test(cx)).1,
                cx,
            )
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1172
    // Client A shares a project and client B joins it. After A unshares, B's
    // project becomes read-only and A's worktree is no longer shared. After A
    // shares again, B can join under the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project must turn read-only once the host unshares.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // The same `project_id` is reused for the second join.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1268
    // Host A shares a worktree with guests B and C. The test checks that buffer
    // edits from all three replicas converge, that a save issued by a guest
    // writes the host's file and clears the dirty flag on every replica, and
    // that renames/creations on the host's file system show up in every
    // worktree and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until the host has received both guest edits, in this order.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first but only awaited after the host's edit, and
        // the saved file is expected to contain that edit.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        // Guest C did not take part in the save, so wait for it to become clean.
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // Every replica's worktree must end up with the same list of paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1445
    // Guest B edits a buffer of A's shared project, saves it, and edits it
    // again. The buffer must be dirty after each edit and clean after the save,
    // and must never report a conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE: `.zed.toml` also names `user_c`, but this test creates no such client.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: dirty, but not in conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait for the buffer to be reported clean.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must not produce a conflict either.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1527
    // Guest B opens a buffer of A's shared project; the file is then
    // overwritten through the file system. B's buffer must pick up the new
    // contents while staying clean and conflict-free.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE: `.zed.toml` also names `user_c`, but this test creates no such client.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // NOTE(review): the worktree handle is bound but unused — presumably to
        // keep the remote worktree alive for the rest of the test; confirm.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file directly through the file system.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // Wait for the guest buffer to show the new text without being dirty.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1606
1607    #[gpui::test(iterations = 10)]
        // Verifies that a guest who is still opening a buffer converges on the host's
        // text when the host edits that buffer while the open is in flight.
        // NOTE(review): `iterations = 10` presumably reruns the test with different
        // seeds so the random delays below yield different interleavings — confirm.
1608    async fn test_editing_while_guest_opens_buffer(
1609        cx_a: &mut TestAppContext,
1610        cx_b: &mut TestAppContext,
1611    ) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
1612        cx_a.foreground().forbid_parking();
1613        let lang_registry = Arc::new(LanguageRegistry::new());
1614        let fs = FakeFs::new(cx_a.background());
1615
1616        // Connect to a server as 2 clients.
1617        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1618        let client_a = server.create_client(cx_a, "user_a").await;
1619        let client_b = server.create_client(cx_b, "user_b").await;
1620
1621        // Share a project as client A
            // NOTE(review): the `.zed.toml` collaborators entry is presumably what
            // authorizes user_b to join — confirm.
1622        fs.insert_tree(
1623            "/dir",
1624            json!({
1625                ".zed.toml": r#"collaborators = ["user_b"]"#,
1626                "a.txt": "a-contents",
1627            }),
1628        )
1629        .await;
1630        let project_a = cx_a.update(|cx| {
1631            Project::local(
1632                client_a.clone(),
1633                client_a.user_store.clone(),
1634                lang_registry.clone(),
1635                fs.clone(),
1636                cx,
1637            )
1638        });
1639        let (worktree_a, _) = project_a
1640            .update(cx_a, |p, cx| {
1641                p.find_or_create_local_worktree("/dir", false, cx)
1642            })
1643            .await
1644            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
1645        worktree_a
1646            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1647            .await;
            // The remote project id is what client B joins with; the worktree id is
            // used by both clients to address "a.txt".
1648        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1649        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1650        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1651
1652        // Join that project as client B
1653        let project_b = Project::remote(
1654            project_id,
1655            client_b.clone(),
1656            client_b.user_store.clone(),
1657            lang_registry.clone(),
1658            fs.clone(),
1659            &mut cx_b.to_async(),
1660        )
1661        .await
1662        .unwrap();
1663
1664        // Open a buffer as client A
1665        let buffer_a = project_a
1666            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1667            .await
1668            .unwrap();
1669
1670        // Start opening the same buffer as client B
            // The open is spawned on the background executor and deliberately not
            // awaited yet, so the host edits below may land before it resolves.
1671        let buffer_b = cx_b
1672            .background()
1673            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1674
1675        // Edit the buffer as client A while client B is still opening it.
            // Inserts "X" at 0..0 and then "Y" at 1..1, with a simulated random delay
            // before each edit.
1676        cx_b.background().simulate_random_delay().await;
1677        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1678        cx_b.background().simulate_random_delay().await;
1679        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1680
            // Snapshot the host's final text, finish the guest's open, and wait until
            // the guest's buffer shows the same text.
1681        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1682        let buffer_b = buffer_b.await.unwrap();
1683        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1684    }
1685
1686    #[gpui::test(iterations = 10)]
        // Verifies that the host sees the guest leave when the guest drops its
        // project while an `open_buffer` request is still pending.
1687    async fn test_leaving_worktree_while_opening_buffer(
1688        cx_a: &mut TestAppContext,
1689        cx_b: &mut TestAppContext,
1690    ) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
1691        cx_a.foreground().forbid_parking();
1692        let lang_registry = Arc::new(LanguageRegistry::new());
1693        let fs = FakeFs::new(cx_a.background());
1694
1695        // Connect to a server as 2 clients.
1696        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1697        let client_a = server.create_client(cx_a, "user_a").await;
1698        let client_b = server.create_client(cx_b, "user_b").await;
1699
1700        // Share a project as client A
1701        fs.insert_tree(
1702            "/dir",
1703            json!({
1704                ".zed.toml": r#"collaborators = ["user_b"]"#,
1705                "a.txt": "a-contents",
1706            }),
1707        )
1708        .await;
1709        let project_a = cx_a.update(|cx| {
1710            Project::local(
1711                client_a.clone(),
1712                client_a.user_store.clone(),
1713                lang_registry.clone(),
1714                fs.clone(),
1715                cx,
1716            )
1717        });
1718        let (worktree_a, _) = project_a
1719            .update(cx_a, |p, cx| {
1720                p.find_or_create_local_worktree("/dir", false, cx)
1721            })
1722            .await
1723            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
1724        worktree_a
1725            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1726            .await;
1727        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1728        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1729        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1730
1731        // Join that project as client B
1732        let project_b = Project::remote(
1733            project_id,
1734            client_b.clone(),
1735            client_b.user_store.clone(),
1736            lang_registry.clone(),
1737            fs.clone(),
1738            &mut cx_b.to_async(),
1739        )
1740        .await
1741        .unwrap();
1742
1743        // See that a guest has joined as client A.
1744        project_a
1745            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1746            .await;
1747
1748        // Begin opening a buffer as client B, but leave the project before the open completes.
            // The open is spawned on the background executor and never awaited; the
            // guest's project handle is dropped first, then the pending task itself.
            // NOTE(review): the drop is done inside `cx_b.update`, presumably so the
            // app context processes the release right away — confirm.
1749        let buffer_b = cx_b
1750            .background()
1751            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1752        cx_b.update(|_| drop(project_b));
1753        drop(buffer_b);
1754
1755        // See that the guest has left.
1756        project_a
1757            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1758            .await;
1759    }
1760
1761    #[gpui::test(iterations = 10)]
        // Verifies that the host's collaborator list drops back to empty when the
        // guest's connection is closed, even though the guest's project is still alive.
1762    async fn test_peer_disconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
1763        cx_a.foreground().forbid_parking();
1764        let lang_registry = Arc::new(LanguageRegistry::new());
1765        let fs = FakeFs::new(cx_a.background());
1766
1767        // Connect to a server as 2 clients.
1768        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1769        let client_a = server.create_client(cx_a, "user_a").await;
1770        let client_b = server.create_client(cx_b, "user_b").await;
1771
1772        // Share a project as client A
1773        fs.insert_tree(
1774            "/a",
1775            json!({
1776                ".zed.toml": r#"collaborators = ["user_b"]"#,
1777                "a.txt": "a-contents",
1778                "b.txt": "b-contents",
1779            }),
1780        )
1781        .await;
1782        let project_a = cx_a.update(|cx| {
1783            Project::local(
1784                client_a.clone(),
1785                client_a.user_store.clone(),
1786                lang_registry.clone(),
1787                fs.clone(),
1788                cx,
1789            )
1790        });
1791        let (worktree_a, _) = project_a
1792            .update(cx_a, |p, cx| {
1793                p.find_or_create_local_worktree("/a", false, cx)
1794            })
1795            .await
1796            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
1797        worktree_a
1798            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1799            .await;
1800        let project_id = project_a
1801            .update(cx_a, |project, _| project.next_remote_id())
1802            .await;
1803        project_a
1804            .update(cx_a, |project, cx| project.share(cx))
1805            .await
1806            .unwrap();
1807
1808        // Join that project as client B
            // Bound to `_project_b` (not `_`) so the guest's project stays alive until
            // the end of the test; the departure below is caused by the disconnect,
            // not by dropping the project.
1809        let _project_b = Project::remote(
1810            project_id,
1811            client_b.clone(),
1812            client_b.user_store.clone(),
1813            lang_registry.clone(),
1814            fs.clone(),
1815            &mut cx_b.to_async(),
1816        )
1817        .await
1818        .unwrap();
1819
1820        // See that a guest has joined as client A.
1821        project_a
1822            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1823            .await;
1824
1825        // Drop client B's connection and ensure client A observes client B leaving the project.
1826        client_b.disconnect(&cx_b.to_async()).unwrap();
1827        project_a
1828            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1829            .await;
1830    }
1831
1832    #[gpui::test(iterations = 10)]
        // Verifies that diagnostics published by the host's language server reach a
        // guest in three ways: as summaries already present when the guest joins, as
        // summary updates after joining, and as per-buffer diagnostic entries when the
        // guest opens the affected file.
1833    async fn test_collaborating_with_diagnostics(
1834        cx_a: &mut TestAppContext,
1835        cx_b: &mut TestAppContext,
1836    ) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
1837        cx_a.foreground().forbid_parking();
1838        let mut lang_registry = Arc::new(LanguageRegistry::new());
1839        let fs = FakeFs::new(cx_a.background());
1840
1841        // Set up a fake language server.
            // `Arc::get_mut` succeeds here because the registry has not been cloned yet.
1842        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1843        Arc::get_mut(&mut lang_registry)
1844            .unwrap()
1845            .add(Arc::new(Language::new(
1846                LanguageConfig {
1847                    name: "Rust".into(),
1848                    path_suffixes: vec!["rs".to_string()],
1849                    language_server: Some(language_server_config),
1850                    ..Default::default()
1851                },
1852                Some(tree_sitter_rust::language()),
1853            )));
1854
1855        // Connect to a server as 2 clients.
1856        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1857        let client_a = server.create_client(cx_a, "user_a").await;
1858        let client_b = server.create_client(cx_b, "user_b").await;
1859
1860        // Share a project as client A
1861        fs.insert_tree(
1862            "/a",
1863            json!({
1864                ".zed.toml": r#"collaborators = ["user_b"]"#,
1865                "a.rs": "let one = two",
1866                "other.rs": "",
1867            }),
1868        )
1869        .await;
1870        let project_a = cx_a.update(|cx| {
1871            Project::local(
1872                client_a.clone(),
1873                client_a.user_store.clone(),
1874                lang_registry.clone(),
1875                fs.clone(),
1876                cx,
1877            )
1878        });
1879        let (worktree_a, _) = project_a
1880            .update(cx_a, |p, cx| {
1881                p.find_or_create_local_worktree("/a", false, cx)
1882            })
1883            .await
1884            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
1885        worktree_a
1886            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1887            .await;
1888        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1889        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1890        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1891
1892        // Cause the language server to start.
            // Opens "other.rs" on the host; the returned buffer is discarded.
1893        let _ = cx_a
1894            .background()
1895            .spawn(project_a.update(cx_a, |project, cx| {
1896                project.open_buffer(
1897                    ProjectPath {
1898                        worktree_id,
1899                        path: Path::new("other.rs").into(),
1900                    },
1901                    cx,
1902                )
1903            }))
1904            .await
1905            .unwrap();
1906
1907        // Simulate a language server reporting errors for a file.
            // A single ERROR spanning columns 4..7 on line 0 of "/a/a.rs".
1908        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1909        fake_language_server
1910            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1911                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1912                version: None,
1913                diagnostics: vec![lsp::Diagnostic {
1914                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1915                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1916                    message: "message 1".to_string(),
1917                    ..Default::default()
1918                }],
1919            })
1920            .await;
1921
1922        // Wait for server to see the diagnostics update.
            // Done before the guest joins so that the summary assertion right after
            // the join can be made without waiting on a condition.
1923        server
1924            .condition(|store| {
1925                let worktree = store
1926                    .project(project_id)
1927                    .unwrap()
1928                    .share
1929                    .as_ref()
1930                    .unwrap()
1931                    .worktrees
1932                    .get(&worktree_id.to_proto())
1933                    .unwrap();
1934
1935                !worktree.diagnostic_summaries.is_empty()
1936            })
1937            .await;
1938
1939        // Join the project as client B.
1940        let project_b = Project::remote(
1941            project_id,
1942            client_b.clone(),
1943            client_b.user_store.clone(),
1944            lang_registry.clone(),
1945            fs.clone(),
1946            &mut cx_b.to_async(),
1947        )
1948        .await
1949        .unwrap();
1950
            // Immediately after joining, the guest already has the one-error summary.
1951        project_b.read_with(cx_b, |project, cx| {
1952            assert_eq!(
1953                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1954                &[(
1955                    ProjectPath {
1956                        worktree_id,
1957                        path: Arc::from(Path::new("a.rs")),
1958                    },
1959                    DiagnosticSummary {
1960                        error_count: 1,
1961                        warning_count: 0,
1962                        ..Default::default()
1963                    },
1964                )]
1965            )
1966        });
1967
1968        // Simulate a language server reporting more errors for a file.
            // Re-sends the same ERROR plus a new WARNING spanning columns 10..13.
1969        fake_language_server
1970            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1971                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1972                version: None,
1973                diagnostics: vec![
1974                    lsp::Diagnostic {
1975                        severity: Some(lsp::DiagnosticSeverity::ERROR),
1976                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1977                        message: "message 1".to_string(),
1978                        ..Default::default()
1979                    },
1980                    lsp::Diagnostic {
1981                        severity: Some(lsp::DiagnosticSeverity::WARNING),
1982                        range: lsp::Range::new(
1983                            lsp::Position::new(0, 10),
1984                            lsp::Position::new(0, 13),
1985                        ),
1986                        message: "message 2".to_string(),
1987                        ..Default::default()
1988                    },
1989                ],
1990            })
1991            .await;
1992
1993        // Client b gets the updated summaries
1994        project_b
1995            .condition(&cx_b, |project, cx| {
1996                project.diagnostic_summaries(cx).collect::<Vec<_>>()
1997                    == &[(
1998                        ProjectPath {
1999                            worktree_id,
2000                            path: Arc::from(Path::new("a.rs")),
2001                        },
2002                        DiagnosticSummary {
2003                            error_count: 1,
2004                            warning_count: 1,
2005                            ..Default::default()
2006                        },
2007                    )]
2008            })
2009            .await;
2010
2011        // Open the file with the errors on client B. They should be present.
2012        let buffer_b = cx_b
2013            .background()
2014            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2015            .await
2016            .unwrap();
2017
            // Both entries are expected in range order, each primary and in its own
            // group (ids 0 and 1).
2018        buffer_b.read_with(cx_b, |buffer, _| {
2019            assert_eq!(
2020                buffer
2021                    .snapshot()
2022                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2024                    .collect::<Vec<_>>(),
2025                &[
2026                    DiagnosticEntry {
2027                        range: Point::new(0, 4)..Point::new(0, 7),
2028                        diagnostic: Diagnostic {
2029                            group_id: 0,
2030                            message: "message 1".to_string(),
2031                            severity: lsp::DiagnosticSeverity::ERROR,
2032                            is_primary: true,
2033                            ..Default::default()
2034                        }
2035                    },
2036                    DiagnosticEntry {
2037                        range: Point::new(0, 10)..Point::new(0, 13),
2038                        diagnostic: Diagnostic {
2039                            group_id: 1,
2040                            severity: lsp::DiagnosticSeverity::WARNING,
2041                            message: "message 2".to_string(),
2042                            is_primary: true,
2043                            ..Default::default()
2044                        }
2045                    }
2046                ]
2047            );
2048        });
2049    }
2050
2051    #[gpui::test(iterations = 10)]
        // Verifies the completion round trip for a guest: typing a trigger character
        // in the guest's editor produces a completion request on the host's language
        // server, the guest can confirm a returned item, and the additional text edit
        // from the resolved item ends up in both the host's and the guest's buffer.
2052    async fn test_collaborating_with_completion(
2053        cx_a: &mut TestAppContext,
2054        cx_b: &mut TestAppContext,
2055    ) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
2056        cx_a.foreground().forbid_parking();
2057        let mut lang_registry = Arc::new(LanguageRegistry::new());
2058        let fs = FakeFs::new(cx_a.background());
2059
2060        // Set up a fake language server.
            // The fake server advertises "." as its only completion trigger character.
            // `Arc::get_mut` succeeds below because the registry has not been cloned yet.
2061        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2062        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2063            completion_provider: Some(lsp::CompletionOptions {
2064                trigger_characters: Some(vec![".".to_string()]),
2065                ..Default::default()
2066            }),
2067            ..Default::default()
2068        });
2069        Arc::get_mut(&mut lang_registry)
2070            .unwrap()
2071            .add(Arc::new(Language::new(
2072                LanguageConfig {
2073                    name: "Rust".into(),
2074                    path_suffixes: vec!["rs".to_string()],
2075                    language_server: Some(language_server_config),
2076                    ..Default::default()
2077                },
2078                Some(tree_sitter_rust::language()),
2079            )));
2080
2081        // Connect to a server as 2 clients.
2082        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2083        let client_a = server.create_client(cx_a, "user_a").await;
2084        let client_b = server.create_client(cx_b, "user_b").await;
2085
2086        // Share a project as client A
2087        fs.insert_tree(
2088            "/a",
2089            json!({
2090                ".zed.toml": r#"collaborators = ["user_b"]"#,
2091                "main.rs": "fn main() { a }",
2092                "other.rs": "",
2093            }),
2094        )
2095        .await;
2096        let project_a = cx_a.update(|cx| {
2097            Project::local(
2098                client_a.clone(),
2099                client_a.user_store.clone(),
2100                lang_registry.clone(),
2101                fs.clone(),
2102                cx,
2103            )
2104        });
2105        let (worktree_a, _) = project_a
2106            .update(cx_a, |p, cx| {
2107                p.find_or_create_local_worktree("/a", false, cx)
2108            })
2109            .await
2110            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
2111        worktree_a
2112            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2113            .await;
2114        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2115        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2116        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2117
2118        // Join the project as client B.
2119        let project_b = Project::remote(
2120            project_id,
2121            client_b.clone(),
2122            client_b.user_store.clone(),
2123            lang_registry.clone(),
2124            fs.clone(),
2125            &mut cx_b.to_async(),
2126        )
2127        .await
2128        .unwrap();
2129
2130        // Open a file in an editor as the guest.
2131        let buffer_b = project_b
2132            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2133            .await
2134            .unwrap();
2135        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2136        let editor_b = cx_b.add_view(window_b, |cx| {
2137            Editor::for_buffer(
2138                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2139                Some(project_b.clone()),
2140                watch::channel_with(Settings::test(cx)).1,
2141                cx,
2142            )
2143        });
2144
            // Wait until the guest's buffer reports a non-empty set of completion
            // triggers before typing one.
2145        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2146        buffer_b
2147            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2148            .await;
2149
2150        // Type a completion trigger character as the guest.
            // Offset 13 is just after the `a` in "fn main() { a }", so the text becomes
            // "fn main() { a. }" and the request position checked below is (0, 14).
2151        editor_b.update(cx_b, |editor, cx| {
2152            editor.select_ranges([13..13], None, cx);
2153            editor.handle_input(&Input(".".into()), cx);
2154            cx.focus(&editor_b);
2155        });
2156
2157        // Receive a completion request as the host's language server.
2158        // Return some completions from the host's language server.
            // NOTE(review): `start_waiting`/`finish_waiting` presumably mark the await
            // in between as an expected wait on the foreground executor — confirm.
2159        cx_a.foreground().start_waiting();
2160        fake_language_server
2161            .handle_request::<lsp::request::Completion, _>(|params, _| {
2162                assert_eq!(
2163                    params.text_document_position.text_document.uri,
2164                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2165                );
2166                assert_eq!(
2167                    params.text_document_position.position,
2168                    lsp::Position::new(0, 14),
2169                );
2170
2171                Some(lsp::CompletionResponse::Array(vec![
2172                    lsp::CompletionItem {
2173                        label: "first_method(…)".into(),
2174                        detail: Some("fn(&mut self, B) -> C".into()),
2175                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2176                            new_text: "first_method($1)".to_string(),
2177                            range: lsp::Range::new(
2178                                lsp::Position::new(0, 14),
2179                                lsp::Position::new(0, 14),
2180                            ),
2181                        })),
2182                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2183                        ..Default::default()
2184                    },
2185                    lsp::CompletionItem {
2186                        label: "second_method(…)".into(),
2187                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2188                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2189                            new_text: "second_method()".to_string(),
2190                            range: lsp::Range::new(
2191                                lsp::Position::new(0, 14),
2192                                lsp::Position::new(0, 14),
2193                            ),
2194                        })),
2195                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2196                        ..Default::default()
2197                    },
2198                ]))
2199            })
2200            .next()
2201            .await
2202            .unwrap();
2203        cx_a.foreground().finish_waiting();
2204
2205        // Open the buffer on the host.
            // The host's copy must already contain the "." typed by the guest.
2206        let buffer_a = project_a
2207            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2208            .await
2209            .unwrap();
2210        buffer_a
2211            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2212            .await;
2213
2214        // Confirm a completion on the guest.
            // Index 0 picks "first_method(…)"; the asserted text shows the snippet's
            // `$1` placeholder expanded to empty parentheses.
2215        editor_b
2216            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2217            .await;
2218        editor_b.update(cx_b, |editor, cx| {
2219            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2220            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2221        });
2222
2223        // Return a resolved completion from the host's language server.
2224        // The resolved completion has an additional text edit.
            // The handler's return value is not awaited here; the buffer conditions
            // below are what observe that the resolve happened.
2225        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2226            |params, _| {
2227                assert_eq!(params.label, "first_method(…)");
2228                lsp::CompletionItem {
2229                    label: "first_method(…)".into(),
2230                    detail: Some("fn(&mut self, B) -> C".into()),
2231                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2232                        new_text: "first_method($1)".to_string(),
2233                        range: lsp::Range::new(
2234                            lsp::Position::new(0, 14),
2235                            lsp::Position::new(0, 14),
2236                        ),
2237                    })),
2238                    additional_text_edits: Some(vec![lsp::TextEdit {
2239                        new_text: "use d::SomeTrait;\n".to_string(),
2240                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2241                    }]),
2242                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2243                    ..Default::default()
2244                }
2245            },
2246        );
2247
2248        // The additional edit is applied.
            // Checked on the host's buffer and on the guest's buffer.
2249        buffer_a
2250            .condition(&cx_a, |buffer, _| {
2251                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2252            })
2253            .await;
2254        buffer_b
2255            .condition(&cx_b, |buffer, _| {
2256                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2257            })
2258            .await;
2259    }
2260
2261    #[gpui::test(iterations = 10)]
        // Verifies that a guest can format a buffer: the formatting request is
        // answered by the host's (fake) language server and the resulting edits show
        // up in the guest's buffer.
2262    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // NOTE(review): presumably makes the test fail rather than block if the
            // foreground executor would have to park — confirm against gpui.
2263        cx_a.foreground().forbid_parking();
2264        let mut lang_registry = Arc::new(LanguageRegistry::new());
2265        let fs = FakeFs::new(cx_a.background());
2266
2267        // Set up a fake language server.
            // `Arc::get_mut` succeeds here because the registry has not been cloned yet.
2268        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2269        Arc::get_mut(&mut lang_registry)
2270            .unwrap()
2271            .add(Arc::new(Language::new(
2272                LanguageConfig {
2273                    name: "Rust".into(),
2274                    path_suffixes: vec!["rs".to_string()],
2275                    language_server: Some(language_server_config),
2276                    ..Default::default()
2277                },
2278                Some(tree_sitter_rust::language()),
2279            )));
2280
2281        // Connect to a server as 2 clients.
2282        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2283        let client_a = server.create_client(cx_a, "user_a").await;
2284        let client_b = server.create_client(cx_b, "user_b").await;
2285
2286        // Share a project as client A
2287        fs.insert_tree(
2288            "/a",
2289            json!({
2290                ".zed.toml": r#"collaborators = ["user_b"]"#,
2291                "a.rs": "let one = two",
2292            }),
2293        )
2294        .await;
2295        let project_a = cx_a.update(|cx| {
2296            Project::local(
2297                client_a.clone(),
2298                client_a.user_store.clone(),
2299                lang_registry.clone(),
2300                fs.clone(),
2301                cx,
2302            )
2303        });
2304        let (worktree_a, _) = project_a
2305            .update(cx_a, |p, cx| {
2306                p.find_or_create_local_worktree("/a", false, cx)
2307            })
2308            .await
2309            .unwrap();
            // Wait for the local worktree's scan to complete before sharing.
2310        worktree_a
2311            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2312            .await;
2313        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2314        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2315        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2316
2317        // Join the project as client B.
2318        let project_b = Project::remote(
2319            project_id,
2320            client_b.clone(),
2321            client_b.user_store.clone(),
2322            lang_registry.clone(),
2323            fs.clone(),
2324            &mut cx_b.to_async(),
2325        )
2326        .await
2327        .unwrap();
2328
            // Open "a.rs" as the guest.
2329        let buffer_b = cx_b
2330            .background()
2331            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2332            .await
2333            .unwrap();
2334
            // Create the format request from the guest but do not await it yet; the
            // fake server's handler is installed first.
            // NOTE(review): the meaning of the `true` argument is not visible here — confirm.
2335        let format = project_b.update(cx_b, |project, cx| {
2336            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2337        });
2338
            // The fake server answers with two insertions on line 0: "h" at column 4
            // and "y" at column 7, which turn "one" into "honey".
2339        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2340        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2341            Some(vec![
2342                lsp::TextEdit {
2343                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2344                    new_text: "h".to_string(),
2345                },
2346                lsp::TextEdit {
2347                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2348                    new_text: "y".to_string(),
2349                },
2350            ])
2351        });
2352
            // Once the format completes, the guest's buffer must contain both edits.
2353        format.await.unwrap();
2354        assert_eq!(
2355            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2356            "let honey = two"
2357        );
2358    }
2359
2360    #[gpui::test(iterations = 10)]
2361    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2362        cx_a.foreground().forbid_parking();
2363        let mut lang_registry = Arc::new(LanguageRegistry::new());
2364        let fs = FakeFs::new(cx_a.background());
2365        fs.insert_tree(
2366            "/root-1",
2367            json!({
2368                ".zed.toml": r#"collaborators = ["user_b"]"#,
2369                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2370            }),
2371        )
2372        .await;
2373        fs.insert_tree(
2374            "/root-2",
2375            json!({
2376                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2377            }),
2378        )
2379        .await;
2380
2381        // Set up a fake language server.
2382        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2383        Arc::get_mut(&mut lang_registry)
2384            .unwrap()
2385            .add(Arc::new(Language::new(
2386                LanguageConfig {
2387                    name: "Rust".into(),
2388                    path_suffixes: vec!["rs".to_string()],
2389                    language_server: Some(language_server_config),
2390                    ..Default::default()
2391                },
2392                Some(tree_sitter_rust::language()),
2393            )));
2394
2395        // Connect to a server as 2 clients.
2396        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2397        let client_a = server.create_client(cx_a, "user_a").await;
2398        let client_b = server.create_client(cx_b, "user_b").await;
2399
2400        // Share a project as client A
2401        let project_a = cx_a.update(|cx| {
2402            Project::local(
2403                client_a.clone(),
2404                client_a.user_store.clone(),
2405                lang_registry.clone(),
2406                fs.clone(),
2407                cx,
2408            )
2409        });
2410        let (worktree_a, _) = project_a
2411            .update(cx_a, |p, cx| {
2412                p.find_or_create_local_worktree("/root-1", false, cx)
2413            })
2414            .await
2415            .unwrap();
2416        worktree_a
2417            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2418            .await;
2419        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2420        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2421        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2422
2423        // Join the worktree as client B.
2424        let project_b = Project::remote(
2425            project_id,
2426            client_b.clone(),
2427            client_b.user_store.clone(),
2428            lang_registry.clone(),
2429            fs.clone(),
2430            &mut cx_b.to_async(),
2431        )
2432        .await
2433        .unwrap();
2434
2435        // Open the file on client B.
2436        let buffer_b = cx_b
2437            .background()
2438            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2439            .await
2440            .unwrap();
2441
2442        // Request the definition of a symbol as the guest.
2443        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2444
2445        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2446        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2447            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2448                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2449                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2450            )))
2451        });
2452
2453        let definitions_1 = definitions_1.await.unwrap();
2454        cx_b.read(|cx| {
2455            assert_eq!(definitions_1.len(), 1);
2456            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2457            let target_buffer = definitions_1[0].buffer.read(cx);
2458            assert_eq!(
2459                target_buffer.text(),
2460                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2461            );
2462            assert_eq!(
2463                definitions_1[0].range.to_point(target_buffer),
2464                Point::new(0, 6)..Point::new(0, 9)
2465            );
2466        });
2467
2468        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2469        // the previous call to `definition`.
2470        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2471        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2472            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2473                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2474                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2475            )))
2476        });
2477
2478        let definitions_2 = definitions_2.await.unwrap();
2479        cx_b.read(|cx| {
2480            assert_eq!(definitions_2.len(), 1);
2481            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2482            let target_buffer = definitions_2[0].buffer.read(cx);
2483            assert_eq!(
2484                target_buffer.text(),
2485                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2486            );
2487            assert_eq!(
2488                definitions_2[0].range.to_point(target_buffer),
2489                Point::new(1, 6)..Point::new(1, 11)
2490            );
2491        });
2492        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2493    }
2494
2495    #[gpui::test(iterations = 10)]
2496    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2497        cx_a.foreground().forbid_parking();
2498        let mut lang_registry = Arc::new(LanguageRegistry::new());
2499        let fs = FakeFs::new(cx_a.background());
2500        fs.insert_tree(
2501            "/root-1",
2502            json!({
2503                ".zed.toml": r#"collaborators = ["user_b"]"#,
2504                "one.rs": "const ONE: usize = 1;",
2505                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2506            }),
2507        )
2508        .await;
2509        fs.insert_tree(
2510            "/root-2",
2511            json!({
2512                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2513            }),
2514        )
2515        .await;
2516
2517        // Set up a fake language server.
2518        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2519        Arc::get_mut(&mut lang_registry)
2520            .unwrap()
2521            .add(Arc::new(Language::new(
2522                LanguageConfig {
2523                    name: "Rust".into(),
2524                    path_suffixes: vec!["rs".to_string()],
2525                    language_server: Some(language_server_config),
2526                    ..Default::default()
2527                },
2528                Some(tree_sitter_rust::language()),
2529            )));
2530
2531        // Connect to a server as 2 clients.
2532        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2533        let client_a = server.create_client(cx_a, "user_a").await;
2534        let client_b = server.create_client(cx_b, "user_b").await;
2535
2536        // Share a project as client A
2537        let project_a = cx_a.update(|cx| {
2538            Project::local(
2539                client_a.clone(),
2540                client_a.user_store.clone(),
2541                lang_registry.clone(),
2542                fs.clone(),
2543                cx,
2544            )
2545        });
2546        let (worktree_a, _) = project_a
2547            .update(cx_a, |p, cx| {
2548                p.find_or_create_local_worktree("/root-1", false, cx)
2549            })
2550            .await
2551            .unwrap();
2552        worktree_a
2553            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2554            .await;
2555        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2556        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2557        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2558
2559        // Join the worktree as client B.
2560        let project_b = Project::remote(
2561            project_id,
2562            client_b.clone(),
2563            client_b.user_store.clone(),
2564            lang_registry.clone(),
2565            fs.clone(),
2566            &mut cx_b.to_async(),
2567        )
2568        .await
2569        .unwrap();
2570
2571        // Open the file on client B.
2572        let buffer_b = cx_b
2573            .background()
2574            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2575            .await
2576            .unwrap();
2577
2578        // Request references to a symbol as the guest.
2579        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2580
2581        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2582        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2583            assert_eq!(
2584                params.text_document_position.text_document.uri.as_str(),
2585                "file:///root-1/one.rs"
2586            );
2587            Some(vec![
2588                lsp::Location {
2589                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2590                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2591                },
2592                lsp::Location {
2593                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2594                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2595                },
2596                lsp::Location {
2597                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2598                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2599                },
2600            ])
2601        });
2602
2603        let references = references.await.unwrap();
2604        cx_b.read(|cx| {
2605            assert_eq!(references.len(), 3);
2606            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2607
2608            let two_buffer = references[0].buffer.read(cx);
2609            let three_buffer = references[2].buffer.read(cx);
2610            assert_eq!(
2611                two_buffer.file().unwrap().path().as_ref(),
2612                Path::new("two.rs")
2613            );
2614            assert_eq!(references[1].buffer, references[0].buffer);
2615            assert_eq!(
2616                three_buffer.file().unwrap().full_path(cx),
2617                Path::new("three.rs")
2618            );
2619
2620            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2621            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2622            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2623        });
2624    }
2625
2626    #[gpui::test(iterations = 10)]
2627    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2628        cx_a.foreground().forbid_parking();
2629        let lang_registry = Arc::new(LanguageRegistry::new());
2630        let fs = FakeFs::new(cx_a.background());
2631        fs.insert_tree(
2632            "/root-1",
2633            json!({
2634                ".zed.toml": r#"collaborators = ["user_b"]"#,
2635                "a": "hello world",
2636                "b": "goodnight moon",
2637                "c": "a world of goo",
2638                "d": "world champion of clown world",
2639            }),
2640        )
2641        .await;
2642        fs.insert_tree(
2643            "/root-2",
2644            json!({
2645                "e": "disney world is fun",
2646            }),
2647        )
2648        .await;
2649
2650        // Connect to a server as 2 clients.
2651        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2652        let client_a = server.create_client(cx_a, "user_a").await;
2653        let client_b = server.create_client(cx_b, "user_b").await;
2654
2655        // Share a project as client A
2656        let project_a = cx_a.update(|cx| {
2657            Project::local(
2658                client_a.clone(),
2659                client_a.user_store.clone(),
2660                lang_registry.clone(),
2661                fs.clone(),
2662                cx,
2663            )
2664        });
2665        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2666
2667        let (worktree_1, _) = project_a
2668            .update(cx_a, |p, cx| {
2669                p.find_or_create_local_worktree("/root-1", false, cx)
2670            })
2671            .await
2672            .unwrap();
2673        worktree_1
2674            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2675            .await;
2676        let (worktree_2, _) = project_a
2677            .update(cx_a, |p, cx| {
2678                p.find_or_create_local_worktree("/root-2", false, cx)
2679            })
2680            .await
2681            .unwrap();
2682        worktree_2
2683            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2684            .await;
2685
2686        eprintln!("sharing");
2687
2688        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2689
2690        // Join the worktree as client B.
2691        let project_b = Project::remote(
2692            project_id,
2693            client_b.clone(),
2694            client_b.user_store.clone(),
2695            lang_registry.clone(),
2696            fs.clone(),
2697            &mut cx_b.to_async(),
2698        )
2699        .await
2700        .unwrap();
2701
2702        let results = project_b
2703            .update(cx_b, |project, cx| {
2704                project.search(SearchQuery::text("world", false, false), cx)
2705            })
2706            .await
2707            .unwrap();
2708
2709        let mut ranges_by_path = results
2710            .into_iter()
2711            .map(|(buffer, ranges)| {
2712                buffer.read_with(cx_b, |buffer, cx| {
2713                    let path = buffer.file().unwrap().full_path(cx);
2714                    let offset_ranges = ranges
2715                        .into_iter()
2716                        .map(|range| range.to_offset(buffer))
2717                        .collect::<Vec<_>>();
2718                    (path, offset_ranges)
2719                })
2720            })
2721            .collect::<Vec<_>>();
2722        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2723
2724        assert_eq!(
2725            ranges_by_path,
2726            &[
2727                (PathBuf::from("root-1/a"), vec![6..11]),
2728                (PathBuf::from("root-1/c"), vec![2..7]),
2729                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2730                (PathBuf::from("root-2/e"), vec![7..12]),
2731            ]
2732        );
2733    }
2734
2735    #[gpui::test(iterations = 10)]
2736    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2737        cx_a.foreground().forbid_parking();
2738        let lang_registry = Arc::new(LanguageRegistry::new());
2739        let fs = FakeFs::new(cx_a.background());
2740        fs.insert_tree(
2741            "/root-1",
2742            json!({
2743                ".zed.toml": r#"collaborators = ["user_b"]"#,
2744                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2745            }),
2746        )
2747        .await;
2748
2749        // Set up a fake language server.
2750        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2751        lang_registry.add(Arc::new(Language::new(
2752            LanguageConfig {
2753                name: "Rust".into(),
2754                path_suffixes: vec!["rs".to_string()],
2755                language_server: Some(language_server_config),
2756                ..Default::default()
2757            },
2758            Some(tree_sitter_rust::language()),
2759        )));
2760
2761        // Connect to a server as 2 clients.
2762        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763        let client_a = server.create_client(cx_a, "user_a").await;
2764        let client_b = server.create_client(cx_b, "user_b").await;
2765
2766        // Share a project as client A
2767        let project_a = cx_a.update(|cx| {
2768            Project::local(
2769                client_a.clone(),
2770                client_a.user_store.clone(),
2771                lang_registry.clone(),
2772                fs.clone(),
2773                cx,
2774            )
2775        });
2776        let (worktree_a, _) = project_a
2777            .update(cx_a, |p, cx| {
2778                p.find_or_create_local_worktree("/root-1", false, cx)
2779            })
2780            .await
2781            .unwrap();
2782        worktree_a
2783            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2784            .await;
2785        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2786        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2787        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2788
2789        // Join the worktree as client B.
2790        let project_b = Project::remote(
2791            project_id,
2792            client_b.clone(),
2793            client_b.user_store.clone(),
2794            lang_registry.clone(),
2795            fs.clone(),
2796            &mut cx_b.to_async(),
2797        )
2798        .await
2799        .unwrap();
2800
2801        // Open the file on client B.
2802        let buffer_b = cx_b
2803            .background()
2804            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2805            .await
2806            .unwrap();
2807
2808        // Request document highlights as the guest.
2809        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2810
2811        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2812        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2813            |params, _| {
2814                assert_eq!(
2815                    params
2816                        .text_document_position_params
2817                        .text_document
2818                        .uri
2819                        .as_str(),
2820                    "file:///root-1/main.rs"
2821                );
2822                assert_eq!(
2823                    params.text_document_position_params.position,
2824                    lsp::Position::new(0, 34)
2825                );
2826                Some(vec![
2827                    lsp::DocumentHighlight {
2828                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2829                        range: lsp::Range::new(
2830                            lsp::Position::new(0, 10),
2831                            lsp::Position::new(0, 16),
2832                        ),
2833                    },
2834                    lsp::DocumentHighlight {
2835                        kind: Some(lsp::DocumentHighlightKind::READ),
2836                        range: lsp::Range::new(
2837                            lsp::Position::new(0, 32),
2838                            lsp::Position::new(0, 38),
2839                        ),
2840                    },
2841                    lsp::DocumentHighlight {
2842                        kind: Some(lsp::DocumentHighlightKind::READ),
2843                        range: lsp::Range::new(
2844                            lsp::Position::new(0, 41),
2845                            lsp::Position::new(0, 47),
2846                        ),
2847                    },
2848                ])
2849            },
2850        );
2851
2852        let highlights = highlights.await.unwrap();
2853        buffer_b.read_with(cx_b, |buffer, _| {
2854            let snapshot = buffer.snapshot();
2855
2856            let highlights = highlights
2857                .into_iter()
2858                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2859                .collect::<Vec<_>>();
2860            assert_eq!(
2861                highlights,
2862                &[
2863                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2864                    (lsp::DocumentHighlightKind::READ, 32..38),
2865                    (lsp::DocumentHighlightKind::READ, 41..47)
2866                ]
2867            )
2868        });
2869    }
2870
2871    #[gpui::test(iterations = 10)]
2872    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2873        cx_a.foreground().forbid_parking();
2874        let mut lang_registry = Arc::new(LanguageRegistry::new());
2875        let fs = FakeFs::new(cx_a.background());
2876        fs.insert_tree(
2877            "/code",
2878            json!({
2879                "crate-1": {
2880                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2881                    "one.rs": "const ONE: usize = 1;",
2882                },
2883                "crate-2": {
2884                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2885                },
2886                "private": {
2887                    "passwords.txt": "the-password",
2888                }
2889            }),
2890        )
2891        .await;
2892
2893        // Set up a fake language server.
2894        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2895        Arc::get_mut(&mut lang_registry)
2896            .unwrap()
2897            .add(Arc::new(Language::new(
2898                LanguageConfig {
2899                    name: "Rust".into(),
2900                    path_suffixes: vec!["rs".to_string()],
2901                    language_server: Some(language_server_config),
2902                    ..Default::default()
2903                },
2904                Some(tree_sitter_rust::language()),
2905            )));
2906
2907        // Connect to a server as 2 clients.
2908        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2909        let client_a = server.create_client(cx_a, "user_a").await;
2910        let client_b = server.create_client(cx_b, "user_b").await;
2911
2912        // Share a project as client A
2913        let project_a = cx_a.update(|cx| {
2914            Project::local(
2915                client_a.clone(),
2916                client_a.user_store.clone(),
2917                lang_registry.clone(),
2918                fs.clone(),
2919                cx,
2920            )
2921        });
2922        let (worktree_a, _) = project_a
2923            .update(cx_a, |p, cx| {
2924                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2925            })
2926            .await
2927            .unwrap();
2928        worktree_a
2929            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2930            .await;
2931        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2932        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2933        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2934
2935        // Join the worktree as client B.
2936        let project_b = Project::remote(
2937            project_id,
2938            client_b.clone(),
2939            client_b.user_store.clone(),
2940            lang_registry.clone(),
2941            fs.clone(),
2942            &mut cx_b.to_async(),
2943        )
2944        .await
2945        .unwrap();
2946
2947        // Cause the language server to start.
2948        let _buffer = cx_b
2949            .background()
2950            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2951            .await
2952            .unwrap();
2953
2954        // Request the definition of a symbol as the guest.
2955        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2956        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2957        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2958            #[allow(deprecated)]
2959            Some(vec![lsp::SymbolInformation {
2960                name: "TWO".into(),
2961                location: lsp::Location {
2962                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2963                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2964                },
2965                kind: lsp::SymbolKind::CONSTANT,
2966                tags: None,
2967                container_name: None,
2968                deprecated: None,
2969            }])
2970        });
2971
2972        let symbols = symbols.await.unwrap();
2973        assert_eq!(symbols.len(), 1);
2974        assert_eq!(symbols[0].name, "TWO");
2975
2976        // Open one of the returned symbols.
2977        let buffer_b_2 = project_b
2978            .update(cx_b, |project, cx| {
2979                project.open_buffer_for_symbol(&symbols[0], cx)
2980            })
2981            .await
2982            .unwrap();
2983        buffer_b_2.read_with(cx_b, |buffer, _| {
2984            assert_eq!(
2985                buffer.file().unwrap().path().as_ref(),
2986                Path::new("../crate-2/two.rs")
2987            );
2988        });
2989
2990        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2991        let mut fake_symbol = symbols[0].clone();
2992        fake_symbol.path = Path::new("/code/secrets").into();
2993        let error = project_b
2994            .update(cx_b, |project, cx| {
2995                project.open_buffer_for_symbol(&fake_symbol, cx)
2996            })
2997            .await
2998            .unwrap_err();
2999        assert!(error.to_string().contains("invalid symbol signature"));
3000    }
3001
3002    #[gpui::test(iterations = 10)]
3003    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3004        cx_a: &mut TestAppContext,
3005        cx_b: &mut TestAppContext,
3006        mut rng: StdRng,
3007    ) {
3008        cx_a.foreground().forbid_parking();
3009        let mut lang_registry = Arc::new(LanguageRegistry::new());
3010        let fs = FakeFs::new(cx_a.background());
3011        fs.insert_tree(
3012            "/root",
3013            json!({
3014                ".zed.toml": r#"collaborators = ["user_b"]"#,
3015                "a.rs": "const ONE: usize = b::TWO;",
3016                "b.rs": "const TWO: usize = 2",
3017            }),
3018        )
3019        .await;
3020
3021        // Set up a fake language server.
3022        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3023
3024        Arc::get_mut(&mut lang_registry)
3025            .unwrap()
3026            .add(Arc::new(Language::new(
3027                LanguageConfig {
3028                    name: "Rust".into(),
3029                    path_suffixes: vec!["rs".to_string()],
3030                    language_server: Some(language_server_config),
3031                    ..Default::default()
3032                },
3033                Some(tree_sitter_rust::language()),
3034            )));
3035
3036        // Connect to a server as 2 clients.
3037        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3038        let client_a = server.create_client(cx_a, "user_a").await;
3039        let client_b = server.create_client(cx_b, "user_b").await;
3040
3041        // Share a project as client A
3042        let project_a = cx_a.update(|cx| {
3043            Project::local(
3044                client_a.clone(),
3045                client_a.user_store.clone(),
3046                lang_registry.clone(),
3047                fs.clone(),
3048                cx,
3049            )
3050        });
3051
3052        let (worktree_a, _) = project_a
3053            .update(cx_a, |p, cx| {
3054                p.find_or_create_local_worktree("/root", false, cx)
3055            })
3056            .await
3057            .unwrap();
3058        worktree_a
3059            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3060            .await;
3061        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3062        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3063        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3064
3065        // Join the worktree as client B.
3066        let project_b = Project::remote(
3067            project_id,
3068            client_b.clone(),
3069            client_b.user_store.clone(),
3070            lang_registry.clone(),
3071            fs.clone(),
3072            &mut cx_b.to_async(),
3073        )
3074        .await
3075        .unwrap();
3076
3077        let buffer_b1 = cx_b
3078            .background()
3079            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3080            .await
3081            .unwrap();
3082
3083        let definitions;
3084        let buffer_b2;
3085        if rng.gen() {
3086            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3087            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3088        } else {
3089            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3090            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3091        }
3092
3093        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3094        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3095            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3096                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3097                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3098            )))
3099        });
3100
3101        let buffer_b2 = buffer_b2.await.unwrap();
3102        let definitions = definitions.await.unwrap();
3103        assert_eq!(definitions.len(), 1);
3104        assert_eq!(definitions[0].buffer, buffer_b2);
3105    }
3106
3107    #[gpui::test(iterations = 10)]
3108    async fn test_collaborating_with_code_actions(
3109        cx_a: &mut TestAppContext,
3110        cx_b: &mut TestAppContext,
3111    ) {
3112        cx_a.foreground().forbid_parking();
3113        let mut lang_registry = Arc::new(LanguageRegistry::new());
3114        let fs = FakeFs::new(cx_a.background());
3115        let mut path_openers_b = Vec::new();
3116        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3117
3118        // Set up a fake language server.
3119        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3120        Arc::get_mut(&mut lang_registry)
3121            .unwrap()
3122            .add(Arc::new(Language::new(
3123                LanguageConfig {
3124                    name: "Rust".into(),
3125                    path_suffixes: vec!["rs".to_string()],
3126                    language_server: Some(language_server_config),
3127                    ..Default::default()
3128                },
3129                Some(tree_sitter_rust::language()),
3130            )));
3131
3132        // Connect to a server as 2 clients.
3133        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3134        let client_a = server.create_client(cx_a, "user_a").await;
3135        let client_b = server.create_client(cx_b, "user_b").await;
3136
3137        // Share a project as client A
3138        fs.insert_tree(
3139            "/a",
3140            json!({
3141                ".zed.toml": r#"collaborators = ["user_b"]"#,
3142                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3143                "other.rs": "pub fn foo() -> usize { 4 }",
3144            }),
3145        )
3146        .await;
3147        let project_a = cx_a.update(|cx| {
3148            Project::local(
3149                client_a.clone(),
3150                client_a.user_store.clone(),
3151                lang_registry.clone(),
3152                fs.clone(),
3153                cx,
3154            )
3155        });
3156        let (worktree_a, _) = project_a
3157            .update(cx_a, |p, cx| {
3158                p.find_or_create_local_worktree("/a", false, cx)
3159            })
3160            .await
3161            .unwrap();
3162        worktree_a
3163            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3164            .await;
3165        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3166        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3167        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3168
3169        // Join the worktree as client B.
3170        let project_b = Project::remote(
3171            project_id,
3172            client_b.clone(),
3173            client_b.user_store.clone(),
3174            lang_registry.clone(),
3175            fs.clone(),
3176            &mut cx_b.to_async(),
3177        )
3178        .await
3179        .unwrap();
3180        let mut params = cx_b.update(WorkspaceParams::test);
3181        params.languages = lang_registry.clone();
3182        params.client = client_b.client.clone();
3183        params.user_store = client_b.user_store.clone();
3184        params.project = project_b;
3185        params.path_openers = path_openers_b.into();
3186
3187        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3188        let editor_b = workspace_b
3189            .update(cx_b, |workspace, cx| {
3190                workspace.open_path((worktree_id, "main.rs").into(), cx)
3191            })
3192            .await
3193            .unwrap()
3194            .downcast::<Editor>()
3195            .unwrap();
3196
3197        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3198        fake_language_server
3199            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3200                assert_eq!(
3201                    params.text_document.uri,
3202                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3203                );
3204                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3205                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3206                None
3207            })
3208            .next()
3209            .await;
3210
3211        // Move cursor to a location that contains code actions.
3212        editor_b.update(cx_b, |editor, cx| {
3213            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3214            cx.focus(&editor_b);
3215        });
3216
3217        fake_language_server
3218            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3219                assert_eq!(
3220                    params.text_document.uri,
3221                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3222                );
3223                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3224                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3225
3226                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3227                    lsp::CodeAction {
3228                        title: "Inline into all callers".to_string(),
3229                        edit: Some(lsp::WorkspaceEdit {
3230                            changes: Some(
3231                                [
3232                                    (
3233                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3234                                        vec![lsp::TextEdit::new(
3235                                            lsp::Range::new(
3236                                                lsp::Position::new(1, 22),
3237                                                lsp::Position::new(1, 34),
3238                                            ),
3239                                            "4".to_string(),
3240                                        )],
3241                                    ),
3242                                    (
3243                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3244                                        vec![lsp::TextEdit::new(
3245                                            lsp::Range::new(
3246                                                lsp::Position::new(0, 0),
3247                                                lsp::Position::new(0, 27),
3248                                            ),
3249                                            "".to_string(),
3250                                        )],
3251                                    ),
3252                                ]
3253                                .into_iter()
3254                                .collect(),
3255                            ),
3256                            ..Default::default()
3257                        }),
3258                        data: Some(json!({
3259                            "codeActionParams": {
3260                                "range": {
3261                                    "start": {"line": 1, "column": 31},
3262                                    "end": {"line": 1, "column": 31},
3263                                }
3264                            }
3265                        })),
3266                        ..Default::default()
3267                    },
3268                )])
3269            })
3270            .next()
3271            .await;
3272
3273        // Toggle code actions and wait for them to display.
3274        editor_b.update(cx_b, |editor, cx| {
3275            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3276        });
3277        editor_b
3278            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3279            .await;
3280
3281        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3282
3283        // Confirming the code action will trigger a resolve request.
3284        let confirm_action = workspace_b
3285            .update(cx_b, |workspace, cx| {
3286                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3287            })
3288            .unwrap();
3289        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3290            lsp::CodeAction {
3291                title: "Inline into all callers".to_string(),
3292                edit: Some(lsp::WorkspaceEdit {
3293                    changes: Some(
3294                        [
3295                            (
3296                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3297                                vec![lsp::TextEdit::new(
3298                                    lsp::Range::new(
3299                                        lsp::Position::new(1, 22),
3300                                        lsp::Position::new(1, 34),
3301                                    ),
3302                                    "4".to_string(),
3303                                )],
3304                            ),
3305                            (
3306                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3307                                vec![lsp::TextEdit::new(
3308                                    lsp::Range::new(
3309                                        lsp::Position::new(0, 0),
3310                                        lsp::Position::new(0, 27),
3311                                    ),
3312                                    "".to_string(),
3313                                )],
3314                            ),
3315                        ]
3316                        .into_iter()
3317                        .collect(),
3318                    ),
3319                    ..Default::default()
3320                }),
3321                ..Default::default()
3322            }
3323        });
3324
3325        // After the action is confirmed, an editor containing both modified files is opened.
3326        confirm_action.await.unwrap();
3327        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3328            workspace
3329                .active_item(cx)
3330                .unwrap()
3331                .downcast::<Editor>()
3332                .unwrap()
3333        });
3334        code_action_editor.update(cx_b, |editor, cx| {
3335            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3336            editor.undo(&Undo, cx);
3337            assert_eq!(
3338                editor.text(cx),
3339                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3340            );
3341            editor.redo(&Redo, cx);
3342            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3343        });
3344    }
3345
3346    #[gpui::test(iterations = 10)]
3347    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3348        cx_a.foreground().forbid_parking();
3349        let mut lang_registry = Arc::new(LanguageRegistry::new());
3350        let fs = FakeFs::new(cx_a.background());
3351        let mut path_openers_b = Vec::new();
3352        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3353
3354        // Set up a fake language server.
3355        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3356        Arc::get_mut(&mut lang_registry)
3357            .unwrap()
3358            .add(Arc::new(Language::new(
3359                LanguageConfig {
3360                    name: "Rust".into(),
3361                    path_suffixes: vec!["rs".to_string()],
3362                    language_server: Some(language_server_config),
3363                    ..Default::default()
3364                },
3365                Some(tree_sitter_rust::language()),
3366            )));
3367
3368        // Connect to a server as 2 clients.
3369        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3370        let client_a = server.create_client(cx_a, "user_a").await;
3371        let client_b = server.create_client(cx_b, "user_b").await;
3372
3373        // Share a project as client A
3374        fs.insert_tree(
3375            "/dir",
3376            json!({
3377                ".zed.toml": r#"collaborators = ["user_b"]"#,
3378                "one.rs": "const ONE: usize = 1;",
3379                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3380            }),
3381        )
3382        .await;
3383        let project_a = cx_a.update(|cx| {
3384            Project::local(
3385                client_a.clone(),
3386                client_a.user_store.clone(),
3387                lang_registry.clone(),
3388                fs.clone(),
3389                cx,
3390            )
3391        });
3392        let (worktree_a, _) = project_a
3393            .update(cx_a, |p, cx| {
3394                p.find_or_create_local_worktree("/dir", false, cx)
3395            })
3396            .await
3397            .unwrap();
3398        worktree_a
3399            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3400            .await;
3401        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3402        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3403        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3404
3405        // Join the worktree as client B.
3406        let project_b = Project::remote(
3407            project_id,
3408            client_b.clone(),
3409            client_b.user_store.clone(),
3410            lang_registry.clone(),
3411            fs.clone(),
3412            &mut cx_b.to_async(),
3413        )
3414        .await
3415        .unwrap();
3416        let mut params = cx_b.update(WorkspaceParams::test);
3417        params.languages = lang_registry.clone();
3418        params.client = client_b.client.clone();
3419        params.user_store = client_b.user_store.clone();
3420        params.project = project_b;
3421        params.path_openers = path_openers_b.into();
3422
3423        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3424        let editor_b = workspace_b
3425            .update(cx_b, |workspace, cx| {
3426                workspace.open_path((worktree_id, "one.rs").into(), cx)
3427            })
3428            .await
3429            .unwrap()
3430            .downcast::<Editor>()
3431            .unwrap();
3432        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3433
3434        // Move cursor to a location that can be renamed.
3435        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3436            editor.select_ranges([7..7], None, cx);
3437            editor.rename(&Rename, cx).unwrap()
3438        });
3439
3440        fake_language_server
3441            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3442                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3443                assert_eq!(params.position, lsp::Position::new(0, 7));
3444                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3445                    lsp::Position::new(0, 6),
3446                    lsp::Position::new(0, 9),
3447                )))
3448            })
3449            .next()
3450            .await
3451            .unwrap();
3452        prepare_rename.await.unwrap();
3453        editor_b.update(cx_b, |editor, cx| {
3454            let rename = editor.pending_rename().unwrap();
3455            let buffer = editor.buffer().read(cx).snapshot(cx);
3456            assert_eq!(
3457                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3458                6..9
3459            );
3460            rename.editor.update(cx, |rename_editor, cx| {
3461                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3462                    rename_buffer.edit([0..3], "THREE", cx);
3463                });
3464            });
3465        });
3466
3467        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3468            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3469        });
3470        fake_language_server
3471            .handle_request::<lsp::request::Rename, _>(|params, _| {
3472                assert_eq!(
3473                    params.text_document_position.text_document.uri.as_str(),
3474                    "file:///dir/one.rs"
3475                );
3476                assert_eq!(
3477                    params.text_document_position.position,
3478                    lsp::Position::new(0, 6)
3479                );
3480                assert_eq!(params.new_name, "THREE");
3481                Some(lsp::WorkspaceEdit {
3482                    changes: Some(
3483                        [
3484                            (
3485                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3486                                vec![lsp::TextEdit::new(
3487                                    lsp::Range::new(
3488                                        lsp::Position::new(0, 6),
3489                                        lsp::Position::new(0, 9),
3490                                    ),
3491                                    "THREE".to_string(),
3492                                )],
3493                            ),
3494                            (
3495                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3496                                vec![
3497                                    lsp::TextEdit::new(
3498                                        lsp::Range::new(
3499                                            lsp::Position::new(0, 24),
3500                                            lsp::Position::new(0, 27),
3501                                        ),
3502                                        "THREE".to_string(),
3503                                    ),
3504                                    lsp::TextEdit::new(
3505                                        lsp::Range::new(
3506                                            lsp::Position::new(0, 35),
3507                                            lsp::Position::new(0, 38),
3508                                        ),
3509                                        "THREE".to_string(),
3510                                    ),
3511                                ],
3512                            ),
3513                        ]
3514                        .into_iter()
3515                        .collect(),
3516                    ),
3517                    ..Default::default()
3518                })
3519            })
3520            .next()
3521            .await
3522            .unwrap();
3523        confirm_rename.await.unwrap();
3524
3525        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3526            workspace
3527                .active_item(cx)
3528                .unwrap()
3529                .downcast::<Editor>()
3530                .unwrap()
3531        });
3532        rename_editor.update(cx_b, |editor, cx| {
3533            assert_eq!(
3534                editor.text(cx),
3535                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3536            );
3537            editor.undo(&Undo, cx);
3538            assert_eq!(
3539                editor.text(cx),
3540                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3541            );
3542            editor.redo(&Redo, cx);
3543            assert_eq!(
3544                editor.text(cx),
3545                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3546            );
3547        });
3548
3549        // Ensure temporary rename edits cannot be undone/redone.
3550        editor_b.update(cx_b, |editor, cx| {
3551            editor.undo(&Undo, cx);
3552            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3553            editor.undo(&Undo, cx);
3554            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3555            editor.redo(&Redo, cx);
3556            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3557        })
3558    }
3559
3560    #[gpui::test(iterations = 10)]
3561    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3562        cx_a.foreground().forbid_parking();
3563
3564        // Connect to a server as 2 clients.
3565        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3566        let client_a = server.create_client(cx_a, "user_a").await;
3567        let client_b = server.create_client(cx_b, "user_b").await;
3568
3569        // Create an org that includes these 2 users.
3570        let db = &server.app_state.db;
3571        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3572        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3573            .await
3574            .unwrap();
3575        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3576            .await
3577            .unwrap();
3578
3579        // Create a channel that includes all the users.
3580        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3581        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3582            .await
3583            .unwrap();
3584        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3585            .await
3586            .unwrap();
3587        db.create_channel_message(
3588            channel_id,
3589            client_b.current_user_id(&cx_b),
3590            "hello A, it's B.",
3591            OffsetDateTime::now_utc(),
3592            1,
3593        )
3594        .await
3595        .unwrap();
3596
3597        let channels_a = cx_a
3598            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3599        channels_a
3600            .condition(cx_a, |list, _| list.available_channels().is_some())
3601            .await;
3602        channels_a.read_with(cx_a, |list, _| {
3603            assert_eq!(
3604                list.available_channels().unwrap(),
3605                &[ChannelDetails {
3606                    id: channel_id.to_proto(),
3607                    name: "test-channel".to_string()
3608                }]
3609            )
3610        });
3611        let channel_a = channels_a.update(cx_a, |this, cx| {
3612            this.get_channel(channel_id.to_proto(), cx).unwrap()
3613        });
3614        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3615        channel_a
3616            .condition(&cx_a, |channel, _| {
3617                channel_messages(channel)
3618                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3619            })
3620            .await;
3621
3622        let channels_b = cx_b
3623            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3624        channels_b
3625            .condition(cx_b, |list, _| list.available_channels().is_some())
3626            .await;
3627        channels_b.read_with(cx_b, |list, _| {
3628            assert_eq!(
3629                list.available_channels().unwrap(),
3630                &[ChannelDetails {
3631                    id: channel_id.to_proto(),
3632                    name: "test-channel".to_string()
3633                }]
3634            )
3635        });
3636
3637        let channel_b = channels_b.update(cx_b, |this, cx| {
3638            this.get_channel(channel_id.to_proto(), cx).unwrap()
3639        });
3640        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3641        channel_b
3642            .condition(&cx_b, |channel, _| {
3643                channel_messages(channel)
3644                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3645            })
3646            .await;
3647
3648        channel_a
3649            .update(cx_a, |channel, cx| {
3650                channel
3651                    .send_message("oh, hi B.".to_string(), cx)
3652                    .unwrap()
3653                    .detach();
3654                let task = channel.send_message("sup".to_string(), cx).unwrap();
3655                assert_eq!(
3656                    channel_messages(channel),
3657                    &[
3658                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3659                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3660                        ("user_a".to_string(), "sup".to_string(), true)
3661                    ]
3662                );
3663                task
3664            })
3665            .await
3666            .unwrap();
3667
3668        channel_b
3669            .condition(&cx_b, |channel, _| {
3670                channel_messages(channel)
3671                    == [
3672                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3673                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3674                        ("user_a".to_string(), "sup".to_string(), false),
3675                    ]
3676            })
3677            .await;
3678
3679        assert_eq!(
3680            server
3681                .state()
3682                .await
3683                .channel(channel_id)
3684                .unwrap()
3685                .connection_ids
3686                .len(),
3687            2
3688        );
3689        cx_b.update(|_| drop(channel_b));
3690        server
3691            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3692            .await;
3693
3694        cx_a.update(|_| drop(channel_a));
3695        server
3696            .condition(|state| state.channel(channel_id).is_none())
3697            .await;
3698    }
3699
3700    #[gpui::test(iterations = 10)]
3701    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3702        cx_a.foreground().forbid_parking();
3703
3704        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3705        let client_a = server.create_client(cx_a, "user_a").await;
3706
3707        let db = &server.app_state.db;
3708        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3709        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3710        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3711            .await
3712            .unwrap();
3713        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3714            .await
3715            .unwrap();
3716
3717        let channels_a = cx_a
3718            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3719        channels_a
3720            .condition(cx_a, |list, _| list.available_channels().is_some())
3721            .await;
3722        let channel_a = channels_a.update(cx_a, |this, cx| {
3723            this.get_channel(channel_id.to_proto(), cx).unwrap()
3724        });
3725
3726        // Messages aren't allowed to be too long.
3727        channel_a
3728            .update(cx_a, |channel, cx| {
3729                let long_body = "this is long.\n".repeat(1024);
3730                channel.send_message(long_body, cx).unwrap()
3731            })
3732            .await
3733            .unwrap_err();
3734
3735        // Messages aren't allowed to be blank.
3736        channel_a.update(cx_a, |channel, cx| {
3737            channel.send_message(String::new(), cx).unwrap_err()
3738        });
3739
3740        // Leading and trailing whitespace are trimmed.
3741        channel_a
3742            .update(cx_a, |channel, cx| {
3743                channel
3744                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3745                    .unwrap()
3746            })
3747            .await
3748            .unwrap();
3749        assert_eq!(
3750            db.get_channel_messages(channel_id, 10, None)
3751                .await
3752                .unwrap()
3753                .iter()
3754                .map(|m| &m.body)
3755                .collect::<Vec<_>>(),
3756            &["surrounded by whitespace"]
3757        );
3758    }
3759
3760    #[gpui::test(iterations = 10)]
3761    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3762        cx_a.foreground().forbid_parking();
3763
3764        // Connect to a server as 2 clients.
3765        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3766        let client_a = server.create_client(cx_a, "user_a").await;
3767        let client_b = server.create_client(cx_b, "user_b").await;
3768        let mut status_b = client_b.status();
3769
3770        // Create an org that includes these 2 users.
3771        let db = &server.app_state.db;
3772        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3773        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3774            .await
3775            .unwrap();
3776        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3777            .await
3778            .unwrap();
3779
3780        // Create a channel that includes all the users.
3781        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3782        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3783            .await
3784            .unwrap();
3785        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3786            .await
3787            .unwrap();
3788        db.create_channel_message(
3789            channel_id,
3790            client_b.current_user_id(&cx_b),
3791            "hello A, it's B.",
3792            OffsetDateTime::now_utc(),
3793            2,
3794        )
3795        .await
3796        .unwrap();
3797
3798        let channels_a = cx_a
3799            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3800        channels_a
3801            .condition(cx_a, |list, _| list.available_channels().is_some())
3802            .await;
3803
3804        channels_a.read_with(cx_a, |list, _| {
3805            assert_eq!(
3806                list.available_channels().unwrap(),
3807                &[ChannelDetails {
3808                    id: channel_id.to_proto(),
3809                    name: "test-channel".to_string()
3810                }]
3811            )
3812        });
3813        let channel_a = channels_a.update(cx_a, |this, cx| {
3814            this.get_channel(channel_id.to_proto(), cx).unwrap()
3815        });
3816        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3817        channel_a
3818            .condition(&cx_a, |channel, _| {
3819                channel_messages(channel)
3820                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3821            })
3822            .await;
3823
3824        let channels_b = cx_b
3825            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3826        channels_b
3827            .condition(cx_b, |list, _| list.available_channels().is_some())
3828            .await;
3829        channels_b.read_with(cx_b, |list, _| {
3830            assert_eq!(
3831                list.available_channels().unwrap(),
3832                &[ChannelDetails {
3833                    id: channel_id.to_proto(),
3834                    name: "test-channel".to_string()
3835                }]
3836            )
3837        });
3838
3839        let channel_b = channels_b.update(cx_b, |this, cx| {
3840            this.get_channel(channel_id.to_proto(), cx).unwrap()
3841        });
3842        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3843        channel_b
3844            .condition(&cx_b, |channel, _| {
3845                channel_messages(channel)
3846                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3847            })
3848            .await;
3849
3850        // Disconnect client B, ensuring we can still access its cached channel data.
3851        server.forbid_connections();
3852        server.disconnect_client(client_b.current_user_id(&cx_b));
3853        while !matches!(
3854            status_b.next().await,
3855            Some(client::Status::ReconnectionError { .. })
3856        ) {}
3857
3858        channels_b.read_with(cx_b, |channels, _| {
3859            assert_eq!(
3860                channels.available_channels().unwrap(),
3861                [ChannelDetails {
3862                    id: channel_id.to_proto(),
3863                    name: "test-channel".to_string()
3864                }]
3865            )
3866        });
3867        channel_b.read_with(cx_b, |channel, _| {
3868            assert_eq!(
3869                channel_messages(channel),
3870                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3871            )
3872        });
3873
3874        // Send a message from client B while it is disconnected.
3875        channel_b
3876            .update(cx_b, |channel, cx| {
3877                let task = channel
3878                    .send_message("can you see this?".to_string(), cx)
3879                    .unwrap();
3880                assert_eq!(
3881                    channel_messages(channel),
3882                    &[
3883                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3884                        ("user_b".to_string(), "can you see this?".to_string(), true)
3885                    ]
3886                );
3887                task
3888            })
3889            .await
3890            .unwrap_err();
3891
3892        // Send a message from client A while B is disconnected.
3893        channel_a
3894            .update(cx_a, |channel, cx| {
3895                channel
3896                    .send_message("oh, hi B.".to_string(), cx)
3897                    .unwrap()
3898                    .detach();
3899                let task = channel.send_message("sup".to_string(), cx).unwrap();
3900                assert_eq!(
3901                    channel_messages(channel),
3902                    &[
3903                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3904                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3905                        ("user_a".to_string(), "sup".to_string(), true)
3906                    ]
3907                );
3908                task
3909            })
3910            .await
3911            .unwrap();
3912
3913        // Give client B a chance to reconnect.
3914        server.allow_connections();
3915        cx_b.foreground().advance_clock(Duration::from_secs(10));
3916
3917        // Verify that B sees the new messages upon reconnection, as well as the message client B
3918        // sent while offline.
3919        channel_b
3920            .condition(&cx_b, |channel, _| {
3921                channel_messages(channel)
3922                    == [
3923                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3924                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3925                        ("user_a".to_string(), "sup".to_string(), false),
3926                        ("user_b".to_string(), "can you see this?".to_string(), false),
3927                    ]
3928            })
3929            .await;
3930
3931        // Ensure client A and B can communicate normally after reconnection.
3932        channel_a
3933            .update(cx_a, |channel, cx| {
3934                channel.send_message("you online?".to_string(), cx).unwrap()
3935            })
3936            .await
3937            .unwrap();
3938        channel_b
3939            .condition(&cx_b, |channel, _| {
3940                channel_messages(channel)
3941                    == [
3942                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3943                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3944                        ("user_a".to_string(), "sup".to_string(), false),
3945                        ("user_b".to_string(), "can you see this?".to_string(), false),
3946                        ("user_a".to_string(), "you online?".to_string(), false),
3947                    ]
3948            })
3949            .await;
3950
3951        channel_b
3952            .update(cx_b, |channel, cx| {
3953                channel.send_message("yep".to_string(), cx).unwrap()
3954            })
3955            .await
3956            .unwrap();
3957        channel_a
3958            .condition(&cx_a, |channel, _| {
3959                channel_messages(channel)
3960                    == [
3961                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3962                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3963                        ("user_a".to_string(), "sup".to_string(), false),
3964                        ("user_b".to_string(), "can you see this?".to_string(), false),
3965                        ("user_a".to_string(), "you online?".to_string(), false),
3966                        ("user_b".to_string(), "yep".to_string(), false),
3967                    ]
3968            })
3969            .await;
3970    }
3971
3972    #[gpui::test(iterations = 10)]
3973    async fn test_contacts(
3974        cx_a: &mut TestAppContext,
3975        cx_b: &mut TestAppContext,
3976        cx_c: &mut TestAppContext,
3977    ) {
3978        cx_a.foreground().forbid_parking();
3979        let lang_registry = Arc::new(LanguageRegistry::new());
3980        let fs = FakeFs::new(cx_a.background());
3981
3982        // Connect to a server as 3 clients.
3983        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3984        let client_a = server.create_client(cx_a, "user_a").await;
3985        let client_b = server.create_client(cx_b, "user_b").await;
3986        let client_c = server.create_client(cx_c, "user_c").await;
3987
3988        // Share a worktree as client A.
3989        fs.insert_tree(
3990            "/a",
3991            json!({
3992                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3993            }),
3994        )
3995        .await;
3996
3997        let project_a = cx_a.update(|cx| {
3998            Project::local(
3999                client_a.clone(),
4000                client_a.user_store.clone(),
4001                lang_registry.clone(),
4002                fs.clone(),
4003                cx,
4004            )
4005        });
4006        let (worktree_a, _) = project_a
4007            .update(cx_a, |p, cx| {
4008                p.find_or_create_local_worktree("/a", false, cx)
4009            })
4010            .await
4011            .unwrap();
4012        worktree_a
4013            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4014            .await;
4015
4016        client_a
4017            .user_store
4018            .condition(&cx_a, |user_store, _| {
4019                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4020            })
4021            .await;
4022        client_b
4023            .user_store
4024            .condition(&cx_b, |user_store, _| {
4025                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4026            })
4027            .await;
4028        client_c
4029            .user_store
4030            .condition(&cx_c, |user_store, _| {
4031                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4032            })
4033            .await;
4034
4035        let project_id = project_a
4036            .update(cx_a, |project, _| project.next_remote_id())
4037            .await;
4038        project_a
4039            .update(cx_a, |project, cx| project.share(cx))
4040            .await
4041            .unwrap();
4042
4043        let _project_b = Project::remote(
4044            project_id,
4045            client_b.clone(),
4046            client_b.user_store.clone(),
4047            lang_registry.clone(),
4048            fs.clone(),
4049            &mut cx_b.to_async(),
4050        )
4051        .await
4052        .unwrap();
4053
4054        client_a
4055            .user_store
4056            .condition(&cx_a, |user_store, _| {
4057                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4058            })
4059            .await;
4060        client_b
4061            .user_store
4062            .condition(&cx_b, |user_store, _| {
4063                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4064            })
4065            .await;
4066        client_c
4067            .user_store
4068            .condition(&cx_c, |user_store, _| {
4069                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4070            })
4071            .await;
4072
4073        project_a
4074            .condition(&cx_a, |project, _| {
4075                project.collaborators().contains_key(&client_b.peer_id)
4076            })
4077            .await;
4078
4079        cx_a.update(move |_| drop(project_a));
4080        client_a
4081            .user_store
4082            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4083            .await;
4084        client_b
4085            .user_store
4086            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4087            .await;
4088        client_c
4089            .user_store
4090            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4091            .await;
4092
4093        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4094            user_store
4095                .contacts()
4096                .iter()
4097                .map(|contact| {
4098                    let worktrees = contact
4099                        .projects
4100                        .iter()
4101                        .map(|p| {
4102                            (
4103                                p.worktree_root_names[0].as_str(),
4104                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4105                            )
4106                        })
4107                        .collect();
4108                    (contact.user.github_login.as_str(), worktrees)
4109                })
4110                .collect()
4111        }
4112    }
4113
4114    #[gpui::test(iterations = 100)]
4115    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4116        cx.foreground().forbid_parking();
4117        let max_peers = env::var("MAX_PEERS")
4118            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4119            .unwrap_or(5);
4120        let max_operations = env::var("OPERATIONS")
4121            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4122            .unwrap_or(10);
4123
4124        let rng = Arc::new(Mutex::new(rng));
4125
4126        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4127        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4128
4129        let fs = FakeFs::new(cx.background());
4130        fs.insert_tree(
4131            "/_collab",
4132            json!({
4133                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4134            }),
4135        )
4136        .await;
4137
4138        let operations = Rc::new(Cell::new(0));
4139        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4140        let mut clients = Vec::new();
4141
4142        let mut next_entity_id = 100000;
4143        let mut host_cx = TestAppContext::new(
4144            cx.foreground_platform(),
4145            cx.platform(),
4146            cx.foreground(),
4147            cx.background(),
4148            cx.font_cache(),
4149            cx.leak_detector(),
4150            next_entity_id,
4151        );
4152        let host = server.create_client(&mut host_cx, "host").await;
4153        let host_project = host_cx.update(|cx| {
4154            Project::local(
4155                host.client.clone(),
4156                host.user_store.clone(),
4157                Arc::new(LanguageRegistry::new()),
4158                fs.clone(),
4159                cx,
4160            )
4161        });
4162        let host_project_id = host_project
4163            .update(&mut host_cx, |p, _| p.next_remote_id())
4164            .await;
4165
4166        let (collab_worktree, _) = host_project
4167            .update(&mut host_cx, |project, cx| {
4168                project.find_or_create_local_worktree("/_collab", false, cx)
4169            })
4170            .await
4171            .unwrap();
4172        collab_worktree
4173            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4174            .await;
4175        host_project
4176            .update(&mut host_cx, |project, cx| project.share(cx))
4177            .await
4178            .unwrap();
4179
4180        clients.push(cx.foreground().spawn(host.simulate_host(
4181            host_project,
4182            language_server_config,
4183            operations.clone(),
4184            max_operations,
4185            rng.clone(),
4186            host_cx,
4187        )));
4188
4189        while operations.get() < max_operations {
4190            cx.background().simulate_random_delay().await;
4191            if clients.len() >= max_peers {
4192                break;
4193            } else if rng.lock().gen_bool(0.05) {
4194                operations.set(operations.get() + 1);
4195
4196                let guest_id = clients.len();
4197                log::info!("Adding guest {}", guest_id);
4198                next_entity_id += 100000;
4199                let mut guest_cx = TestAppContext::new(
4200                    cx.foreground_platform(),
4201                    cx.platform(),
4202                    cx.foreground(),
4203                    cx.background(),
4204                    cx.font_cache(),
4205                    cx.leak_detector(),
4206                    next_entity_id,
4207                );
4208                let guest = server
4209                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4210                    .await;
4211                let guest_project = Project::remote(
4212                    host_project_id,
4213                    guest.client.clone(),
4214                    guest.user_store.clone(),
4215                    guest_lang_registry.clone(),
4216                    FakeFs::new(cx.background()),
4217                    &mut guest_cx.to_async(),
4218                )
4219                .await
4220                .unwrap();
4221                clients.push(cx.foreground().spawn(guest.simulate_guest(
4222                    guest_id,
4223                    guest_project,
4224                    operations.clone(),
4225                    max_operations,
4226                    rng.clone(),
4227                    guest_cx,
4228                )));
4229
4230                log::info!("Guest {} added", guest_id);
4231            }
4232        }
4233
4234        let mut clients = futures::future::join_all(clients).await;
4235        cx.foreground().run_until_parked();
4236
4237        let (host_client, mut host_cx) = clients.remove(0);
4238        let host_project = host_client.project.as_ref().unwrap();
4239        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4240            project
4241                .worktrees(cx)
4242                .map(|worktree| {
4243                    let snapshot = worktree.read(cx).snapshot();
4244                    (snapshot.id(), snapshot)
4245                })
4246                .collect::<BTreeMap<_, _>>()
4247        });
4248
4249        for (guest_client, mut guest_cx) in clients.into_iter() {
4250            let guest_id = guest_client.client.id();
4251            let worktree_snapshots =
4252                guest_client
4253                    .project
4254                    .as_ref()
4255                    .unwrap()
4256                    .read_with(&guest_cx, |project, cx| {
4257                        project
4258                            .worktrees(cx)
4259                            .map(|worktree| {
4260                                let worktree = worktree.read(cx);
4261                                (worktree.id(), worktree.snapshot())
4262                            })
4263                            .collect::<BTreeMap<_, _>>()
4264                    });
4265
4266            assert_eq!(
4267                worktree_snapshots.keys().collect::<Vec<_>>(),
4268                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4269                "guest {} has different worktrees than the host",
4270                guest_id
4271            );
4272            for (id, host_snapshot) in &host_worktree_snapshots {
4273                let guest_snapshot = &worktree_snapshots[id];
4274                assert_eq!(
4275                    guest_snapshot.root_name(),
4276                    host_snapshot.root_name(),
4277                    "guest {} has different root name than the host for worktree {}",
4278                    guest_id,
4279                    id
4280                );
4281                assert_eq!(
4282                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4283                    host_snapshot.entries(false).collect::<Vec<_>>(),
4284                    "guest {} has different snapshot than the host for worktree {}",
4285                    guest_id,
4286                    id
4287                );
4288            }
4289
4290            guest_client
4291                .project
4292                .as_ref()
4293                .unwrap()
4294                .read_with(&guest_cx, |project, cx| {
4295                    assert!(
4296                        !project.has_deferred_operations(cx),
4297                        "guest {} has deferred operations",
4298                        guest_id,
4299                    );
4300                });
4301
4302            for guest_buffer in &guest_client.buffers {
4303                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4304                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4305                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4306                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4307                        guest_id, guest_client.peer_id, buffer_id
4308                    ))
4309                });
4310                assert_eq!(
4311                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4312                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4313                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4314                    guest_id,
4315                    buffer_id,
4316                    host_buffer
4317                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4318                );
4319            }
4320
4321            guest_cx.update(|_| drop(guest_client));
4322        }
4323
4324        host_cx.update(|_| drop(host_client));
4325    }
4326
4327    struct TestServer {
4328        peer: Arc<Peer>,
4329        app_state: Arc<AppState>,
4330        server: Arc<Server>,
4331        foreground: Rc<executor::Foreground>,
4332        notifications: mpsc::UnboundedReceiver<()>,
4333        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4334        forbid_connections: Arc<AtomicBool>,
4335        _test_db: TestDb,
4336    }
4337
4338    impl TestServer {
4339        async fn start(
4340            foreground: Rc<executor::Foreground>,
4341            background: Arc<executor::Background>,
4342        ) -> Self {
4343            let test_db = TestDb::fake(background);
4344            let app_state = Self::build_app_state(&test_db).await;
4345            let peer = Peer::new();
4346            let notifications = mpsc::unbounded();
4347            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4348            Self {
4349                peer,
4350                app_state,
4351                server,
4352                foreground,
4353                notifications: notifications.1,
4354                connection_killers: Default::default(),
4355                forbid_connections: Default::default(),
4356                _test_db: test_db,
4357            }
4358        }
4359
4360        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4361            let http = FakeHttpClient::with_404_response();
4362            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4363            let client_name = name.to_string();
4364            let mut client = Client::new(http.clone());
4365            let server = self.server.clone();
4366            let connection_killers = self.connection_killers.clone();
4367            let forbid_connections = self.forbid_connections.clone();
4368            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4369
4370            Arc::get_mut(&mut client)
4371                .unwrap()
4372                .override_authenticate(move |cx| {
4373                    cx.spawn(|_| async move {
4374                        let access_token = "the-token".to_string();
4375                        Ok(Credentials {
4376                            user_id: user_id.0 as u64,
4377                            access_token,
4378                        })
4379                    })
4380                })
4381                .override_establish_connection(move |credentials, cx| {
4382                    assert_eq!(credentials.user_id, user_id.0 as u64);
4383                    assert_eq!(credentials.access_token, "the-token");
4384
4385                    let server = server.clone();
4386                    let connection_killers = connection_killers.clone();
4387                    let forbid_connections = forbid_connections.clone();
4388                    let client_name = client_name.clone();
4389                    let connection_id_tx = connection_id_tx.clone();
4390                    cx.spawn(move |cx| async move {
4391                        if forbid_connections.load(SeqCst) {
4392                            Err(EstablishConnectionError::other(anyhow!(
4393                                "server is forbidding connections"
4394                            )))
4395                        } else {
4396                            let (client_conn, server_conn, kill_conn) =
4397                                Connection::in_memory(cx.background());
4398                            connection_killers.lock().insert(user_id, kill_conn);
4399                            cx.background()
4400                                .spawn(server.handle_connection(
4401                                    server_conn,
4402                                    client_name,
4403                                    user_id,
4404                                    Some(connection_id_tx),
4405                                    cx.background(),
4406                                ))
4407                                .detach();
4408                            Ok(client_conn)
4409                        }
4410                    })
4411                });
4412
4413            client
4414                .authenticate_and_connect(&cx.to_async())
4415                .await
4416                .unwrap();
4417
4418            Channel::init(&client);
4419            Project::init(&client);
4420
4421            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4422            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4423            let mut authed_user =
4424                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4425            while authed_user.next().await.unwrap().is_none() {}
4426
4427            TestClient {
4428                client,
4429                peer_id,
4430                user_store,
4431                project: Default::default(),
4432                buffers: Default::default(),
4433            }
4434        }
4435
4436        fn disconnect_client(&self, user_id: UserId) {
4437            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4438                let _ = kill_conn.try_send(Some(()));
4439            }
4440        }
4441
4442        fn forbid_connections(&self) {
4443            self.forbid_connections.store(true, SeqCst);
4444        }
4445
4446        fn allow_connections(&self) {
4447            self.forbid_connections.store(false, SeqCst);
4448        }
4449
4450        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4451            let mut config = Config::default();
4452            config.session_secret = "a".repeat(32);
4453            config.database_url = test_db.url.clone();
4454            let github_client = github::AppClient::test();
4455            Arc::new(AppState {
4456                db: test_db.db().clone(),
4457                handlebars: Default::default(),
4458                auth_client: auth::build_client("", ""),
4459                repo_client: github::RepoClient::test(&github_client),
4460                github_client,
4461                config,
4462            })
4463        }
4464
4465        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4466            self.server.store.read()
4467        }
4468
4469        async fn condition<F>(&mut self, mut predicate: F)
4470        where
4471            F: FnMut(&Store) -> bool,
4472        {
4473            async_std::future::timeout(Duration::from_millis(500), async {
4474                while !(predicate)(&*self.server.store.read()) {
4475                    self.foreground.start_waiting();
4476                    self.notifications.next().await;
4477                    self.foreground.finish_waiting();
4478                }
4479            })
4480            .await
4481            .expect("condition timed out");
4482        }
4483    }
4484
4485    impl Drop for TestServer {
4486        fn drop(&mut self) {
4487            self.peer.reset();
4488        }
4489    }
4490
4491    struct TestClient {
4492        client: Arc<Client>,
4493        pub peer_id: PeerId,
4494        pub user_store: ModelHandle<UserStore>,
4495        project: Option<ModelHandle<Project>>,
4496        buffers: HashSet<ModelHandle<language::Buffer>>,
4497    }
4498
4499    impl Deref for TestClient {
4500        type Target = Arc<Client>;
4501
4502        fn deref(&self) -> &Self::Target {
4503            &self.client
4504        }
4505    }
4506
4507    impl TestClient {
4508        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4509            UserId::from_proto(
4510                self.user_store
4511                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4512            )
4513        }
4514
4515        fn simulate_host(
4516            mut self,
4517            project: ModelHandle<Project>,
4518            mut language_server_config: LanguageServerConfig,
4519            operations: Rc<Cell<usize>>,
4520            max_operations: usize,
4521            rng: Arc<Mutex<StdRng>>,
4522            mut cx: TestAppContext,
4523        ) -> impl Future<Output = (Self, TestAppContext)> {
4524            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4525
4526            // Set up a fake language server.
4527            language_server_config.set_fake_initializer({
4528                let rng = rng.clone();
4529                let files = files.clone();
4530                let project = project.downgrade();
4531                move |fake_server| {
4532                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4533                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4534                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4535                                range: lsp::Range::new(
4536                                    lsp::Position::new(0, 0),
4537                                    lsp::Position::new(0, 0),
4538                                ),
4539                                new_text: "the-new-text".to_string(),
4540                            })),
4541                            ..Default::default()
4542                        }]))
4543                    });
4544
4545                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4546                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4547                            lsp::CodeAction {
4548                                title: "the-code-action".to_string(),
4549                                ..Default::default()
4550                            },
4551                        )])
4552                    });
4553
4554                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4555                        |params, _| {
4556                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4557                                params.position,
4558                                params.position,
4559                            )))
4560                        },
4561                    );
4562
4563                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4564                        let files = files.clone();
4565                        let rng = rng.clone();
4566                        move |_, _| {
4567                            let files = files.lock();
4568                            let mut rng = rng.lock();
4569                            let count = rng.gen_range::<usize, _>(1..3);
4570                            let files = (0..count)
4571                                .map(|_| files.choose(&mut *rng).unwrap())
4572                                .collect::<Vec<_>>();
4573                            log::info!("LSP: Returning definitions in files {:?}", &files);
4574                            Some(lsp::GotoDefinitionResponse::Array(
4575                                files
4576                                    .into_iter()
4577                                    .map(|file| lsp::Location {
4578                                        uri: lsp::Url::from_file_path(file).unwrap(),
4579                                        range: Default::default(),
4580                                    })
4581                                    .collect(),
4582                            ))
4583                        }
4584                    });
4585
4586                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4587                        let rng = rng.clone();
4588                        let project = project.clone();
4589                        move |params, mut cx| {
4590                            if let Some(project) = project.upgrade(&cx) {
4591                                project.update(&mut cx, |project, cx| {
4592                                    let path = params
4593                                        .text_document_position_params
4594                                        .text_document
4595                                        .uri
4596                                        .to_file_path()
4597                                        .unwrap();
4598                                    let (worktree, relative_path) =
4599                                        project.find_local_worktree(&path, cx)?;
4600                                    let project_path =
4601                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4602                                    let buffer =
4603                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4604
4605                                    let mut highlights = Vec::new();
4606                                    let highlight_count = rng.lock().gen_range(1..=5);
4607                                    let mut prev_end = 0;
4608                                    for _ in 0..highlight_count {
4609                                        let range =
4610                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4611                                        let start = buffer
4612                                            .offset_to_point_utf16(range.start)
4613                                            .to_lsp_position();
4614                                        let end = buffer
4615                                            .offset_to_point_utf16(range.end)
4616                                            .to_lsp_position();
4617                                        highlights.push(lsp::DocumentHighlight {
4618                                            range: lsp::Range::new(start, end),
4619                                            kind: Some(lsp::DocumentHighlightKind::READ),
4620                                        });
4621                                        prev_end = range.end;
4622                                    }
4623                                    Some(highlights)
4624                                })
4625                            } else {
4626                                None
4627                            }
4628                        }
4629                    });
4630                }
4631            });
4632
4633            project.update(&mut cx, |project, _| {
4634                project.languages().add(Arc::new(Language::new(
4635                    LanguageConfig {
4636                        name: "Rust".into(),
4637                        path_suffixes: vec!["rs".to_string()],
4638                        language_server: Some(language_server_config),
4639                        ..Default::default()
4640                    },
4641                    None,
4642                )));
4643            });
4644
4645            async move {
4646                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4647                while operations.get() < max_operations {
4648                    operations.set(operations.get() + 1);
4649
4650                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4651                    match distribution {
4652                        0..=20 if !files.lock().is_empty() => {
4653                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4654                            let mut path = path.as_path();
4655                            while let Some(parent_path) = path.parent() {
4656                                path = parent_path;
4657                                if rng.lock().gen() {
4658                                    break;
4659                                }
4660                            }
4661
4662                            log::info!("Host: find/create local worktree {:?}", path);
4663                            project
4664                                .update(&mut cx, |project, cx| {
4665                                    project.find_or_create_local_worktree(path, false, cx)
4666                                })
4667                                .await
4668                                .unwrap();
4669                        }
4670                        10..=80 if !files.lock().is_empty() => {
4671                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4672                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4673                                let (worktree, path) = project
4674                                    .update(&mut cx, |project, cx| {
4675                                        project.find_or_create_local_worktree(
4676                                            file.clone(),
4677                                            false,
4678                                            cx,
4679                                        )
4680                                    })
4681                                    .await
4682                                    .unwrap();
4683                                let project_path =
4684                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4685                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4686                                let buffer = project
4687                                    .update(&mut cx, |project, cx| {
4688                                        project.open_buffer(project_path, cx)
4689                                    })
4690                                    .await
4691                                    .unwrap();
4692                                self.buffers.insert(buffer.clone());
4693                                buffer
4694                            } else {
4695                                self.buffers
4696                                    .iter()
4697                                    .choose(&mut *rng.lock())
4698                                    .unwrap()
4699                                    .clone()
4700                            };
4701
4702                            if rng.lock().gen_bool(0.1) {
4703                                cx.update(|cx| {
4704                                    log::info!(
4705                                        "Host: dropping buffer {:?}",
4706                                        buffer.read(cx).file().unwrap().full_path(cx)
4707                                    );
4708                                    self.buffers.remove(&buffer);
4709                                    drop(buffer);
4710                                });
4711                            } else {
4712                                buffer.update(&mut cx, |buffer, cx| {
4713                                    log::info!(
4714                                        "Host: updating buffer {:?} ({})",
4715                                        buffer.file().unwrap().full_path(cx),
4716                                        buffer.remote_id()
4717                                    );
4718                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4719                                });
4720                            }
4721                        }
4722                        _ => loop {
4723                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4724                            let mut path = PathBuf::new();
4725                            path.push("/");
4726                            for _ in 0..path_component_count {
4727                                let letter = rng.lock().gen_range(b'a'..=b'z');
4728                                path.push(std::str::from_utf8(&[letter]).unwrap());
4729                            }
4730                            path.set_extension("rs");
4731                            let parent_path = path.parent().unwrap();
4732
4733                            log::info!("Host: creating file {:?}", path,);
4734
4735                            if fs.create_dir(&parent_path).await.is_ok()
4736                                && fs.create_file(&path, Default::default()).await.is_ok()
4737                            {
4738                                files.lock().push(path);
4739                                break;
4740                            } else {
4741                                log::info!("Host: cannot create file");
4742                            }
4743                        },
4744                    }
4745
4746                    cx.background().simulate_random_delay().await;
4747                }
4748
4749                log::info!("Host done");
4750
4751                self.project = Some(project);
4752                (self, cx)
4753            }
4754        }
4755
4756        pub async fn simulate_guest(
4757            mut self,
4758            guest_id: usize,
4759            project: ModelHandle<Project>,
4760            operations: Rc<Cell<usize>>,
4761            max_operations: usize,
4762            rng: Arc<Mutex<StdRng>>,
4763            mut cx: TestAppContext,
4764        ) -> (Self, TestAppContext) {
4765            while operations.get() < max_operations {
4766                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4767                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4768                        project
4769                            .worktrees(&cx)
4770                            .filter(|worktree| {
4771                                let worktree = worktree.read(cx);
4772                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4773                            })
4774                            .choose(&mut *rng.lock())
4775                    }) {
4776                        worktree
4777                    } else {
4778                        cx.background().simulate_random_delay().await;
4779                        continue;
4780                    };
4781
4782                    operations.set(operations.get() + 1);
4783                    let (worktree_root_name, project_path) =
4784                        worktree.read_with(&cx, |worktree, _| {
4785                            let entry = worktree
4786                                .entries(false)
4787                                .filter(|e| e.is_file())
4788                                .choose(&mut *rng.lock())
4789                                .unwrap();
4790                            (
4791                                worktree.root_name().to_string(),
4792                                (worktree.id(), entry.path.clone()),
4793                            )
4794                        });
4795                    log::info!(
4796                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4797                        guest_id,
4798                        project_path.0,
4799                        worktree_root_name,
4800                        project_path.1
4801                    );
4802                    let buffer = project
4803                        .update(&mut cx, |project, cx| {
4804                            project.open_buffer(project_path.clone(), cx)
4805                        })
4806                        .await
4807                        .unwrap();
4808                    log::info!(
4809                        "Guest {}: path in worktree {:?} {:?} {:?} opened with buffer id {:?}",
4810                        guest_id,
4811                        project_path.0,
4812                        worktree_root_name,
4813                        project_path.1,
4814                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4815                    );
4816                    self.buffers.insert(buffer.clone());
4817                    buffer
4818                } else {
4819                    operations.set(operations.get() + 1);
4820
4821                    self.buffers
4822                        .iter()
4823                        .choose(&mut *rng.lock())
4824                        .unwrap()
4825                        .clone()
4826                };
4827
4828                let choice = rng.lock().gen_range(0..100);
4829                match choice {
4830                    0..=9 => {
4831                        cx.update(|cx| {
4832                            log::info!(
4833                                "Guest {}: dropping buffer {:?}",
4834                                guest_id,
4835                                buffer.read(cx).file().unwrap().full_path(cx)
4836                            );
4837                            self.buffers.remove(&buffer);
4838                            drop(buffer);
4839                        });
4840                    }
4841                    10..=19 => {
4842                        let completions = project.update(&mut cx, |project, cx| {
4843                            log::info!(
4844                                "Guest {}: requesting completions for buffer {:?}",
4845                                guest_id,
4846                                buffer.read(cx).file().unwrap().full_path(cx)
4847                            );
4848                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4849                            project.completions(&buffer, offset, cx)
4850                        });
4851                        let completions = cx.background().spawn(async move {
4852                            completions.await.expect("completions request failed");
4853                        });
4854                        if rng.lock().gen_bool(0.3) {
4855                            log::info!("Guest {}: detaching completions request", guest_id);
4856                            completions.detach();
4857                        } else {
4858                            completions.await;
4859                        }
4860                    }
4861                    20..=29 => {
4862                        let code_actions = project.update(&mut cx, |project, cx| {
4863                            log::info!(
4864                                "Guest {}: requesting code actions for buffer {:?}",
4865                                guest_id,
4866                                buffer.read(cx).file().unwrap().full_path(cx)
4867                            );
4868                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4869                            project.code_actions(&buffer, range, cx)
4870                        });
4871                        let code_actions = cx.background().spawn(async move {
4872                            code_actions.await.expect("code actions request failed");
4873                        });
4874                        if rng.lock().gen_bool(0.3) {
4875                            log::info!("Guest {}: detaching code actions request", guest_id);
4876                            code_actions.detach();
4877                        } else {
4878                            code_actions.await;
4879                        }
4880                    }
4881                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4882                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4883                            log::info!(
4884                                "Guest {}: saving buffer {:?}",
4885                                guest_id,
4886                                buffer.file().unwrap().full_path(cx)
4887                            );
4888                            (buffer.version(), buffer.save(cx))
4889                        });
4890                        let save = cx.spawn(|cx| async move {
4891                            let (saved_version, _) = save.await.expect("save request failed");
4892                            buffer.read_with(&cx, |buffer, _| {
4893                                assert!(buffer.version().observed_all(&saved_version));
4894                                assert!(saved_version.observed_all(&requested_version));
4895                            });
4896                        });
4897                        if rng.lock().gen_bool(0.3) {
4898                            log::info!("Guest {}: detaching save request", guest_id);
4899                            save.detach();
4900                        } else {
4901                            save.await;
4902                        }
4903                    }
4904                    40..=44 => {
4905                        let prepare_rename = project.update(&mut cx, |project, cx| {
4906                            log::info!(
4907                                "Guest {}: preparing rename for buffer {:?}",
4908                                guest_id,
4909                                buffer.read(cx).file().unwrap().full_path(cx)
4910                            );
4911                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4912                            project.prepare_rename(buffer, offset, cx)
4913                        });
4914                        let prepare_rename = cx.background().spawn(async move {
4915                            prepare_rename.await.expect("prepare rename request failed");
4916                        });
4917                        if rng.lock().gen_bool(0.3) {
4918                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4919                            prepare_rename.detach();
4920                        } else {
4921                            prepare_rename.await;
4922                        }
4923                    }
4924                    45..=49 => {
4925                        let definitions = project.update(&mut cx, |project, cx| {
4926                            log::info!(
4927                                "Guest {}: requesting definitions for buffer {:?}",
4928                                guest_id,
4929                                buffer.read(cx).file().unwrap().full_path(cx)
4930                            );
4931                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4932                            project.definition(&buffer, offset, cx)
4933                        });
4934                        let definitions = cx.background().spawn(async move {
4935                            definitions.await.expect("definitions request failed")
4936                        });
4937                        if rng.lock().gen_bool(0.3) {
4938                            log::info!("Guest {}: detaching definitions request", guest_id);
4939                            definitions.detach();
4940                        } else {
4941                            self.buffers
4942                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
4943                        }
4944                    }
4945                    50..=54 => {
4946                        let highlights = project.update(&mut cx, |project, cx| {
4947                            log::info!(
4948                                "Guest {}: requesting highlights for buffer {:?}",
4949                                guest_id,
4950                                buffer.read(cx).file().unwrap().full_path(cx)
4951                            );
4952                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4953                            project.document_highlights(&buffer, offset, cx)
4954                        });
4955                        let highlights = cx.background().spawn(async move {
4956                            highlights.await.expect("highlights request failed");
4957                        });
4958                        if rng.lock().gen_bool(0.3) {
4959                            log::info!("Guest {}: detaching highlights request", guest_id);
4960                            highlights.detach();
4961                        } else {
4962                            highlights.await;
4963                        }
4964                    }
4965                    55..=59 => {
4966                        let search = project.update(&mut cx, |project, cx| {
4967                            let query = rng.lock().gen_range('a'..='z');
4968                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
4969                            project.search(SearchQuery::text(query, false, false), cx)
4970                        });
4971                        let search = cx
4972                            .background()
4973                            .spawn(async move { search.await.expect("search request failed") });
4974                        if rng.lock().gen_bool(0.3) {
4975                            log::info!("Guest {}: detaching search request", guest_id);
4976                            search.detach();
4977                        } else {
4978                            self.buffers.extend(search.await.into_keys());
4979                        }
4980                    }
4981                    _ => {
4982                        buffer.update(&mut cx, |buffer, cx| {
4983                            log::info!(
4984                                "Guest {}: updating buffer {:?}",
4985                                guest_id,
4986                                buffer.file().unwrap().full_path(cx)
4987                            );
4988                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4989                        });
4990                    }
4991                }
4992                cx.background().simulate_random_delay().await;
4993            }
4994
4995            log::info!("Guest {} done", guest_id);
4996
4997            self.project = Some(project);
4998            (self, cx)
4999        }
5000    }
5001
5002    impl Drop for TestClient {
5003        fn drop(&mut self) {
5004            self.client.tear_down();
5005        }
5006    }
5007
5008    impl Executor for Arc<gpui::executor::Background> {
5009        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5010            self.spawn(future).detach();
5011        }
5012    }
5013
5014    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5015        channel
5016            .messages()
5017            .cursor::<()>()
5018            .map(|m| {
5019                (
5020                    m.sender.github_login.clone(),
5021                    m.body.clone(),
5022                    m.is_pending(),
5023                )
5024            })
5025            .collect()
5026    }
5027
5028    struct EmptyView;
5029
5030    impl gpui::Entity for EmptyView {
5031        type Event = ();
5032    }
5033
5034    impl gpui::View for EmptyView {
5035        fn ui_name() -> &'static str {
5036            "empty view"
5037        }
5038
5039        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5040            gpui::Element::boxed(gpui::elements::Empty)
5041        }
5042    }
5043}