rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::get_definition)
  81            .add_request_handler(Server::get_references)
  82            .add_request_handler(Server::get_document_highlights)
  83            .add_request_handler(Server::get_project_symbols)
  84            .add_request_handler(Server::open_buffer_for_symbol)
  85            .add_request_handler(Server::open_buffer)
  86            .add_message_handler(Server::close_buffer)
  87            .add_request_handler(Server::update_buffer)
  88            .add_message_handler(Server::update_buffer_file)
  89            .add_message_handler(Server::buffer_reloaded)
  90            .add_message_handler(Server::buffer_saved)
  91            .add_request_handler(Server::save_buffer)
  92            .add_request_handler(Server::format_buffers)
  93            .add_request_handler(Server::get_completions)
  94            .add_request_handler(Server::apply_additional_edits_for_completion)
  95            .add_request_handler(Server::get_code_actions)
  96            .add_request_handler(Server::apply_code_action)
  97            .add_request_handler(Server::prepare_rename)
  98            .add_request_handler(Server::perform_rename)
  99            .add_request_handler(Server::get_channels)
 100            .add_request_handler(Server::get_users)
 101            .add_request_handler(Server::join_channel)
 102            .add_message_handler(Server::leave_channel)
 103            .add_request_handler(Server::send_channel_message)
 104            .add_request_handler(Server::get_channel_messages);
 105
 106        Arc::new(server)
 107    }
 108
 109    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 110    where
 111        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 112        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 113        M: EnvelopedMessage,
 114    {
 115        let prev_handler = self.handlers.insert(
 116            TypeId::of::<M>(),
 117            Box::new(move |server, envelope| {
 118                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 119                (handler)(server, *envelope).boxed()
 120            }),
 121        );
 122        if prev_handler.is_some() {
 123            panic!("registered a handler for the same message twice");
 124        }
 125        self
 126    }
 127
 128    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 129    where
 130        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 131        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 132        M: RequestMessage,
 133    {
 134        self.add_message_handler(move |server, envelope| {
 135            let receipt = envelope.receipt();
 136            let response = (handler)(server.clone(), envelope);
 137            async move {
 138                match response.await {
 139                    Ok(response) => {
 140                        server.peer.respond(receipt, response)?;
 141                        Ok(())
 142                    }
 143                    Err(error) => {
 144                        server.peer.respond_with_error(
 145                            receipt,
 146                            proto::Error {
 147                                message: error.to_string(),
 148                            },
 149                        )?;
 150                        Err(error)
 151                    }
 152                }
 153            }
 154        })
 155    }
 156
 157    pub fn handle_connection<E: Executor>(
 158        self: &Arc<Self>,
 159        connection: Connection,
 160        addr: String,
 161        user_id: UserId,
 162        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 163        executor: E,
 164    ) -> impl Future<Output = ()> {
 165        let mut this = self.clone();
 166        async move {
 167            let (connection_id, handle_io, mut incoming_rx) =
 168                this.peer.add_connection(connection).await;
 169
 170            if let Some(send_connection_id) = send_connection_id.as_mut() {
 171                let _ = send_connection_id.send(connection_id).await;
 172            }
 173
 174            this.state_mut().add_connection(connection_id, user_id);
 175            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 176                log::error!("error updating contacts for {:?}: {}", user_id, err);
 177            }
 178
 179            let handle_io = handle_io.fuse();
 180            futures::pin_mut!(handle_io);
 181            loop {
 182                let next_message = incoming_rx.next().fuse();
 183                futures::pin_mut!(next_message);
 184                futures::select_biased! {
 185                    result = handle_io => {
 186                        if let Err(err) = result {
 187                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 188                        }
 189                        break;
 190                    }
 191                    message = next_message => {
 192                        if let Some(message) = message {
 193                            let start_time = Instant::now();
 194                            let type_name = message.payload_type_name();
 195                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 196                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 197                                let notifications = this.notifications.clone();
 198                                let is_background = message.is_background();
 199                                let handle_message = (handler)(this.clone(), message);
 200                                let handle_message = async move {
 201                                    if let Err(err) = handle_message.await {
 202                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 203                                    } else {
 204                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 205                                    }
 206                                    if let Some(mut notifications) = notifications {
 207                                        let _ = notifications.send(()).await;
 208                                    }
 209                                };
 210                                if is_background {
 211                                    executor.spawn_detached(handle_message);
 212                                } else {
 213                                    handle_message.await;
 214                                }
 215                            } else {
 216                                log::warn!("unhandled message: {}", type_name);
 217                            }
 218                        } else {
 219                            log::info!("rpc connection closed {:?}", addr);
 220                            break;
 221                        }
 222                    }
 223                }
 224            }
 225
 226            if let Err(err) = this.sign_out(connection_id).await {
 227                log::error!("error signing out connection {:?} - {:?}", addr, err);
 228            }
 229        }
 230    }
 231
 232    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 233        self.peer.disconnect(connection_id);
 234        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 235
 236        for (project_id, project) in removed_connection.hosted_projects {
 237            if let Some(share) = project.share {
 238                broadcast(
 239                    connection_id,
 240                    share.guests.keys().copied().collect(),
 241                    |conn_id| {
 242                        self.peer
 243                            .send(conn_id, proto::UnshareProject { project_id })
 244                    },
 245                )?;
 246            }
 247        }
 248
 249        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 250            broadcast(connection_id, peer_ids, |conn_id| {
 251                self.peer.send(
 252                    conn_id,
 253                    proto::RemoveProjectCollaborator {
 254                        project_id,
 255                        peer_id: connection_id.0,
 256                    },
 257                )
 258            })?;
 259        }
 260
 261        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 262        Ok(())
 263    }
 264
 265    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 266        Ok(proto::Ack {})
 267    }
 268
 269    async fn register_project(
 270        mut self: Arc<Server>,
 271        request: TypedEnvelope<proto::RegisterProject>,
 272    ) -> tide::Result<proto::RegisterProjectResponse> {
 273        let project_id = {
 274            let mut state = self.state_mut();
 275            let user_id = state.user_id_for_connection(request.sender_id)?;
 276            state.register_project(request.sender_id, user_id)
 277        };
 278        Ok(proto::RegisterProjectResponse { project_id })
 279    }
 280
 281    async fn unregister_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::UnregisterProject>,
 284    ) -> tide::Result<()> {
 285        let project = self
 286            .state_mut()
 287            .unregister_project(request.payload.project_id, request.sender_id)?;
 288        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 289        Ok(())
 290    }
 291
 292    async fn share_project(
 293        mut self: Arc<Server>,
 294        request: TypedEnvelope<proto::ShareProject>,
 295    ) -> tide::Result<proto::Ack> {
 296        self.state_mut()
 297            .share_project(request.payload.project_id, request.sender_id);
 298        Ok(proto::Ack {})
 299    }
 300
 301    async fn unshare_project(
 302        mut self: Arc<Server>,
 303        request: TypedEnvelope<proto::UnshareProject>,
 304    ) -> tide::Result<()> {
 305        let project_id = request.payload.project_id;
 306        let project = self
 307            .state_mut()
 308            .unshare_project(project_id, request.sender_id)?;
 309
 310        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 311            self.peer
 312                .send(conn_id, proto::UnshareProject { project_id })
 313        })?;
 314        self.update_contacts_for_users(&project.authorized_user_ids)?;
 315        Ok(())
 316    }
 317
 318    async fn join_project(
 319        mut self: Arc<Server>,
 320        request: TypedEnvelope<proto::JoinProject>,
 321    ) -> tide::Result<proto::JoinProjectResponse> {
 322        let project_id = request.payload.project_id;
 323
 324        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 325        let (response, connection_ids, contact_user_ids) = self
 326            .state_mut()
 327            .join_project(request.sender_id, user_id, project_id)
 328            .and_then(|joined| {
 329                let share = joined.project.share()?;
 330                let peer_count = share.guests.len();
 331                let mut collaborators = Vec::with_capacity(peer_count);
 332                collaborators.push(proto::Collaborator {
 333                    peer_id: joined.project.host_connection_id.0,
 334                    replica_id: 0,
 335                    user_id: joined.project.host_user_id.to_proto(),
 336                });
 337                let worktrees = share
 338                    .worktrees
 339                    .iter()
 340                    .filter_map(|(id, shared_worktree)| {
 341                        let worktree = joined.project.worktrees.get(&id)?;
 342                        Some(proto::Worktree {
 343                            id: *id,
 344                            root_name: worktree.root_name.clone(),
 345                            entries: shared_worktree.entries.values().cloned().collect(),
 346                            diagnostic_summaries: shared_worktree
 347                                .diagnostic_summaries
 348                                .values()
 349                                .cloned()
 350                                .collect(),
 351                            weak: worktree.weak,
 352                        })
 353                    })
 354                    .collect();
 355                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 356                    if *peer_conn_id != request.sender_id {
 357                        collaborators.push(proto::Collaborator {
 358                            peer_id: peer_conn_id.0,
 359                            replica_id: *peer_replica_id as u32,
 360                            user_id: peer_user_id.to_proto(),
 361                        });
 362                    }
 363                }
 364                let response = proto::JoinProjectResponse {
 365                    worktrees,
 366                    replica_id: joined.replica_id as u32,
 367                    collaborators,
 368                };
 369                let connection_ids = joined.project.connection_ids();
 370                let contact_user_ids = joined.project.authorized_user_ids();
 371                Ok((response, connection_ids, contact_user_ids))
 372            })?;
 373
 374        broadcast(request.sender_id, connection_ids, |conn_id| {
 375            self.peer.send(
 376                conn_id,
 377                proto::AddProjectCollaborator {
 378                    project_id,
 379                    collaborator: Some(proto::Collaborator {
 380                        peer_id: request.sender_id.0,
 381                        replica_id: response.replica_id,
 382                        user_id: user_id.to_proto(),
 383                    }),
 384                },
 385            )
 386        })?;
 387        self.update_contacts_for_users(&contact_user_ids)?;
 388        Ok(response)
 389    }
 390
 391    async fn leave_project(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::LeaveProject>,
 394    ) -> tide::Result<()> {
 395        let sender_id = request.sender_id;
 396        let project_id = request.payload.project_id;
 397        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 398
 399        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 400            self.peer.send(
 401                conn_id,
 402                proto::RemoveProjectCollaborator {
 403                    project_id,
 404                    peer_id: sender_id.0,
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 409
 410        Ok(())
 411    }
 412
 413    async fn register_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::RegisterWorktree>,
 416    ) -> tide::Result<proto::Ack> {
 417        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 418
 419        let mut contact_user_ids = HashSet::default();
 420        contact_user_ids.insert(host_user_id);
 421        for github_login in &request.payload.authorized_logins {
 422            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 423            contact_user_ids.insert(contact_user_id);
 424        }
 425
 426        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 427        let guest_connection_ids;
 428        {
 429            let mut state = self.state_mut();
 430            guest_connection_ids = state
 431                .read_project(request.payload.project_id, request.sender_id)?
 432                .guest_connection_ids();
 433            state.register_worktree(
 434                request.payload.project_id,
 435                request.payload.worktree_id,
 436                request.sender_id,
 437                Worktree {
 438                    authorized_user_ids: contact_user_ids.clone(),
 439                    root_name: request.payload.root_name.clone(),
 440                    weak: request.payload.weak,
 441                },
 442            )?;
 443        }
 444        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 445            self.peer
 446                .forward_send(request.sender_id, connection_id, request.payload.clone())
 447        })?;
 448        self.update_contacts_for_users(&contact_user_ids)?;
 449        Ok(proto::Ack {})
 450    }
 451
 452    async fn unregister_worktree(
 453        mut self: Arc<Server>,
 454        request: TypedEnvelope<proto::UnregisterWorktree>,
 455    ) -> tide::Result<()> {
 456        let project_id = request.payload.project_id;
 457        let worktree_id = request.payload.worktree_id;
 458        let (worktree, guest_connection_ids) =
 459            self.state_mut()
 460                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 461        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::UnregisterWorktree {
 465                    project_id,
 466                    worktree_id,
 467                },
 468            )
 469        })?;
 470        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 471        Ok(())
 472    }
 473
 474    async fn update_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UpdateWorktree>,
 477    ) -> tide::Result<proto::Ack> {
 478        let connection_ids = self.state_mut().update_worktree(
 479            request.sender_id,
 480            request.payload.project_id,
 481            request.payload.worktree_id,
 482            &request.payload.removed_entries,
 483            &request.payload.updated_entries,
 484        )?;
 485
 486        broadcast(request.sender_id, connection_ids, |connection_id| {
 487            self.peer
 488                .forward_send(request.sender_id, connection_id, request.payload.clone())
 489        })?;
 490
 491        Ok(proto::Ack {})
 492    }
 493
 494    async fn update_diagnostic_summary(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 497    ) -> tide::Result<()> {
 498        let summary = request
 499            .payload
 500            .summary
 501            .clone()
 502            .ok_or_else(|| anyhow!("invalid summary"))?;
 503        let receiver_ids = self.state_mut().update_diagnostic_summary(
 504            request.payload.project_id,
 505            request.payload.worktree_id,
 506            request.sender_id,
 507            summary,
 508        )?;
 509
 510        broadcast(request.sender_id, receiver_ids, |connection_id| {
 511            self.peer
 512                .forward_send(request.sender_id, connection_id, request.payload.clone())
 513        })?;
 514        Ok(())
 515    }
 516
 517    async fn disk_based_diagnostics_updating(
 518        self: Arc<Server>,
 519        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 520    ) -> tide::Result<()> {
 521        let receiver_ids = self
 522            .state()
 523            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 524        broadcast(request.sender_id, receiver_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })?;
 528        Ok(())
 529    }
 530
 531    async fn disk_based_diagnostics_updated(
 532        self: Arc<Server>,
 533        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 534    ) -> tide::Result<()> {
 535        let receiver_ids = self
 536            .state()
 537            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 538        broadcast(request.sender_id, receiver_ids, |connection_id| {
 539            self.peer
 540                .forward_send(request.sender_id, connection_id, request.payload.clone())
 541        })?;
 542        Ok(())
 543    }
 544
 545    async fn get_definition(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::GetDefinition>,
 548    ) -> tide::Result<proto::GetDefinitionResponse> {
 549        let host_connection_id = self
 550            .state()
 551            .read_project(request.payload.project_id, request.sender_id)?
 552            .host_connection_id;
 553        Ok(self
 554            .peer
 555            .forward_request(request.sender_id, host_connection_id, request.payload)
 556            .await?)
 557    }
 558
 559    async fn get_references(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::GetReferences>,
 562    ) -> tide::Result<proto::GetReferencesResponse> {
 563        let host_connection_id = self
 564            .state()
 565            .read_project(request.payload.project_id, request.sender_id)?
 566            .host_connection_id;
 567        Ok(self
 568            .peer
 569            .forward_request(request.sender_id, host_connection_id, request.payload)
 570            .await?)
 571    }
 572
 573    async fn get_document_highlights(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::GetDocumentHighlights>,
 576    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 577        let host_connection_id = self
 578            .state()
 579            .read_project(request.payload.project_id, request.sender_id)?
 580            .host_connection_id;
 581        Ok(self
 582            .peer
 583            .forward_request(request.sender_id, host_connection_id, request.payload)
 584            .await?)
 585    }
 586
 587    async fn get_project_symbols(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::GetProjectSymbols>,
 590    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 591        let host_connection_id = self
 592            .state()
 593            .read_project(request.payload.project_id, request.sender_id)?
 594            .host_connection_id;
 595        Ok(self
 596            .peer
 597            .forward_request(request.sender_id, host_connection_id, request.payload)
 598            .await?)
 599    }
 600
 601    async fn open_buffer_for_symbol(
 602        self: Arc<Server>,
 603        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 604    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 605        let host_connection_id = self
 606            .state()
 607            .read_project(request.payload.project_id, request.sender_id)?
 608            .host_connection_id;
 609        Ok(self
 610            .peer
 611            .forward_request(request.sender_id, host_connection_id, request.payload)
 612            .await?)
 613    }
 614
 615    async fn open_buffer(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::OpenBuffer>,
 618    ) -> tide::Result<proto::OpenBufferResponse> {
 619        let host_connection_id = self
 620            .state()
 621            .read_project(request.payload.project_id, request.sender_id)?
 622            .host_connection_id;
 623        Ok(self
 624            .peer
 625            .forward_request(request.sender_id, host_connection_id, request.payload)
 626            .await?)
 627    }
 628
 629    async fn close_buffer(
 630        self: Arc<Server>,
 631        request: TypedEnvelope<proto::CloseBuffer>,
 632    ) -> tide::Result<()> {
 633        let host_connection_id = self
 634            .state()
 635            .read_project(request.payload.project_id, request.sender_id)?
 636            .host_connection_id;
 637        self.peer
 638            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 639        Ok(())
 640    }
 641
 642    async fn save_buffer(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::SaveBuffer>,
 645    ) -> tide::Result<proto::BufferSaved> {
 646        let host;
 647        let mut guests;
 648        {
 649            let state = self.state();
 650            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 651            host = project.host_connection_id;
 652            guests = project.guest_connection_ids()
 653        }
 654
 655        let response = self
 656            .peer
 657            .forward_request(request.sender_id, host, request.payload.clone())
 658            .await?;
 659
 660        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 661        broadcast(host, guests, |conn_id| {
 662            self.peer.forward_send(host, conn_id, response.clone())
 663        })?;
 664
 665        Ok(response)
 666    }
 667
 668    async fn format_buffers(
 669        self: Arc<Server>,
 670        request: TypedEnvelope<proto::FormatBuffers>,
 671    ) -> tide::Result<proto::FormatBuffersResponse> {
 672        let host = self
 673            .state()
 674            .read_project(request.payload.project_id, request.sender_id)?
 675            .host_connection_id;
 676        Ok(self
 677            .peer
 678            .forward_request(request.sender_id, host, request.payload.clone())
 679            .await?)
 680    }
 681
 682    async fn get_completions(
 683        self: Arc<Server>,
 684        request: TypedEnvelope<proto::GetCompletions>,
 685    ) -> tide::Result<proto::GetCompletionsResponse> {
 686        let host = self
 687            .state()
 688            .read_project(request.payload.project_id, request.sender_id)?
 689            .host_connection_id;
 690        Ok(self
 691            .peer
 692            .forward_request(request.sender_id, host, request.payload.clone())
 693            .await?)
 694    }
 695
 696    async fn apply_additional_edits_for_completion(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 699    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 700        let host = self
 701            .state()
 702            .read_project(request.payload.project_id, request.sender_id)?
 703            .host_connection_id;
 704        Ok(self
 705            .peer
 706            .forward_request(request.sender_id, host, request.payload.clone())
 707            .await?)
 708    }
 709
 710    async fn get_code_actions(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::GetCodeActions>,
 713    ) -> tide::Result<proto::GetCodeActionsResponse> {
 714        let host = self
 715            .state()
 716            .read_project(request.payload.project_id, request.sender_id)?
 717            .host_connection_id;
 718        Ok(self
 719            .peer
 720            .forward_request(request.sender_id, host, request.payload.clone())
 721            .await?)
 722    }
 723
 724    async fn apply_code_action(
 725        self: Arc<Server>,
 726        request: TypedEnvelope<proto::ApplyCodeAction>,
 727    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 728        let host = self
 729            .state()
 730            .read_project(request.payload.project_id, request.sender_id)?
 731            .host_connection_id;
 732        Ok(self
 733            .peer
 734            .forward_request(request.sender_id, host, request.payload.clone())
 735            .await?)
 736    }
 737
 738    async fn prepare_rename(
 739        self: Arc<Server>,
 740        request: TypedEnvelope<proto::PrepareRename>,
 741    ) -> tide::Result<proto::PrepareRenameResponse> {
 742        let host = self
 743            .state()
 744            .read_project(request.payload.project_id, request.sender_id)?
 745            .host_connection_id;
 746        Ok(self
 747            .peer
 748            .forward_request(request.sender_id, host, request.payload.clone())
 749            .await?)
 750    }
 751
 752    async fn perform_rename(
 753        self: Arc<Server>,
 754        request: TypedEnvelope<proto::PerformRename>,
 755    ) -> tide::Result<proto::PerformRenameResponse> {
 756        let host = self
 757            .state()
 758            .read_project(request.payload.project_id, request.sender_id)?
 759            .host_connection_id;
 760        Ok(self
 761            .peer
 762            .forward_request(request.sender_id, host, request.payload.clone())
 763            .await?)
 764    }
 765
 766    async fn update_buffer(
 767        self: Arc<Server>,
 768        request: TypedEnvelope<proto::UpdateBuffer>,
 769    ) -> tide::Result<proto::Ack> {
 770        let receiver_ids = self
 771            .state()
 772            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 773        broadcast(request.sender_id, receiver_ids, |connection_id| {
 774            self.peer
 775                .forward_send(request.sender_id, connection_id, request.payload.clone())
 776        })?;
 777        Ok(proto::Ack {})
 778    }
 779
 780    async fn update_buffer_file(
 781        self: Arc<Server>,
 782        request: TypedEnvelope<proto::UpdateBufferFile>,
 783    ) -> tide::Result<()> {
 784        let receiver_ids = self
 785            .state()
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        })?;
 791        Ok(())
 792    }
 793
 794    async fn buffer_reloaded(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::BufferReloaded>,
 797    ) -> tide::Result<()> {
 798        let receiver_ids = self
 799            .state()
 800            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 801        broadcast(request.sender_id, receiver_ids, |connection_id| {
 802            self.peer
 803                .forward_send(request.sender_id, connection_id, request.payload.clone())
 804        })?;
 805        Ok(())
 806    }
 807
 808    async fn buffer_saved(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferSaved>,
 811    ) -> tide::Result<()> {
 812        let receiver_ids = self
 813            .state()
 814            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 815        broadcast(request.sender_id, receiver_ids, |connection_id| {
 816            self.peer
 817                .forward_send(request.sender_id, connection_id, request.payload.clone())
 818        })?;
 819        Ok(())
 820    }
 821
 822    async fn get_channels(
 823        self: Arc<Server>,
 824        request: TypedEnvelope<proto::GetChannels>,
 825    ) -> tide::Result<proto::GetChannelsResponse> {
 826        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 827        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 828        Ok(proto::GetChannelsResponse {
 829            channels: channels
 830                .into_iter()
 831                .map(|chan| proto::Channel {
 832                    id: chan.id.to_proto(),
 833                    name: chan.name,
 834                })
 835                .collect(),
 836        })
 837    }
 838
 839    async fn get_users(
 840        self: Arc<Server>,
 841        request: TypedEnvelope<proto::GetUsers>,
 842    ) -> tide::Result<proto::GetUsersResponse> {
 843        let user_ids = request
 844            .payload
 845            .user_ids
 846            .into_iter()
 847            .map(UserId::from_proto)
 848            .collect();
 849        let users = self
 850            .app_state
 851            .db
 852            .get_users_by_ids(user_ids)
 853            .await?
 854            .into_iter()
 855            .map(|user| proto::User {
 856                id: user.id.to_proto(),
 857                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 858                github_login: user.github_login,
 859            })
 860            .collect();
 861        Ok(proto::GetUsersResponse { users })
 862    }
 863
 864    fn update_contacts_for_users<'a>(
 865        self: &Arc<Server>,
 866        user_ids: impl IntoIterator<Item = &'a UserId>,
 867    ) -> anyhow::Result<()> {
 868        let mut result = Ok(());
 869        let state = self.state();
 870        for user_id in user_ids {
 871            let contacts = state.contacts_for_user(*user_id);
 872            for connection_id in state.connection_ids_for_user(*user_id) {
 873                if let Err(error) = self.peer.send(
 874                    connection_id,
 875                    proto::UpdateContacts {
 876                        contacts: contacts.clone(),
 877                    },
 878                ) {
 879                    result = Err(error);
 880                }
 881            }
 882        }
 883        result
 884    }
 885
 886    async fn join_channel(
 887        mut self: Arc<Self>,
 888        request: TypedEnvelope<proto::JoinChannel>,
 889    ) -> tide::Result<proto::JoinChannelResponse> {
 890        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 891        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 892        if !self
 893            .app_state
 894            .db
 895            .can_user_access_channel(user_id, channel_id)
 896            .await?
 897        {
 898            Err(anyhow!("access denied"))?;
 899        }
 900
 901        self.state_mut().join_channel(request.sender_id, channel_id);
 902        let messages = self
 903            .app_state
 904            .db
 905            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 906            .await?
 907            .into_iter()
 908            .map(|msg| proto::ChannelMessage {
 909                id: msg.id.to_proto(),
 910                body: msg.body,
 911                timestamp: msg.sent_at.unix_timestamp() as u64,
 912                sender_id: msg.sender_id.to_proto(),
 913                nonce: Some(msg.nonce.as_u128().into()),
 914            })
 915            .collect::<Vec<_>>();
 916        Ok(proto::JoinChannelResponse {
 917            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 918            messages,
 919        })
 920    }
 921
 922    async fn leave_channel(
 923        mut self: Arc<Self>,
 924        request: TypedEnvelope<proto::LeaveChannel>,
 925    ) -> tide::Result<()> {
 926        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 927        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 928        if !self
 929            .app_state
 930            .db
 931            .can_user_access_channel(user_id, channel_id)
 932            .await?
 933        {
 934            Err(anyhow!("access denied"))?;
 935        }
 936
 937        self.state_mut()
 938            .leave_channel(request.sender_id, channel_id);
 939
 940        Ok(())
 941    }
 942
 943    async fn send_channel_message(
 944        self: Arc<Self>,
 945        request: TypedEnvelope<proto::SendChannelMessage>,
 946    ) -> tide::Result<proto::SendChannelMessageResponse> {
 947        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 948        let user_id;
 949        let connection_ids;
 950        {
 951            let state = self.state();
 952            user_id = state.user_id_for_connection(request.sender_id)?;
 953            connection_ids = state.channel_connection_ids(channel_id)?;
 954        }
 955
 956        // Validate the message body.
 957        let body = request.payload.body.trim().to_string();
 958        if body.len() > MAX_MESSAGE_LEN {
 959            return Err(anyhow!("message is too long"))?;
 960        }
 961        if body.is_empty() {
 962            return Err(anyhow!("message can't be blank"))?;
 963        }
 964
 965        let timestamp = OffsetDateTime::now_utc();
 966        let nonce = request
 967            .payload
 968            .nonce
 969            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 970
 971        let message_id = self
 972            .app_state
 973            .db
 974            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 975            .await?
 976            .to_proto();
 977        let message = proto::ChannelMessage {
 978            sender_id: user_id.to_proto(),
 979            id: message_id,
 980            body,
 981            timestamp: timestamp.unix_timestamp() as u64,
 982            nonce: Some(nonce),
 983        };
 984        broadcast(request.sender_id, connection_ids, |conn_id| {
 985            self.peer.send(
 986                conn_id,
 987                proto::ChannelMessageSent {
 988                    channel_id: channel_id.to_proto(),
 989                    message: Some(message.clone()),
 990                },
 991            )
 992        })?;
 993        Ok(proto::SendChannelMessageResponse {
 994            message: Some(message),
 995        })
 996    }
 997
 998    async fn get_channel_messages(
 999        self: Arc<Self>,
1000        request: TypedEnvelope<proto::GetChannelMessages>,
1001    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1002        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1003        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1004        if !self
1005            .app_state
1006            .db
1007            .can_user_access_channel(user_id, channel_id)
1008            .await?
1009        {
1010            Err(anyhow!("access denied"))?;
1011        }
1012
1013        let messages = self
1014            .app_state
1015            .db
1016            .get_channel_messages(
1017                channel_id,
1018                MESSAGE_COUNT_PER_PAGE,
1019                Some(MessageId::from_proto(request.payload.before_message_id)),
1020            )
1021            .await?
1022            .into_iter()
1023            .map(|msg| proto::ChannelMessage {
1024                id: msg.id.to_proto(),
1025                body: msg.body,
1026                timestamp: msg.sent_at.unix_timestamp() as u64,
1027                sender_id: msg.sender_id.to_proto(),
1028                nonce: Some(msg.nonce.as_u128().into()),
1029            })
1030            .collect::<Vec<_>>();
1031
1032        Ok(proto::GetChannelMessagesResponse {
1033            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1034            messages,
1035        })
1036    }
1037
1038    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1039        self.store.read()
1040    }
1041
1042    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1043        self.store.write()
1044    }
1045}
1046
1047impl Executor for RealExecutor {
1048    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1049        task::spawn(future);
1050    }
1051}
1052
1053fn broadcast<F>(
1054    sender_id: ConnectionId,
1055    receiver_ids: Vec<ConnectionId>,
1056    mut f: F,
1057) -> anyhow::Result<()>
1058where
1059    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1060{
1061    let mut result = Ok(());
1062    for receiver_id in receiver_ids {
1063        if receiver_id != sender_id {
1064            if let Err(error) = f(receiver_id) {
1065                if result.is_ok() {
1066                    result = Err(error);
1067                }
1068            }
1069        }
1070    }
1071    result
1072}
1073
1074pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1075    let server = Server::new(app.state().clone(), rpc.clone(), None);
1076    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1077        let server = server.clone();
1078        async move {
1079            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1080
1081            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1082            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1083            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1084            let client_protocol_version: Option<u32> = request
1085                .header("X-Zed-Protocol-Version")
1086                .and_then(|v| v.as_str().parse().ok());
1087
1088            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1089                return Ok(Response::new(StatusCode::UpgradeRequired));
1090            }
1091
1092            let header = match request.header("Sec-Websocket-Key") {
1093                Some(h) => h.as_str(),
1094                None => return Err(anyhow!("expected sec-websocket-key"))?,
1095            };
1096
1097            let user_id = process_auth_header(&request).await?;
1098
1099            let mut response = Response::new(StatusCode::SwitchingProtocols);
1100            response.insert_header(UPGRADE, "websocket");
1101            response.insert_header(CONNECTION, "Upgrade");
1102            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1103            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1104            response.insert_header("Sec-Websocket-Version", "13");
1105
1106            let http_res: &mut tide::http::Response = response.as_mut();
1107            let upgrade_receiver = http_res.recv_upgrade().await;
1108            let addr = request.remote().unwrap_or("unknown").to_string();
1109            task::spawn(async move {
1110                if let Some(stream) = upgrade_receiver.await {
1111                    server
1112                        .handle_connection(
1113                            Connection::new(
1114                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1115                            ),
1116                            addr,
1117                            user_id,
1118                            None,
1119                            RealExecutor,
1120                        )
1121                        .await;
1122                }
1123            });
1124
1125            Ok(response)
1126        }
1127    });
1128}
1129
1130fn header_contains_ignore_case<T>(
1131    request: &tide::Request<T>,
1132    header_name: HeaderName,
1133    value: &str,
1134) -> bool {
1135    request
1136        .header(header_name)
1137        .map(|h| {
1138            h.as_str()
1139                .split(',')
1140                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1141        })
1142        .unwrap_or(false)
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use crate::{
1149        auth,
1150        db::{tests::TestDb, UserId},
1151        github, AppState, Config,
1152    };
1153    use ::rpc::Peer;
1154    use collections::BTreeMap;
1155    use gpui::{executor, ModelHandle, TestAppContext};
1156    use parking_lot::Mutex;
1157    use postage::{sink::Sink, watch};
1158    use rand::prelude::*;
1159    use rpc::PeerId;
1160    use serde_json::json;
1161    use sqlx::types::time::OffsetDateTime;
1162    use std::{
1163        cell::Cell,
1164        env,
1165        ops::Deref,
1166        path::{Path, PathBuf},
1167        rc::Rc,
1168        sync::{
1169            atomic::{AtomicBool, Ordering::SeqCst},
1170            Arc,
1171        },
1172        time::Duration,
1173    };
1174    use zed::{
1175        client::{
1176            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1177            EstablishConnectionError, UserStore,
1178        },
1179        editor::{
1180            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1181            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1182        },
1183        fs::{FakeFs, Fs as _},
1184        language::{
1185            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1186            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1187        },
1188        lsp,
1189        project::{DiagnosticSummary, Project, ProjectPath},
1190        workspace::{Workspace, WorkspaceParams},
1191    };
1192
1193    #[cfg(test)]
1194    #[ctor::ctor]
1195    fn init_logger() {
1196        if std::env::var("RUST_LOG").is_ok() {
1197            env_logger::init();
1198        }
1199    }
1200
1201    #[gpui::test(iterations = 10)]
1202    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1203        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1204        let lang_registry = Arc::new(LanguageRegistry::new());
1205        let fs = FakeFs::new(cx_a.background());
1206        cx_a.foreground().forbid_parking();
1207
1208        // Connect to a server as 2 clients.
1209        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1210        let client_a = server.create_client(&mut cx_a, "user_a").await;
1211        let client_b = server.create_client(&mut cx_b, "user_b").await;
1212
1213        // Share a project as client A
1214        fs.insert_tree(
1215            "/a",
1216            json!({
1217                ".zed.toml": r#"collaborators = ["user_b"]"#,
1218                "a.txt": "a-contents",
1219                "b.txt": "b-contents",
1220            }),
1221        )
1222        .await;
1223        let project_a = cx_a.update(|cx| {
1224            Project::local(
1225                client_a.clone(),
1226                client_a.user_store.clone(),
1227                lang_registry.clone(),
1228                fs.clone(),
1229                cx,
1230            )
1231        });
1232        let (worktree_a, _) = project_a
1233            .update(&mut cx_a, |p, cx| {
1234                p.find_or_create_local_worktree("/a", false, cx)
1235            })
1236            .await
1237            .unwrap();
1238        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1239        worktree_a
1240            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1241            .await;
1242        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1243        project_a
1244            .update(&mut cx_a, |p, cx| p.share(cx))
1245            .await
1246            .unwrap();
1247
1248        // Join that project as client B
1249        let project_b = Project::remote(
1250            project_id,
1251            client_b.clone(),
1252            client_b.user_store.clone(),
1253            lang_registry.clone(),
1254            fs.clone(),
1255            &mut cx_b.to_async(),
1256        )
1257        .await
1258        .unwrap();
1259
1260        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1261            assert_eq!(
1262                project
1263                    .collaborators()
1264                    .get(&client_a.peer_id)
1265                    .unwrap()
1266                    .user
1267                    .github_login,
1268                "user_a"
1269            );
1270            project.replica_id()
1271        });
1272        project_a
1273            .condition(&cx_a, |tree, _| {
1274                tree.collaborators()
1275                    .get(&client_b.peer_id)
1276                    .map_or(false, |collaborator| {
1277                        collaborator.replica_id == replica_id_b
1278                            && collaborator.user.github_login == "user_b"
1279                    })
1280            })
1281            .await;
1282
1283        // Open the same file as client B and client A.
1284        let buffer_b = project_b
1285            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1286            .await
1287            .unwrap();
1288        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1289        buffer_b.read_with(&cx_b, |buf, cx| {
1290            assert_eq!(buf.read(cx).text(), "b-contents")
1291        });
1292        project_a.read_with(&cx_a, |project, cx| {
1293            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1294        });
1295        let buffer_a = project_a
1296            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1297            .await
1298            .unwrap();
1299
1300        let editor_b = cx_b.add_view(window_b, |cx| {
1301            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1302        });
1303
1304        // TODO
1305        // // Create a selection set as client B and see that selection set as client A.
1306        // buffer_a
1307        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1308        //     .await;
1309
1310        // Edit the buffer as client B and see that edit as client A.
1311        editor_b.update(&mut cx_b, |editor, cx| {
1312            editor.handle_input(&Input("ok, ".into()), cx)
1313        });
1314        buffer_a
1315            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1316            .await;
1317
1318        // TODO
1319        // // Remove the selection set as client B, see those selections disappear as client A.
1320        cx_b.update(move |_| drop(editor_b));
1321        // buffer_a
1322        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1323        //     .await;
1324
1325        // Close the buffer as client A, see that the buffer is closed.
1326        cx_a.update(move |_| drop(buffer_a));
1327        project_a
1328            .condition(&cx_a, |project, cx| {
1329                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1330            })
1331            .await;
1332
1333        // Dropping the client B's project removes client B from client A's collaborators.
1334        cx_b.update(move |_| drop(project_b));
1335        project_a
1336            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1337            .await;
1338    }
1339
1340    #[gpui::test(iterations = 10)]
1341    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1342        let lang_registry = Arc::new(LanguageRegistry::new());
1343        let fs = FakeFs::new(cx_a.background());
1344        cx_a.foreground().forbid_parking();
1345
1346        // Connect to a server as 2 clients.
1347        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1348        let client_a = server.create_client(&mut cx_a, "user_a").await;
1349        let client_b = server.create_client(&mut cx_b, "user_b").await;
1350
1351        // Share a project as client A
1352        fs.insert_tree(
1353            "/a",
1354            json!({
1355                ".zed.toml": r#"collaborators = ["user_b"]"#,
1356                "a.txt": "a-contents",
1357                "b.txt": "b-contents",
1358            }),
1359        )
1360        .await;
1361        let project_a = cx_a.update(|cx| {
1362            Project::local(
1363                client_a.clone(),
1364                client_a.user_store.clone(),
1365                lang_registry.clone(),
1366                fs.clone(),
1367                cx,
1368            )
1369        });
1370        let (worktree_a, _) = project_a
1371            .update(&mut cx_a, |p, cx| {
1372                p.find_or_create_local_worktree("/a", false, cx)
1373            })
1374            .await
1375            .unwrap();
1376        worktree_a
1377            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1378            .await;
1379        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1380        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1381        project_a
1382            .update(&mut cx_a, |p, cx| p.share(cx))
1383            .await
1384            .unwrap();
1385        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1386
1387        // Join that project as client B
1388        let project_b = Project::remote(
1389            project_id,
1390            client_b.clone(),
1391            client_b.user_store.clone(),
1392            lang_registry.clone(),
1393            fs.clone(),
1394            &mut cx_b.to_async(),
1395        )
1396        .await
1397        .unwrap();
1398        project_b
1399            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1400            .await
1401            .unwrap();
1402
1403        // Unshare the project as client A
1404        project_a
1405            .update(&mut cx_a, |project, cx| project.unshare(cx))
1406            .await
1407            .unwrap();
1408        project_b
1409            .condition(&mut cx_b, |project, _| project.is_read_only())
1410            .await;
1411        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1412        drop(project_b);
1413
1414        // Share the project again and ensure guests can still join.
1415        project_a
1416            .update(&mut cx_a, |project, cx| project.share(cx))
1417            .await
1418            .unwrap();
1419        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1420
1421        let project_c = Project::remote(
1422            project_id,
1423            client_b.clone(),
1424            client_b.user_store.clone(),
1425            lang_registry.clone(),
1426            fs.clone(),
1427            &mut cx_b.to_async(),
1428        )
1429        .await
1430        .unwrap();
1431        project_c
1432            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1433            .await
1434            .unwrap();
1435    }
1436
1437    #[gpui::test(iterations = 10)]
1438    async fn test_propagate_saves_and_fs_changes(
1439        mut cx_a: TestAppContext,
1440        mut cx_b: TestAppContext,
1441        mut cx_c: TestAppContext,
1442    ) {
1443        let lang_registry = Arc::new(LanguageRegistry::new());
1444        let fs = FakeFs::new(cx_a.background());
1445        cx_a.foreground().forbid_parking();
1446
1447        // Connect to a server as 3 clients.
1448        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1449        let client_a = server.create_client(&mut cx_a, "user_a").await;
1450        let client_b = server.create_client(&mut cx_b, "user_b").await;
1451        let client_c = server.create_client(&mut cx_c, "user_c").await;
1452
1453        // Share a worktree as client A.
1454        fs.insert_tree(
1455            "/a",
1456            json!({
1457                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1458                "file1": "",
1459                "file2": ""
1460            }),
1461        )
1462        .await;
1463        let project_a = cx_a.update(|cx| {
1464            Project::local(
1465                client_a.clone(),
1466                client_a.user_store.clone(),
1467                lang_registry.clone(),
1468                fs.clone(),
1469                cx,
1470            )
1471        });
1472        let (worktree_a, _) = project_a
1473            .update(&mut cx_a, |p, cx| {
1474                p.find_or_create_local_worktree("/a", false, cx)
1475            })
1476            .await
1477            .unwrap();
1478        worktree_a
1479            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1480            .await;
1481        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1482        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1483        project_a
1484            .update(&mut cx_a, |p, cx| p.share(cx))
1485            .await
1486            .unwrap();
1487
1488        // Join that worktree as clients B and C.
1489        let project_b = Project::remote(
1490            project_id,
1491            client_b.clone(),
1492            client_b.user_store.clone(),
1493            lang_registry.clone(),
1494            fs.clone(),
1495            &mut cx_b.to_async(),
1496        )
1497        .await
1498        .unwrap();
1499        let project_c = Project::remote(
1500            project_id,
1501            client_c.clone(),
1502            client_c.user_store.clone(),
1503            lang_registry.clone(),
1504            fs.clone(),
1505            &mut cx_c.to_async(),
1506        )
1507        .await
1508        .unwrap();
1509        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1510        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1511
1512        // Open and edit a buffer as both guests B and C.
1513        let buffer_b = project_b
1514            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1515            .await
1516            .unwrap();
1517        let buffer_c = project_c
1518            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1519            .await
1520            .unwrap();
1521        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1522        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1523
1524        // Open and edit that buffer as the host.
1525        let buffer_a = project_a
1526            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1527            .await
1528            .unwrap();
1529
1530        buffer_a
1531            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1532            .await;
1533        buffer_a.update(&mut cx_a, |buf, cx| {
1534            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1535        });
1536
1537        // Wait for edits to propagate
1538        buffer_a
1539            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1540            .await;
1541        buffer_b
1542            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1543            .await;
1544        buffer_c
1545            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1546            .await;
1547
1548        // Edit the buffer as the host and concurrently save as guest B.
1549        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1550        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1551        save_b.await.unwrap();
1552        assert_eq!(
1553            fs.load("/a/file1".as_ref()).await.unwrap(),
1554            "hi-a, i-am-c, i-am-b, i-am-a"
1555        );
1556        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1557        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1558        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1559
1560        // Make changes on host's file system, see those changes on guest worktrees.
1561        fs.rename(
1562            "/a/file1".as_ref(),
1563            "/a/file1-renamed".as_ref(),
1564            Default::default(),
1565        )
1566        .await
1567        .unwrap();
1568
1569        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1570            .await
1571            .unwrap();
1572        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1573
1574        worktree_a
1575            .condition(&cx_a, |tree, _| {
1576                tree.paths()
1577                    .map(|p| p.to_string_lossy())
1578                    .collect::<Vec<_>>()
1579                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1580            })
1581            .await;
1582        worktree_b
1583            .condition(&cx_b, |tree, _| {
1584                tree.paths()
1585                    .map(|p| p.to_string_lossy())
1586                    .collect::<Vec<_>>()
1587                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1588            })
1589            .await;
1590        worktree_c
1591            .condition(&cx_c, |tree, _| {
1592                tree.paths()
1593                    .map(|p| p.to_string_lossy())
1594                    .collect::<Vec<_>>()
1595                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1596            })
1597            .await;
1598
1599        // Ensure buffer files are updated as well.
1600        buffer_a
1601            .condition(&cx_a, |buf, _| {
1602                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1603            })
1604            .await;
1605        buffer_b
1606            .condition(&cx_b, |buf, _| {
1607                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608            })
1609            .await;
1610        buffer_c
1611            .condition(&cx_c, |buf, _| {
1612                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613            })
1614            .await;
1615    }
1616
1617    #[gpui::test(iterations = 10)]
1618    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1619        cx_a.foreground().forbid_parking();
1620        let lang_registry = Arc::new(LanguageRegistry::new());
1621        let fs = FakeFs::new(cx_a.background());
1622
1623        // Connect to a server as 2 clients.
1624        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1625        let client_a = server.create_client(&mut cx_a, "user_a").await;
1626        let client_b = server.create_client(&mut cx_b, "user_b").await;
1627
1628        // Share a project as client A
1629        fs.insert_tree(
1630            "/dir",
1631            json!({
1632                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1633                "a.txt": "a-contents",
1634            }),
1635        )
1636        .await;
1637
1638        let project_a = cx_a.update(|cx| {
1639            Project::local(
1640                client_a.clone(),
1641                client_a.user_store.clone(),
1642                lang_registry.clone(),
1643                fs.clone(),
1644                cx,
1645            )
1646        });
1647        let (worktree_a, _) = project_a
1648            .update(&mut cx_a, |p, cx| {
1649                p.find_or_create_local_worktree("/dir", false, cx)
1650            })
1651            .await
1652            .unwrap();
1653        worktree_a
1654            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1655            .await;
1656        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1657        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1658        project_a
1659            .update(&mut cx_a, |p, cx| p.share(cx))
1660            .await
1661            .unwrap();
1662
1663        // Join that project as client B
1664        let project_b = Project::remote(
1665            project_id,
1666            client_b.clone(),
1667            client_b.user_store.clone(),
1668            lang_registry.clone(),
1669            fs.clone(),
1670            &mut cx_b.to_async(),
1671        )
1672        .await
1673        .unwrap();
1674
1675        // Open a buffer as client B
1676        let buffer_b = project_b
1677            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1678            .await
1679            .unwrap();
1680
1681        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1682        buffer_b.read_with(&cx_b, |buf, _| {
1683            assert!(buf.is_dirty());
1684            assert!(!buf.has_conflict());
1685        });
1686
1687        buffer_b
1688            .update(&mut cx_b, |buf, cx| buf.save(cx))
1689            .await
1690            .unwrap();
1691        buffer_b
1692            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1693            .await;
1694        buffer_b.read_with(&cx_b, |buf, _| {
1695            assert!(!buf.has_conflict());
1696        });
1697
1698        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1699        buffer_b.read_with(&cx_b, |buf, _| {
1700            assert!(buf.is_dirty());
1701            assert!(!buf.has_conflict());
1702        });
1703    }
1704
1705    #[gpui::test(iterations = 10)]
1706    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1707        cx_a.foreground().forbid_parking();
1708        let lang_registry = Arc::new(LanguageRegistry::new());
1709        let fs = FakeFs::new(cx_a.background());
1710
1711        // Connect to a server as 2 clients.
1712        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1713        let client_a = server.create_client(&mut cx_a, "user_a").await;
1714        let client_b = server.create_client(&mut cx_b, "user_b").await;
1715
1716        // Share a project as client A
1717        fs.insert_tree(
1718            "/dir",
1719            json!({
1720                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1721                "a.txt": "a-contents",
1722            }),
1723        )
1724        .await;
1725
1726        let project_a = cx_a.update(|cx| {
1727            Project::local(
1728                client_a.clone(),
1729                client_a.user_store.clone(),
1730                lang_registry.clone(),
1731                fs.clone(),
1732                cx,
1733            )
1734        });
1735        let (worktree_a, _) = project_a
1736            .update(&mut cx_a, |p, cx| {
1737                p.find_or_create_local_worktree("/dir", false, cx)
1738            })
1739            .await
1740            .unwrap();
1741        worktree_a
1742            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1743            .await;
1744        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1745        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1746        project_a
1747            .update(&mut cx_a, |p, cx| p.share(cx))
1748            .await
1749            .unwrap();
1750
1751        // Join that project as client B
1752        let project_b = Project::remote(
1753            project_id,
1754            client_b.clone(),
1755            client_b.user_store.clone(),
1756            lang_registry.clone(),
1757            fs.clone(),
1758            &mut cx_b.to_async(),
1759        )
1760        .await
1761        .unwrap();
1762        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1763
1764        // Open a buffer as client B
1765        let buffer_b = project_b
1766            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1767            .await
1768            .unwrap();
1769        buffer_b.read_with(&cx_b, |buf, _| {
1770            assert!(!buf.is_dirty());
1771            assert!(!buf.has_conflict());
1772        });
1773
1774        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1775            .await
1776            .unwrap();
1777        buffer_b
1778            .condition(&cx_b, |buf, _| {
1779                buf.text() == "new contents" && !buf.is_dirty()
1780            })
1781            .await;
1782        buffer_b.read_with(&cx_b, |buf, _| {
1783            assert!(!buf.has_conflict());
1784        });
1785    }
1786
1787    #[gpui::test(iterations = 10)]
1788    async fn test_editing_while_guest_opens_buffer(
1789        mut cx_a: TestAppContext,
1790        mut cx_b: TestAppContext,
1791    ) {
1792        cx_a.foreground().forbid_parking();
1793        let lang_registry = Arc::new(LanguageRegistry::new());
1794        let fs = FakeFs::new(cx_a.background());
1795
1796        // Connect to a server as 2 clients.
1797        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1798        let client_a = server.create_client(&mut cx_a, "user_a").await;
1799        let client_b = server.create_client(&mut cx_b, "user_b").await;
1800
1801        // Share a project as client A
1802        fs.insert_tree(
1803            "/dir",
1804            json!({
1805                ".zed.toml": r#"collaborators = ["user_b"]"#,
1806                "a.txt": "a-contents",
1807            }),
1808        )
1809        .await;
1810        let project_a = cx_a.update(|cx| {
1811            Project::local(
1812                client_a.clone(),
1813                client_a.user_store.clone(),
1814                lang_registry.clone(),
1815                fs.clone(),
1816                cx,
1817            )
1818        });
1819        let (worktree_a, _) = project_a
1820            .update(&mut cx_a, |p, cx| {
1821                p.find_or_create_local_worktree("/dir", false, cx)
1822            })
1823            .await
1824            .unwrap();
1825        worktree_a
1826            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1827            .await;
1828        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1829        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1830        project_a
1831            .update(&mut cx_a, |p, cx| p.share(cx))
1832            .await
1833            .unwrap();
1834
1835        // Join that project as client B
1836        let project_b = Project::remote(
1837            project_id,
1838            client_b.clone(),
1839            client_b.user_store.clone(),
1840            lang_registry.clone(),
1841            fs.clone(),
1842            &mut cx_b.to_async(),
1843        )
1844        .await
1845        .unwrap();
1846
1847        // Open a buffer as client A
1848        let buffer_a = project_a
1849            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1850            .await
1851            .unwrap();
1852
1853        // Start opening the same buffer as client B
1854        let buffer_b = cx_b
1855            .background()
1856            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1857
1858        // Edit the buffer as client A while client B is still opening it.
1859        cx_b.background().simulate_random_delay().await;
1860        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1861        cx_b.background().simulate_random_delay().await;
1862        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1863
1864        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1865        let buffer_b = buffer_b.await.unwrap();
1866        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1867    }
1868
1869    #[gpui::test(iterations = 10)]
1870    async fn test_leaving_worktree_while_opening_buffer(
1871        mut cx_a: TestAppContext,
1872        mut cx_b: TestAppContext,
1873    ) {
1874        cx_a.foreground().forbid_parking();
1875        let lang_registry = Arc::new(LanguageRegistry::new());
1876        let fs = FakeFs::new(cx_a.background());
1877
1878        // Connect to a server as 2 clients.
1879        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1880        let client_a = server.create_client(&mut cx_a, "user_a").await;
1881        let client_b = server.create_client(&mut cx_b, "user_b").await;
1882
1883        // Share a project as client A
1884        fs.insert_tree(
1885            "/dir",
1886            json!({
1887                ".zed.toml": r#"collaborators = ["user_b"]"#,
1888                "a.txt": "a-contents",
1889            }),
1890        )
1891        .await;
1892        let project_a = cx_a.update(|cx| {
1893            Project::local(
1894                client_a.clone(),
1895                client_a.user_store.clone(),
1896                lang_registry.clone(),
1897                fs.clone(),
1898                cx,
1899            )
1900        });
1901        let (worktree_a, _) = project_a
1902            .update(&mut cx_a, |p, cx| {
1903                p.find_or_create_local_worktree("/dir", false, cx)
1904            })
1905            .await
1906            .unwrap();
1907        worktree_a
1908            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1909            .await;
1910        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1911        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1912        project_a
1913            .update(&mut cx_a, |p, cx| p.share(cx))
1914            .await
1915            .unwrap();
1916
1917        // Join that project as client B
1918        let project_b = Project::remote(
1919            project_id,
1920            client_b.clone(),
1921            client_b.user_store.clone(),
1922            lang_registry.clone(),
1923            fs.clone(),
1924            &mut cx_b.to_async(),
1925        )
1926        .await
1927        .unwrap();
1928
1929        // See that a guest has joined as client A.
1930        project_a
1931            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1932            .await;
1933
1934        // Begin opening a buffer as client B, but leave the project before the open completes.
1935        let buffer_b = cx_b
1936            .background()
1937            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1938        cx_b.update(|_| drop(project_b));
1939        drop(buffer_b);
1940
1941        // See that the guest has left.
1942        project_a
1943            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1944            .await;
1945    }
1946
1947    #[gpui::test(iterations = 10)]
1948    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1949        cx_a.foreground().forbid_parking();
1950        let lang_registry = Arc::new(LanguageRegistry::new());
1951        let fs = FakeFs::new(cx_a.background());
1952
1953        // Connect to a server as 2 clients.
1954        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1955        let client_a = server.create_client(&mut cx_a, "user_a").await;
1956        let client_b = server.create_client(&mut cx_b, "user_b").await;
1957
1958        // Share a project as client A
1959        fs.insert_tree(
1960            "/a",
1961            json!({
1962                ".zed.toml": r#"collaborators = ["user_b"]"#,
1963                "a.txt": "a-contents",
1964                "b.txt": "b-contents",
1965            }),
1966        )
1967        .await;
1968        let project_a = cx_a.update(|cx| {
1969            Project::local(
1970                client_a.clone(),
1971                client_a.user_store.clone(),
1972                lang_registry.clone(),
1973                fs.clone(),
1974                cx,
1975            )
1976        });
1977        let (worktree_a, _) = project_a
1978            .update(&mut cx_a, |p, cx| {
1979                p.find_or_create_local_worktree("/a", false, cx)
1980            })
1981            .await
1982            .unwrap();
1983        worktree_a
1984            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1985            .await;
1986        let project_id = project_a
1987            .update(&mut cx_a, |project, _| project.next_remote_id())
1988            .await;
1989        project_a
1990            .update(&mut cx_a, |project, cx| project.share(cx))
1991            .await
1992            .unwrap();
1993
1994        // Join that project as client B
1995        let _project_b = Project::remote(
1996            project_id,
1997            client_b.clone(),
1998            client_b.user_store.clone(),
1999            lang_registry.clone(),
2000            fs.clone(),
2001            &mut cx_b.to_async(),
2002        )
2003        .await
2004        .unwrap();
2005
2006        // See that a guest has joined as client A.
2007        project_a
2008            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2009            .await;
2010
2011        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2012        client_b.disconnect(&cx_b.to_async()).unwrap();
2013        project_a
2014            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2015            .await;
2016    }
2017
2018    #[gpui::test(iterations = 10)]
2019    async fn test_collaborating_with_diagnostics(
2020        mut cx_a: TestAppContext,
2021        mut cx_b: TestAppContext,
2022    ) {
2023        cx_a.foreground().forbid_parking();
2024        let mut lang_registry = Arc::new(LanguageRegistry::new());
2025        let fs = FakeFs::new(cx_a.background());
2026
2027        // Set up a fake language server.
2028        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2029        Arc::get_mut(&mut lang_registry)
2030            .unwrap()
2031            .add(Arc::new(Language::new(
2032                LanguageConfig {
2033                    name: "Rust".into(),
2034                    path_suffixes: vec!["rs".to_string()],
2035                    language_server: Some(language_server_config),
2036                    ..Default::default()
2037                },
2038                Some(tree_sitter_rust::language()),
2039            )));
2040
2041        // Connect to a server as 2 clients.
2042        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2043        let client_a = server.create_client(&mut cx_a, "user_a").await;
2044        let client_b = server.create_client(&mut cx_b, "user_b").await;
2045
2046        // Share a project as client A
2047        fs.insert_tree(
2048            "/a",
2049            json!({
2050                ".zed.toml": r#"collaborators = ["user_b"]"#,
2051                "a.rs": "let one = two",
2052                "other.rs": "",
2053            }),
2054        )
2055        .await;
2056        let project_a = cx_a.update(|cx| {
2057            Project::local(
2058                client_a.clone(),
2059                client_a.user_store.clone(),
2060                lang_registry.clone(),
2061                fs.clone(),
2062                cx,
2063            )
2064        });
2065        let (worktree_a, _) = project_a
2066            .update(&mut cx_a, |p, cx| {
2067                p.find_or_create_local_worktree("/a", false, cx)
2068            })
2069            .await
2070            .unwrap();
2071        worktree_a
2072            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2073            .await;
2074        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2075        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2076        project_a
2077            .update(&mut cx_a, |p, cx| p.share(cx))
2078            .await
2079            .unwrap();
2080
2081        // Cause the language server to start.
2082        let _ = cx_a
2083            .background()
2084            .spawn(project_a.update(&mut cx_a, |project, cx| {
2085                project.open_buffer(
2086                    ProjectPath {
2087                        worktree_id,
2088                        path: Path::new("other.rs").into(),
2089                    },
2090                    cx,
2091                )
2092            }))
2093            .await
2094            .unwrap();
2095
2096        // Simulate a language server reporting errors for a file.
2097        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2098        fake_language_server
2099            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2100                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2101                version: None,
2102                diagnostics: vec![lsp::Diagnostic {
2103                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2104                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2105                    message: "message 1".to_string(),
2106                    ..Default::default()
2107                }],
2108            })
2109            .await;
2110
2111        // Wait for server to see the diagnostics update.
2112        server
2113            .condition(|store| {
2114                let worktree = store
2115                    .project(project_id)
2116                    .unwrap()
2117                    .share
2118                    .as_ref()
2119                    .unwrap()
2120                    .worktrees
2121                    .get(&worktree_id.to_proto())
2122                    .unwrap();
2123
2124                !worktree.diagnostic_summaries.is_empty()
2125            })
2126            .await;
2127
2128        // Join the worktree as client B.
2129        let project_b = Project::remote(
2130            project_id,
2131            client_b.clone(),
2132            client_b.user_store.clone(),
2133            lang_registry.clone(),
2134            fs.clone(),
2135            &mut cx_b.to_async(),
2136        )
2137        .await
2138        .unwrap();
2139
2140        project_b.read_with(&cx_b, |project, cx| {
2141            assert_eq!(
2142                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2143                &[(
2144                    ProjectPath {
2145                        worktree_id,
2146                        path: Arc::from(Path::new("a.rs")),
2147                    },
2148                    DiagnosticSummary {
2149                        error_count: 1,
2150                        warning_count: 0,
2151                        ..Default::default()
2152                    },
2153                )]
2154            )
2155        });
2156
2157        // Simulate a language server reporting more errors for a file.
2158        fake_language_server
2159            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2160                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2161                version: None,
2162                diagnostics: vec![
2163                    lsp::Diagnostic {
2164                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2165                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2166                        message: "message 1".to_string(),
2167                        ..Default::default()
2168                    },
2169                    lsp::Diagnostic {
2170                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2171                        range: lsp::Range::new(
2172                            lsp::Position::new(0, 10),
2173                            lsp::Position::new(0, 13),
2174                        ),
2175                        message: "message 2".to_string(),
2176                        ..Default::default()
2177                    },
2178                ],
2179            })
2180            .await;
2181
2182        // Client b gets the updated summaries
2183        project_b
2184            .condition(&cx_b, |project, cx| {
2185                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2186                    == &[(
2187                        ProjectPath {
2188                            worktree_id,
2189                            path: Arc::from(Path::new("a.rs")),
2190                        },
2191                        DiagnosticSummary {
2192                            error_count: 1,
2193                            warning_count: 1,
2194                            ..Default::default()
2195                        },
2196                    )]
2197            })
2198            .await;
2199
2200        // Open the file with the errors on client B. They should be present.
2201        let buffer_b = cx_b
2202            .background()
2203            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2204            .await
2205            .unwrap();
2206
2207        buffer_b.read_with(&cx_b, |buffer, _| {
2208            assert_eq!(
2209                buffer
2210                    .snapshot()
2211                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2212                    .map(|entry| entry)
2213                    .collect::<Vec<_>>(),
2214                &[
2215                    DiagnosticEntry {
2216                        range: Point::new(0, 4)..Point::new(0, 7),
2217                        diagnostic: Diagnostic {
2218                            group_id: 0,
2219                            message: "message 1".to_string(),
2220                            severity: lsp::DiagnosticSeverity::ERROR,
2221                            is_primary: true,
2222                            ..Default::default()
2223                        }
2224                    },
2225                    DiagnosticEntry {
2226                        range: Point::new(0, 10)..Point::new(0, 13),
2227                        diagnostic: Diagnostic {
2228                            group_id: 1,
2229                            severity: lsp::DiagnosticSeverity::WARNING,
2230                            message: "message 2".to_string(),
2231                            is_primary: true,
2232                            ..Default::default()
2233                        }
2234                    }
2235                ]
2236            );
2237        });
2238    }
2239
2240    #[gpui::test(iterations = 10)]
2241    async fn test_collaborating_with_completion(
2242        mut cx_a: TestAppContext,
2243        mut cx_b: TestAppContext,
2244    ) {
2245        cx_a.foreground().forbid_parking();
2246        let mut lang_registry = Arc::new(LanguageRegistry::new());
2247        let fs = FakeFs::new(cx_a.background());
2248
2249        // Set up a fake language server.
2250        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2251        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2252            completion_provider: Some(lsp::CompletionOptions {
2253                trigger_characters: Some(vec![".".to_string()]),
2254                ..Default::default()
2255            }),
2256            ..Default::default()
2257        });
2258        Arc::get_mut(&mut lang_registry)
2259            .unwrap()
2260            .add(Arc::new(Language::new(
2261                LanguageConfig {
2262                    name: "Rust".into(),
2263                    path_suffixes: vec!["rs".to_string()],
2264                    language_server: Some(language_server_config),
2265                    ..Default::default()
2266                },
2267                Some(tree_sitter_rust::language()),
2268            )));
2269
2270        // Connect to a server as 2 clients.
2271        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2272        let client_a = server.create_client(&mut cx_a, "user_a").await;
2273        let client_b = server.create_client(&mut cx_b, "user_b").await;
2274
2275        // Share a project as client A
2276        fs.insert_tree(
2277            "/a",
2278            json!({
2279                ".zed.toml": r#"collaborators = ["user_b"]"#,
2280                "main.rs": "fn main() { a }",
2281                "other.rs": "",
2282            }),
2283        )
2284        .await;
2285        let project_a = cx_a.update(|cx| {
2286            Project::local(
2287                client_a.clone(),
2288                client_a.user_store.clone(),
2289                lang_registry.clone(),
2290                fs.clone(),
2291                cx,
2292            )
2293        });
2294        let (worktree_a, _) = project_a
2295            .update(&mut cx_a, |p, cx| {
2296                p.find_or_create_local_worktree("/a", false, cx)
2297            })
2298            .await
2299            .unwrap();
2300        worktree_a
2301            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2302            .await;
2303        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2304        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2305        project_a
2306            .update(&mut cx_a, |p, cx| p.share(cx))
2307            .await
2308            .unwrap();
2309
2310        // Join the worktree as client B.
2311        let project_b = Project::remote(
2312            project_id,
2313            client_b.clone(),
2314            client_b.user_store.clone(),
2315            lang_registry.clone(),
2316            fs.clone(),
2317            &mut cx_b.to_async(),
2318        )
2319        .await
2320        .unwrap();
2321
2322        // Open a file in an editor as the guest.
2323        let buffer_b = project_b
2324            .update(&mut cx_b, |p, cx| {
2325                p.open_buffer((worktree_id, "main.rs"), cx)
2326            })
2327            .await
2328            .unwrap();
2329        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2330        let editor_b = cx_b.add_view(window_b, |cx| {
2331            Editor::for_buffer(
2332                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2333                Arc::new(|cx| EditorSettings::test(cx)),
2334                Some(project_b.clone()),
2335                cx,
2336            )
2337        });
2338
2339        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2340        buffer_b
2341            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2342            .await;
2343
2344        // Type a completion trigger character as the guest.
2345        editor_b.update(&mut cx_b, |editor, cx| {
2346            editor.select_ranges([13..13], None, cx);
2347            editor.handle_input(&Input(".".into()), cx);
2348            cx.focus(&editor_b);
2349        });
2350
2351        // Receive a completion request as the host's language server.
2352        // Return some completions from the host's language server.
2353        cx_a.foreground().start_waiting();
2354        fake_language_server
2355            .handle_request::<lsp::request::Completion, _>(|params, _| {
2356                assert_eq!(
2357                    params.text_document_position.text_document.uri,
2358                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2359                );
2360                assert_eq!(
2361                    params.text_document_position.position,
2362                    lsp::Position::new(0, 14),
2363                );
2364
2365                Some(lsp::CompletionResponse::Array(vec![
2366                    lsp::CompletionItem {
2367                        label: "first_method(…)".into(),
2368                        detail: Some("fn(&mut self, B) -> C".into()),
2369                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2370                            new_text: "first_method($1)".to_string(),
2371                            range: lsp::Range::new(
2372                                lsp::Position::new(0, 14),
2373                                lsp::Position::new(0, 14),
2374                            ),
2375                        })),
2376                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2377                        ..Default::default()
2378                    },
2379                    lsp::CompletionItem {
2380                        label: "second_method(…)".into(),
2381                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2382                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2383                            new_text: "second_method()".to_string(),
2384                            range: lsp::Range::new(
2385                                lsp::Position::new(0, 14),
2386                                lsp::Position::new(0, 14),
2387                            ),
2388                        })),
2389                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2390                        ..Default::default()
2391                    },
2392                ]))
2393            })
2394            .next()
2395            .await
2396            .unwrap();
2397        cx_a.foreground().finish_waiting();
2398
2399        // Open the buffer on the host.
2400        let buffer_a = project_a
2401            .update(&mut cx_a, |p, cx| {
2402                p.open_buffer((worktree_id, "main.rs"), cx)
2403            })
2404            .await
2405            .unwrap();
2406        buffer_a
2407            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2408            .await;
2409
2410        // Confirm a completion on the guest.
2411        editor_b
2412            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2413            .await;
2414        editor_b.update(&mut cx_b, |editor, cx| {
2415            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2416            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2417        });
2418
2419        // Return a resolved completion from the host's language server.
2420        // The resolved completion has an additional text edit.
2421        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2422            |params, _| {
2423                assert_eq!(params.label, "first_method(…)");
2424                lsp::CompletionItem {
2425                    label: "first_method(…)".into(),
2426                    detail: Some("fn(&mut self, B) -> C".into()),
2427                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2428                        new_text: "first_method($1)".to_string(),
2429                        range: lsp::Range::new(
2430                            lsp::Position::new(0, 14),
2431                            lsp::Position::new(0, 14),
2432                        ),
2433                    })),
2434                    additional_text_edits: Some(vec![lsp::TextEdit {
2435                        new_text: "use d::SomeTrait;\n".to_string(),
2436                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2437                    }]),
2438                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2439                    ..Default::default()
2440                }
2441            },
2442        );
2443
2444        // The additional edit is applied.
2445        buffer_a
2446            .condition(&cx_a, |buffer, _| {
2447                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2448            })
2449            .await;
2450        buffer_b
2451            .condition(&cx_b, |buffer, _| {
2452                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2453            })
2454            .await;
2455    }
2456
2457    #[gpui::test(iterations = 10)]
2458    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2459        cx_a.foreground().forbid_parking();
2460        let mut lang_registry = Arc::new(LanguageRegistry::new());
2461        let fs = FakeFs::new(cx_a.background());
2462
2463        // Set up a fake language server.
2464        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2465        Arc::get_mut(&mut lang_registry)
2466            .unwrap()
2467            .add(Arc::new(Language::new(
2468                LanguageConfig {
2469                    name: "Rust".into(),
2470                    path_suffixes: vec!["rs".to_string()],
2471                    language_server: Some(language_server_config),
2472                    ..Default::default()
2473                },
2474                Some(tree_sitter_rust::language()),
2475            )));
2476
2477        // Connect to a server as 2 clients.
2478        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2479        let client_a = server.create_client(&mut cx_a, "user_a").await;
2480        let client_b = server.create_client(&mut cx_b, "user_b").await;
2481
2482        // Share a project as client A
2483        fs.insert_tree(
2484            "/a",
2485            json!({
2486                ".zed.toml": r#"collaborators = ["user_b"]"#,
2487                "a.rs": "let one = two",
2488            }),
2489        )
2490        .await;
2491        let project_a = cx_a.update(|cx| {
2492            Project::local(
2493                client_a.clone(),
2494                client_a.user_store.clone(),
2495                lang_registry.clone(),
2496                fs.clone(),
2497                cx,
2498            )
2499        });
2500        let (worktree_a, _) = project_a
2501            .update(&mut cx_a, |p, cx| {
2502                p.find_or_create_local_worktree("/a", false, cx)
2503            })
2504            .await
2505            .unwrap();
2506        worktree_a
2507            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2508            .await;
2509        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2510        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2511        project_a
2512            .update(&mut cx_a, |p, cx| p.share(cx))
2513            .await
2514            .unwrap();
2515
2516        // Join the worktree as client B.
2517        let project_b = Project::remote(
2518            project_id,
2519            client_b.clone(),
2520            client_b.user_store.clone(),
2521            lang_registry.clone(),
2522            fs.clone(),
2523            &mut cx_b.to_async(),
2524        )
2525        .await
2526        .unwrap();
2527
2528        let buffer_b = cx_b
2529            .background()
2530            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2531            .await
2532            .unwrap();
2533
2534        let format = project_b.update(&mut cx_b, |project, cx| {
2535            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2536        });
2537
2538        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2539        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2540            Some(vec![
2541                lsp::TextEdit {
2542                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2543                    new_text: "h".to_string(),
2544                },
2545                lsp::TextEdit {
2546                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2547                    new_text: "y".to_string(),
2548                },
2549            ])
2550        });
2551
2552        format.await.unwrap();
2553        assert_eq!(
2554            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2555            "let honey = two"
2556        );
2557    }
2558
2559    #[gpui::test(iterations = 10)]
2560    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2561        cx_a.foreground().forbid_parking();
2562        let mut lang_registry = Arc::new(LanguageRegistry::new());
2563        let fs = FakeFs::new(cx_a.background());
2564        fs.insert_tree(
2565            "/root-1",
2566            json!({
2567                ".zed.toml": r#"collaborators = ["user_b"]"#,
2568                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2569            }),
2570        )
2571        .await;
2572        fs.insert_tree(
2573            "/root-2",
2574            json!({
2575                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2576            }),
2577        )
2578        .await;
2579
2580        // Set up a fake language server.
2581        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2582        Arc::get_mut(&mut lang_registry)
2583            .unwrap()
2584            .add(Arc::new(Language::new(
2585                LanguageConfig {
2586                    name: "Rust".into(),
2587                    path_suffixes: vec!["rs".to_string()],
2588                    language_server: Some(language_server_config),
2589                    ..Default::default()
2590                },
2591                Some(tree_sitter_rust::language()),
2592            )));
2593
2594        // Connect to a server as 2 clients.
2595        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2596        let client_a = server.create_client(&mut cx_a, "user_a").await;
2597        let client_b = server.create_client(&mut cx_b, "user_b").await;
2598
2599        // Share a project as client A
2600        let project_a = cx_a.update(|cx| {
2601            Project::local(
2602                client_a.clone(),
2603                client_a.user_store.clone(),
2604                lang_registry.clone(),
2605                fs.clone(),
2606                cx,
2607            )
2608        });
2609        let (worktree_a, _) = project_a
2610            .update(&mut cx_a, |p, cx| {
2611                p.find_or_create_local_worktree("/root-1", false, cx)
2612            })
2613            .await
2614            .unwrap();
2615        worktree_a
2616            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2617            .await;
2618        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2619        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2620        project_a
2621            .update(&mut cx_a, |p, cx| p.share(cx))
2622            .await
2623            .unwrap();
2624
2625        // Join the worktree as client B.
2626        let project_b = Project::remote(
2627            project_id,
2628            client_b.clone(),
2629            client_b.user_store.clone(),
2630            lang_registry.clone(),
2631            fs.clone(),
2632            &mut cx_b.to_async(),
2633        )
2634        .await
2635        .unwrap();
2636
2637        // Open the file on client B.
2638        let buffer_b = cx_b
2639            .background()
2640            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2641            .await
2642            .unwrap();
2643
2644        // Request the definition of a symbol as the guest.
2645        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2646
2647        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2648        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2649            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2650                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2651                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2652            )))
2653        });
2654
2655        let definitions_1 = definitions_1.await.unwrap();
2656        cx_b.read(|cx| {
2657            assert_eq!(definitions_1.len(), 1);
2658            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2659            let target_buffer = definitions_1[0].buffer.read(cx);
2660            assert_eq!(
2661                target_buffer.text(),
2662                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2663            );
2664            assert_eq!(
2665                definitions_1[0].range.to_point(target_buffer),
2666                Point::new(0, 6)..Point::new(0, 9)
2667            );
2668        });
2669
2670        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2671        // the previous call to `definition`.
2672        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2673        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2674            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2675                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2676                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2677            )))
2678        });
2679
2680        let definitions_2 = definitions_2.await.unwrap();
2681        cx_b.read(|cx| {
2682            assert_eq!(definitions_2.len(), 1);
2683            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2684            let target_buffer = definitions_2[0].buffer.read(cx);
2685            assert_eq!(
2686                target_buffer.text(),
2687                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2688            );
2689            assert_eq!(
2690                definitions_2[0].range.to_point(target_buffer),
2691                Point::new(1, 6)..Point::new(1, 11)
2692            );
2693        });
2694        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2695
2696        cx_b.update(|_| {
2697            drop(definitions_1);
2698            drop(definitions_2);
2699        });
2700        project_b
2701            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2702            .await;
2703    }
2704
2705    #[gpui::test(iterations = 10)]
2706    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2707        cx_a.foreground().forbid_parking();
2708        let mut lang_registry = Arc::new(LanguageRegistry::new());
2709        let fs = FakeFs::new(cx_a.background());
2710        fs.insert_tree(
2711            "/root-1",
2712            json!({
2713                ".zed.toml": r#"collaborators = ["user_b"]"#,
2714                "one.rs": "const ONE: usize = 1;",
2715                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2716            }),
2717        )
2718        .await;
2719        fs.insert_tree(
2720            "/root-2",
2721            json!({
2722                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2723            }),
2724        )
2725        .await;
2726
2727        // Set up a fake language server.
2728        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2729        Arc::get_mut(&mut lang_registry)
2730            .unwrap()
2731            .add(Arc::new(Language::new(
2732                LanguageConfig {
2733                    name: "Rust".into(),
2734                    path_suffixes: vec!["rs".to_string()],
2735                    language_server: Some(language_server_config),
2736                    ..Default::default()
2737                },
2738                Some(tree_sitter_rust::language()),
2739            )));
2740
2741        // Connect to a server as 2 clients.
2742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2743        let client_a = server.create_client(&mut cx_a, "user_a").await;
2744        let client_b = server.create_client(&mut cx_b, "user_b").await;
2745
2746        // Share a project as client A
2747        let project_a = cx_a.update(|cx| {
2748            Project::local(
2749                client_a.clone(),
2750                client_a.user_store.clone(),
2751                lang_registry.clone(),
2752                fs.clone(),
2753                cx,
2754            )
2755        });
2756        let (worktree_a, _) = project_a
2757            .update(&mut cx_a, |p, cx| {
2758                p.find_or_create_local_worktree("/root-1", false, cx)
2759            })
2760            .await
2761            .unwrap();
2762        worktree_a
2763            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2764            .await;
2765        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2766        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2767        project_a
2768            .update(&mut cx_a, |p, cx| p.share(cx))
2769            .await
2770            .unwrap();
2771
2772        // Join the worktree as client B.
2773        let project_b = Project::remote(
2774            project_id,
2775            client_b.clone(),
2776            client_b.user_store.clone(),
2777            lang_registry.clone(),
2778            fs.clone(),
2779            &mut cx_b.to_async(),
2780        )
2781        .await
2782        .unwrap();
2783
2784        // Open the file on client B.
2785        let buffer_b = cx_b
2786            .background()
2787            .spawn(project_b.update(&mut cx_b, |p, cx| {
2788                p.open_buffer((worktree_id, "one.rs"), cx)
2789            }))
2790            .await
2791            .unwrap();
2792
2793        // Request references to a symbol as the guest.
2794        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2795
2796        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2797        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2798            assert_eq!(
2799                params.text_document_position.text_document.uri.as_str(),
2800                "file:///root-1/one.rs"
2801            );
2802            Some(vec![
2803                lsp::Location {
2804                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2805                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2806                },
2807                lsp::Location {
2808                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2809                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2810                },
2811                lsp::Location {
2812                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2813                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2814                },
2815            ])
2816        });
2817
2818        let references = references.await.unwrap();
2819        cx_b.read(|cx| {
2820            assert_eq!(references.len(), 3);
2821            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2822
2823            let two_buffer = references[0].buffer.read(cx);
2824            let three_buffer = references[2].buffer.read(cx);
2825            assert_eq!(
2826                two_buffer.file().unwrap().path().as_ref(),
2827                Path::new("two.rs")
2828            );
2829            assert_eq!(references[1].buffer, references[0].buffer);
2830            assert_eq!(
2831                three_buffer.file().unwrap().full_path(cx),
2832                Path::new("three.rs")
2833            );
2834
2835            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2836            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2837            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2838        });
2839    }
2840
2841    #[gpui::test(iterations = 10)]
2842    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2843        cx_a.foreground().forbid_parking();
2844        let lang_registry = Arc::new(LanguageRegistry::new());
2845        let fs = FakeFs::new(cx_a.background());
2846        fs.insert_tree(
2847            "/root-1",
2848            json!({
2849                ".zed.toml": r#"collaborators = ["user_b"]"#,
2850                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2851            }),
2852        )
2853        .await;
2854
2855        // Set up a fake language server.
2856        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2857        lang_registry.add(Arc::new(Language::new(
2858            LanguageConfig {
2859                name: "Rust".into(),
2860                path_suffixes: vec!["rs".to_string()],
2861                language_server: Some(language_server_config),
2862                ..Default::default()
2863            },
2864            Some(tree_sitter_rust::language()),
2865        )));
2866
2867        // Connect to a server as 2 clients.
2868        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2869        let client_a = server.create_client(&mut cx_a, "user_a").await;
2870        let client_b = server.create_client(&mut cx_b, "user_b").await;
2871
2872        // Share a project as client A
2873        let project_a = cx_a.update(|cx| {
2874            Project::local(
2875                client_a.clone(),
2876                client_a.user_store.clone(),
2877                lang_registry.clone(),
2878                fs.clone(),
2879                cx,
2880            )
2881        });
2882        let (worktree_a, _) = project_a
2883            .update(&mut cx_a, |p, cx| {
2884                p.find_or_create_local_worktree("/root-1", false, cx)
2885            })
2886            .await
2887            .unwrap();
2888        worktree_a
2889            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2890            .await;
2891        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2892        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2893        project_a
2894            .update(&mut cx_a, |p, cx| p.share(cx))
2895            .await
2896            .unwrap();
2897
2898        // Join the worktree as client B.
2899        let project_b = Project::remote(
2900            project_id,
2901            client_b.clone(),
2902            client_b.user_store.clone(),
2903            lang_registry.clone(),
2904            fs.clone(),
2905            &mut cx_b.to_async(),
2906        )
2907        .await
2908        .unwrap();
2909
2910        // Open the file on client B.
2911        let buffer_b = cx_b
2912            .background()
2913            .spawn(project_b.update(&mut cx_b, |p, cx| {
2914                p.open_buffer((worktree_id, "main.rs"), cx)
2915            }))
2916            .await
2917            .unwrap();
2918
2919        // Request document highlights as the guest.
2920        let highlights =
2921            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2922
2923        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2924        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2925            |params, _| {
2926                assert_eq!(
2927                    params
2928                        .text_document_position_params
2929                        .text_document
2930                        .uri
2931                        .as_str(),
2932                    "file:///root-1/main.rs"
2933                );
2934                assert_eq!(
2935                    params.text_document_position_params.position,
2936                    lsp::Position::new(0, 34)
2937                );
2938                Some(vec![
2939                    lsp::DocumentHighlight {
2940                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2941                        range: lsp::Range::new(
2942                            lsp::Position::new(0, 10),
2943                            lsp::Position::new(0, 16),
2944                        ),
2945                    },
2946                    lsp::DocumentHighlight {
2947                        kind: Some(lsp::DocumentHighlightKind::READ),
2948                        range: lsp::Range::new(
2949                            lsp::Position::new(0, 32),
2950                            lsp::Position::new(0, 38),
2951                        ),
2952                    },
2953                    lsp::DocumentHighlight {
2954                        kind: Some(lsp::DocumentHighlightKind::READ),
2955                        range: lsp::Range::new(
2956                            lsp::Position::new(0, 41),
2957                            lsp::Position::new(0, 47),
2958                        ),
2959                    },
2960                ])
2961            },
2962        );
2963
2964        let highlights = highlights.await.unwrap();
2965        buffer_b.read_with(&cx_b, |buffer, _| {
2966            let snapshot = buffer.snapshot();
2967
2968            let highlights = highlights
2969                .into_iter()
2970                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2971                .collect::<Vec<_>>();
2972            assert_eq!(
2973                highlights,
2974                &[
2975                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2976                    (lsp::DocumentHighlightKind::READ, 32..38),
2977                    (lsp::DocumentHighlightKind::READ, 41..47)
2978                ]
2979            )
2980        });
2981    }
2982
2983    #[gpui::test(iterations = 10)]
2984    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2985        cx_a.foreground().forbid_parking();
2986        let mut lang_registry = Arc::new(LanguageRegistry::new());
2987        let fs = FakeFs::new(cx_a.background());
2988        fs.insert_tree(
2989            "/code",
2990            json!({
2991                "crate-1": {
2992                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2993                    "one.rs": "const ONE: usize = 1;",
2994                },
2995                "crate-2": {
2996                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2997                },
2998                "private": {
2999                    "passwords.txt": "the-password",
3000                }
3001            }),
3002        )
3003        .await;
3004
3005        // Set up a fake language server.
3006        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3007        Arc::get_mut(&mut lang_registry)
3008            .unwrap()
3009            .add(Arc::new(Language::new(
3010                LanguageConfig {
3011                    name: "Rust".into(),
3012                    path_suffixes: vec!["rs".to_string()],
3013                    language_server: Some(language_server_config),
3014                    ..Default::default()
3015                },
3016                Some(tree_sitter_rust::language()),
3017            )));
3018
3019        // Connect to a server as 2 clients.
3020        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3021        let client_a = server.create_client(&mut cx_a, "user_a").await;
3022        let client_b = server.create_client(&mut cx_b, "user_b").await;
3023
3024        // Share a project as client A
3025        let project_a = cx_a.update(|cx| {
3026            Project::local(
3027                client_a.clone(),
3028                client_a.user_store.clone(),
3029                lang_registry.clone(),
3030                fs.clone(),
3031                cx,
3032            )
3033        });
3034        let (worktree_a, _) = project_a
3035            .update(&mut cx_a, |p, cx| {
3036                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3037            })
3038            .await
3039            .unwrap();
3040        worktree_a
3041            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3042            .await;
3043        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3044        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3045        project_a
3046            .update(&mut cx_a, |p, cx| p.share(cx))
3047            .await
3048            .unwrap();
3049
3050        // Join the worktree as client B.
3051        let project_b = Project::remote(
3052            project_id,
3053            client_b.clone(),
3054            client_b.user_store.clone(),
3055            lang_registry.clone(),
3056            fs.clone(),
3057            &mut cx_b.to_async(),
3058        )
3059        .await
3060        .unwrap();
3061
3062        // Cause the language server to start.
3063        let _buffer = cx_b
3064            .background()
3065            .spawn(project_b.update(&mut cx_b, |p, cx| {
3066                p.open_buffer((worktree_id, "one.rs"), cx)
3067            }))
3068            .await
3069            .unwrap();
3070
3071        // Request the definition of a symbol as the guest.
3072        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3073        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3074        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3075            #[allow(deprecated)]
3076            Some(vec![lsp::SymbolInformation {
3077                name: "TWO".into(),
3078                location: lsp::Location {
3079                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3080                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3081                },
3082                kind: lsp::SymbolKind::CONSTANT,
3083                tags: None,
3084                container_name: None,
3085                deprecated: None,
3086            }])
3087        });
3088
3089        let symbols = symbols.await.unwrap();
3090        assert_eq!(symbols.len(), 1);
3091        assert_eq!(symbols[0].name, "TWO");
3092
3093        // Open one of the returned symbols.
3094        let buffer_b_2 = project_b
3095            .update(&mut cx_b, |project, cx| {
3096                project.open_buffer_for_symbol(&symbols[0], cx)
3097            })
3098            .await
3099            .unwrap();
3100        buffer_b_2.read_with(&cx_b, |buffer, _| {
3101            assert_eq!(
3102                buffer.file().unwrap().path().as_ref(),
3103                Path::new("../crate-2/two.rs")
3104            );
3105        });
3106
3107        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3108        let mut fake_symbol = symbols[0].clone();
3109        fake_symbol.path = Path::new("/code/secrets").into();
3110        let error = project_b
3111            .update(&mut cx_b, |project, cx| {
3112                project.open_buffer_for_symbol(&fake_symbol, cx)
3113            })
3114            .await
3115            .unwrap_err();
3116        assert!(error.to_string().contains("invalid symbol signature"));
3117    }
3118
3119    #[gpui::test(iterations = 10)]
3120    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3121        mut cx_a: TestAppContext,
3122        mut cx_b: TestAppContext,
3123        mut rng: StdRng,
3124    ) {
3125        cx_a.foreground().forbid_parking();
3126        let mut lang_registry = Arc::new(LanguageRegistry::new());
3127        let fs = FakeFs::new(cx_a.background());
3128        fs.insert_tree(
3129            "/root",
3130            json!({
3131                ".zed.toml": r#"collaborators = ["user_b"]"#,
3132                "a.rs": "const ONE: usize = b::TWO;",
3133                "b.rs": "const TWO: usize = 2",
3134            }),
3135        )
3136        .await;
3137
3138        // Set up a fake language server.
3139        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3140
3141        Arc::get_mut(&mut lang_registry)
3142            .unwrap()
3143            .add(Arc::new(Language::new(
3144                LanguageConfig {
3145                    name: "Rust".into(),
3146                    path_suffixes: vec!["rs".to_string()],
3147                    language_server: Some(language_server_config),
3148                    ..Default::default()
3149                },
3150                Some(tree_sitter_rust::language()),
3151            )));
3152
3153        // Connect to a server as 2 clients.
3154        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3155        let client_a = server.create_client(&mut cx_a, "user_a").await;
3156        let client_b = server.create_client(&mut cx_b, "user_b").await;
3157
3158        // Share a project as client A
3159        let project_a = cx_a.update(|cx| {
3160            Project::local(
3161                client_a.clone(),
3162                client_a.user_store.clone(),
3163                lang_registry.clone(),
3164                fs.clone(),
3165                cx,
3166            )
3167        });
3168
3169        let (worktree_a, _) = project_a
3170            .update(&mut cx_a, |p, cx| {
3171                p.find_or_create_local_worktree("/root", false, cx)
3172            })
3173            .await
3174            .unwrap();
3175        worktree_a
3176            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3177            .await;
3178        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3179        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3180        project_a
3181            .update(&mut cx_a, |p, cx| p.share(cx))
3182            .await
3183            .unwrap();
3184
3185        // Join the worktree as client B.
3186        let project_b = Project::remote(
3187            project_id,
3188            client_b.clone(),
3189            client_b.user_store.clone(),
3190            lang_registry.clone(),
3191            fs.clone(),
3192            &mut cx_b.to_async(),
3193        )
3194        .await
3195        .unwrap();
3196
3197        let buffer_b1 = cx_b
3198            .background()
3199            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3200            .await
3201            .unwrap();
3202
3203        let definitions;
3204        let buffer_b2;
3205        if rng.gen() {
3206            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3207            buffer_b2 =
3208                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3209        } else {
3210            buffer_b2 =
3211                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3212            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3213        }
3214
3215        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3216        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3217            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3218                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3219                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3220            )))
3221        });
3222
3223        let buffer_b2 = buffer_b2.await.unwrap();
3224        let definitions = definitions.await.unwrap();
3225        assert_eq!(definitions.len(), 1);
3226        assert_eq!(definitions[0].buffer, buffer_b2);
3227    }
3228
3229    #[gpui::test(iterations = 10)]
3230    async fn test_collaborating_with_code_actions(
3231        mut cx_a: TestAppContext,
3232        mut cx_b: TestAppContext,
3233    ) {
3234        cx_a.foreground().forbid_parking();
3235        let mut lang_registry = Arc::new(LanguageRegistry::new());
3236        let fs = FakeFs::new(cx_a.background());
3237        let mut path_openers_b = Vec::new();
3238        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3239
3240        // Set up a fake language server.
3241        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3242        Arc::get_mut(&mut lang_registry)
3243            .unwrap()
3244            .add(Arc::new(Language::new(
3245                LanguageConfig {
3246                    name: "Rust".into(),
3247                    path_suffixes: vec!["rs".to_string()],
3248                    language_server: Some(language_server_config),
3249                    ..Default::default()
3250                },
3251                Some(tree_sitter_rust::language()),
3252            )));
3253
3254        // Connect to a server as 2 clients.
3255        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3256        let client_a = server.create_client(&mut cx_a, "user_a").await;
3257        let client_b = server.create_client(&mut cx_b, "user_b").await;
3258
3259        // Share a project as client A
3260        fs.insert_tree(
3261            "/a",
3262            json!({
3263                ".zed.toml": r#"collaborators = ["user_b"]"#,
3264                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3265                "other.rs": "pub fn foo() -> usize { 4 }",
3266            }),
3267        )
3268        .await;
3269        let project_a = cx_a.update(|cx| {
3270            Project::local(
3271                client_a.clone(),
3272                client_a.user_store.clone(),
3273                lang_registry.clone(),
3274                fs.clone(),
3275                cx,
3276            )
3277        });
3278        let (worktree_a, _) = project_a
3279            .update(&mut cx_a, |p, cx| {
3280                p.find_or_create_local_worktree("/a", false, cx)
3281            })
3282            .await
3283            .unwrap();
3284        worktree_a
3285            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3286            .await;
3287        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3288        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3289        project_a
3290            .update(&mut cx_a, |p, cx| p.share(cx))
3291            .await
3292            .unwrap();
3293
3294        // Join the worktree as client B.
3295        let project_b = Project::remote(
3296            project_id,
3297            client_b.clone(),
3298            client_b.user_store.clone(),
3299            lang_registry.clone(),
3300            fs.clone(),
3301            &mut cx_b.to_async(),
3302        )
3303        .await
3304        .unwrap();
3305        let mut params = cx_b.update(WorkspaceParams::test);
3306        params.languages = lang_registry.clone();
3307        params.client = client_b.client.clone();
3308        params.user_store = client_b.user_store.clone();
3309        params.project = project_b;
3310        params.path_openers = path_openers_b.into();
3311
3312        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3313        let editor_b = workspace_b
3314            .update(&mut cx_b, |workspace, cx| {
3315                workspace.open_path((worktree_id, "main.rs").into(), cx)
3316            })
3317            .await
3318            .unwrap()
3319            .downcast::<Editor>()
3320            .unwrap();
3321
3322        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3323        fake_language_server
3324            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3325                assert_eq!(
3326                    params.text_document.uri,
3327                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3328                );
3329                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3330                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3331                None
3332            })
3333            .next()
3334            .await;
3335
3336        // Move cursor to a location that contains code actions.
3337        editor_b.update(&mut cx_b, |editor, cx| {
3338            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3339            cx.focus(&editor_b);
3340        });
3341
3342        fake_language_server
3343            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3344                assert_eq!(
3345                    params.text_document.uri,
3346                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3347                );
3348                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3349                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3350
3351                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3352                    lsp::CodeAction {
3353                        title: "Inline into all callers".to_string(),
3354                        edit: Some(lsp::WorkspaceEdit {
3355                            changes: Some(
3356                                [
3357                                    (
3358                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3359                                        vec![lsp::TextEdit::new(
3360                                            lsp::Range::new(
3361                                                lsp::Position::new(1, 22),
3362                                                lsp::Position::new(1, 34),
3363                                            ),
3364                                            "4".to_string(),
3365                                        )],
3366                                    ),
3367                                    (
3368                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3369                                        vec![lsp::TextEdit::new(
3370                                            lsp::Range::new(
3371                                                lsp::Position::new(0, 0),
3372                                                lsp::Position::new(0, 27),
3373                                            ),
3374                                            "".to_string(),
3375                                        )],
3376                                    ),
3377                                ]
3378                                .into_iter()
3379                                .collect(),
3380                            ),
3381                            ..Default::default()
3382                        }),
3383                        data: Some(json!({
3384                            "codeActionParams": {
3385                                "range": {
3386                                    "start": {"line": 1, "column": 31},
3387                                    "end": {"line": 1, "column": 31},
3388                                }
3389                            }
3390                        })),
3391                        ..Default::default()
3392                    },
3393                )])
3394            })
3395            .next()
3396            .await;
3397
3398        // Toggle code actions and wait for them to display.
3399        editor_b.update(&mut cx_b, |editor, cx| {
3400            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3401        });
3402        editor_b
3403            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3404            .await;
3405
3406        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3407
3408        // Confirming the code action will trigger a resolve request.
3409        let confirm_action = workspace_b
3410            .update(&mut cx_b, |workspace, cx| {
3411                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3412            })
3413            .unwrap();
3414        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3415            lsp::CodeAction {
3416                title: "Inline into all callers".to_string(),
3417                edit: Some(lsp::WorkspaceEdit {
3418                    changes: Some(
3419                        [
3420                            (
3421                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3422                                vec![lsp::TextEdit::new(
3423                                    lsp::Range::new(
3424                                        lsp::Position::new(1, 22),
3425                                        lsp::Position::new(1, 34),
3426                                    ),
3427                                    "4".to_string(),
3428                                )],
3429                            ),
3430                            (
3431                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3432                                vec![lsp::TextEdit::new(
3433                                    lsp::Range::new(
3434                                        lsp::Position::new(0, 0),
3435                                        lsp::Position::new(0, 27),
3436                                    ),
3437                                    "".to_string(),
3438                                )],
3439                            ),
3440                        ]
3441                        .into_iter()
3442                        .collect(),
3443                    ),
3444                    ..Default::default()
3445                }),
3446                ..Default::default()
3447            }
3448        });
3449
3450        // After the action is confirmed, an editor containing both modified files is opened.
3451        confirm_action.await.unwrap();
3452        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3453            workspace
3454                .active_item(cx)
3455                .unwrap()
3456                .downcast::<Editor>()
3457                .unwrap()
3458        });
3459        code_action_editor.update(&mut cx_b, |editor, cx| {
3460            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3461            editor.undo(&Undo, cx);
3462            assert_eq!(
3463                editor.text(cx),
3464                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3465            );
3466            editor.redo(&Redo, cx);
3467            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3468        });
3469    }
3470
3471    #[gpui::test(iterations = 10)]
3472    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3473        cx_a.foreground().forbid_parking();
3474        let mut lang_registry = Arc::new(LanguageRegistry::new());
3475        let fs = FakeFs::new(cx_a.background());
3476        let mut path_openers_b = Vec::new();
3477        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3478
3479        // Set up a fake language server.
3480        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3481        Arc::get_mut(&mut lang_registry)
3482            .unwrap()
3483            .add(Arc::new(Language::new(
3484                LanguageConfig {
3485                    name: "Rust".into(),
3486                    path_suffixes: vec!["rs".to_string()],
3487                    language_server: Some(language_server_config),
3488                    ..Default::default()
3489                },
3490                Some(tree_sitter_rust::language()),
3491            )));
3492
3493        // Connect to a server as 2 clients.
3494        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3495        let client_a = server.create_client(&mut cx_a, "user_a").await;
3496        let client_b = server.create_client(&mut cx_b, "user_b").await;
3497
3498        // Share a project as client A
3499        fs.insert_tree(
3500            "/dir",
3501            json!({
3502                ".zed.toml": r#"collaborators = ["user_b"]"#,
3503                "one.rs": "const ONE: usize = 1;",
3504                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3505            }),
3506        )
3507        .await;
3508        let project_a = cx_a.update(|cx| {
3509            Project::local(
3510                client_a.clone(),
3511                client_a.user_store.clone(),
3512                lang_registry.clone(),
3513                fs.clone(),
3514                cx,
3515            )
3516        });
3517        let (worktree_a, _) = project_a
3518            .update(&mut cx_a, |p, cx| {
3519                p.find_or_create_local_worktree("/dir", false, cx)
3520            })
3521            .await
3522            .unwrap();
3523        worktree_a
3524            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3525            .await;
3526        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3527        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3528        project_a
3529            .update(&mut cx_a, |p, cx| p.share(cx))
3530            .await
3531            .unwrap();
3532
3533        // Join the worktree as client B.
3534        let project_b = Project::remote(
3535            project_id,
3536            client_b.clone(),
3537            client_b.user_store.clone(),
3538            lang_registry.clone(),
3539            fs.clone(),
3540            &mut cx_b.to_async(),
3541        )
3542        .await
3543        .unwrap();
3544        let mut params = cx_b.update(WorkspaceParams::test);
3545        params.languages = lang_registry.clone();
3546        params.client = client_b.client.clone();
3547        params.user_store = client_b.user_store.clone();
3548        params.project = project_b;
3549        params.path_openers = path_openers_b.into();
3550
3551        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3552        let editor_b = workspace_b
3553            .update(&mut cx_b, |workspace, cx| {
3554                workspace.open_path((worktree_id, "one.rs").into(), cx)
3555            })
3556            .await
3557            .unwrap()
3558            .downcast::<Editor>()
3559            .unwrap();
3560        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3561
3562        // Move cursor to a location that can be renamed.
3563        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3564            editor.select_ranges([7..7], None, cx);
3565            editor.rename(&Rename, cx).unwrap()
3566        });
3567
3568        fake_language_server
3569            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3570                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3571                assert_eq!(params.position, lsp::Position::new(0, 7));
3572                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3573                    lsp::Position::new(0, 6),
3574                    lsp::Position::new(0, 9),
3575                )))
3576            })
3577            .next()
3578            .await
3579            .unwrap();
3580        prepare_rename.await.unwrap();
3581        editor_b.update(&mut cx_b, |editor, cx| {
3582            let rename = editor.pending_rename().unwrap();
3583            let buffer = editor.buffer().read(cx).snapshot(cx);
3584            assert_eq!(
3585                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3586                6..9
3587            );
3588            rename.editor.update(cx, |rename_editor, cx| {
3589                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3590                    rename_buffer.edit([0..3], "THREE", cx);
3591                });
3592            });
3593        });
3594
3595        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3596            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3597        });
3598        fake_language_server
3599            .handle_request::<lsp::request::Rename, _>(|params, _| {
3600                assert_eq!(
3601                    params.text_document_position.text_document.uri.as_str(),
3602                    "file:///dir/one.rs"
3603                );
3604                assert_eq!(
3605                    params.text_document_position.position,
3606                    lsp::Position::new(0, 6)
3607                );
3608                assert_eq!(params.new_name, "THREE");
3609                Some(lsp::WorkspaceEdit {
3610                    changes: Some(
3611                        [
3612                            (
3613                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3614                                vec![lsp::TextEdit::new(
3615                                    lsp::Range::new(
3616                                        lsp::Position::new(0, 6),
3617                                        lsp::Position::new(0, 9),
3618                                    ),
3619                                    "THREE".to_string(),
3620                                )],
3621                            ),
3622                            (
3623                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3624                                vec![
3625                                    lsp::TextEdit::new(
3626                                        lsp::Range::new(
3627                                            lsp::Position::new(0, 24),
3628                                            lsp::Position::new(0, 27),
3629                                        ),
3630                                        "THREE".to_string(),
3631                                    ),
3632                                    lsp::TextEdit::new(
3633                                        lsp::Range::new(
3634                                            lsp::Position::new(0, 35),
3635                                            lsp::Position::new(0, 38),
3636                                        ),
3637                                        "THREE".to_string(),
3638                                    ),
3639                                ],
3640                            ),
3641                        ]
3642                        .into_iter()
3643                        .collect(),
3644                    ),
3645                    ..Default::default()
3646                })
3647            })
3648            .next()
3649            .await
3650            .unwrap();
3651        confirm_rename.await.unwrap();
3652
3653        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3654            workspace
3655                .active_item(cx)
3656                .unwrap()
3657                .downcast::<Editor>()
3658                .unwrap()
3659        });
3660        rename_editor.update(&mut cx_b, |editor, cx| {
3661            assert_eq!(
3662                editor.text(cx),
3663                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3664            );
3665            editor.undo(&Undo, cx);
3666            assert_eq!(
3667                editor.text(cx),
3668                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3669            );
3670            editor.redo(&Redo, cx);
3671            assert_eq!(
3672                editor.text(cx),
3673                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3674            );
3675        });
3676
3677        // Ensure temporary rename edits cannot be undone/redone.
3678        editor_b.update(&mut cx_b, |editor, cx| {
3679            editor.undo(&Undo, cx);
3680            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3681            editor.undo(&Undo, cx);
3682            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3683            editor.redo(&Redo, cx);
3684            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3685        })
3686    }
3687
3688    #[gpui::test(iterations = 10)]
3689    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3690        cx_a.foreground().forbid_parking();
3691
3692        // Connect to a server as 2 clients.
3693        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3694        let client_a = server.create_client(&mut cx_a, "user_a").await;
3695        let client_b = server.create_client(&mut cx_b, "user_b").await;
3696
3697        // Create an org that includes these 2 users.
3698        let db = &server.app_state.db;
3699        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3700        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3701            .await
3702            .unwrap();
3703        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3704            .await
3705            .unwrap();
3706
3707        // Create a channel that includes all the users.
3708        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3709        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3710            .await
3711            .unwrap();
3712        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3713            .await
3714            .unwrap();
3715        db.create_channel_message(
3716            channel_id,
3717            client_b.current_user_id(&cx_b),
3718            "hello A, it's B.",
3719            OffsetDateTime::now_utc(),
3720            1,
3721        )
3722        .await
3723        .unwrap();
3724
3725        let channels_a = cx_a
3726            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3727        channels_a
3728            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3729            .await;
3730        channels_a.read_with(&cx_a, |list, _| {
3731            assert_eq!(
3732                list.available_channels().unwrap(),
3733                &[ChannelDetails {
3734                    id: channel_id.to_proto(),
3735                    name: "test-channel".to_string()
3736                }]
3737            )
3738        });
3739        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3740            this.get_channel(channel_id.to_proto(), cx).unwrap()
3741        });
3742        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3743        channel_a
3744            .condition(&cx_a, |channel, _| {
3745                channel_messages(channel)
3746                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3747            })
3748            .await;
3749
3750        let channels_b = cx_b
3751            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3752        channels_b
3753            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3754            .await;
3755        channels_b.read_with(&cx_b, |list, _| {
3756            assert_eq!(
3757                list.available_channels().unwrap(),
3758                &[ChannelDetails {
3759                    id: channel_id.to_proto(),
3760                    name: "test-channel".to_string()
3761                }]
3762            )
3763        });
3764
3765        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3766            this.get_channel(channel_id.to_proto(), cx).unwrap()
3767        });
3768        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3769        channel_b
3770            .condition(&cx_b, |channel, _| {
3771                channel_messages(channel)
3772                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3773            })
3774            .await;
3775
3776        channel_a
3777            .update(&mut cx_a, |channel, cx| {
3778                channel
3779                    .send_message("oh, hi B.".to_string(), cx)
3780                    .unwrap()
3781                    .detach();
3782                let task = channel.send_message("sup".to_string(), cx).unwrap();
3783                assert_eq!(
3784                    channel_messages(channel),
3785                    &[
3786                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3787                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3788                        ("user_a".to_string(), "sup".to_string(), true)
3789                    ]
3790                );
3791                task
3792            })
3793            .await
3794            .unwrap();
3795
3796        channel_b
3797            .condition(&cx_b, |channel, _| {
3798                channel_messages(channel)
3799                    == [
3800                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3801                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3802                        ("user_a".to_string(), "sup".to_string(), false),
3803                    ]
3804            })
3805            .await;
3806
3807        assert_eq!(
3808            server
3809                .state()
3810                .await
3811                .channel(channel_id)
3812                .unwrap()
3813                .connection_ids
3814                .len(),
3815            2
3816        );
3817        cx_b.update(|_| drop(channel_b));
3818        server
3819            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3820            .await;
3821
3822        cx_a.update(|_| drop(channel_a));
3823        server
3824            .condition(|state| state.channel(channel_id).is_none())
3825            .await;
3826    }
3827
3828    #[gpui::test(iterations = 10)]
3829    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3830        cx_a.foreground().forbid_parking();
3831
3832        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3833        let client_a = server.create_client(&mut cx_a, "user_a").await;
3834
3835        let db = &server.app_state.db;
3836        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3837        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3838        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3839            .await
3840            .unwrap();
3841        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3842            .await
3843            .unwrap();
3844
3845        let channels_a = cx_a
3846            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3847        channels_a
3848            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3849            .await;
3850        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3851            this.get_channel(channel_id.to_proto(), cx).unwrap()
3852        });
3853
3854        // Messages aren't allowed to be too long.
3855        channel_a
3856            .update(&mut cx_a, |channel, cx| {
3857                let long_body = "this is long.\n".repeat(1024);
3858                channel.send_message(long_body, cx).unwrap()
3859            })
3860            .await
3861            .unwrap_err();
3862
3863        // Messages aren't allowed to be blank.
3864        channel_a.update(&mut cx_a, |channel, cx| {
3865            channel.send_message(String::new(), cx).unwrap_err()
3866        });
3867
3868        // Leading and trailing whitespace are trimmed.
3869        channel_a
3870            .update(&mut cx_a, |channel, cx| {
3871                channel
3872                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3873                    .unwrap()
3874            })
3875            .await
3876            .unwrap();
3877        assert_eq!(
3878            db.get_channel_messages(channel_id, 10, None)
3879                .await
3880                .unwrap()
3881                .iter()
3882                .map(|m| &m.body)
3883                .collect::<Vec<_>>(),
3884            &["surrounded by whitespace"]
3885        );
3886    }
3887
3888    #[gpui::test(iterations = 10)]
3889    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3890        cx_a.foreground().forbid_parking();
3891
3892        // Connect to a server as 2 clients.
3893        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3894        let client_a = server.create_client(&mut cx_a, "user_a").await;
3895        let client_b = server.create_client(&mut cx_b, "user_b").await;
3896        let mut status_b = client_b.status();
3897
3898        // Create an org that includes these 2 users.
3899        let db = &server.app_state.db;
3900        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3901        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3902            .await
3903            .unwrap();
3904        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3905            .await
3906            .unwrap();
3907
3908        // Create a channel that includes all the users.
3909        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3910        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3911            .await
3912            .unwrap();
3913        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3914            .await
3915            .unwrap();
3916        db.create_channel_message(
3917            channel_id,
3918            client_b.current_user_id(&cx_b),
3919            "hello A, it's B.",
3920            OffsetDateTime::now_utc(),
3921            2,
3922        )
3923        .await
3924        .unwrap();
3925
3926        let channels_a = cx_a
3927            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3928        channels_a
3929            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3930            .await;
3931
3932        channels_a.read_with(&cx_a, |list, _| {
3933            assert_eq!(
3934                list.available_channels().unwrap(),
3935                &[ChannelDetails {
3936                    id: channel_id.to_proto(),
3937                    name: "test-channel".to_string()
3938                }]
3939            )
3940        });
3941        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3942            this.get_channel(channel_id.to_proto(), cx).unwrap()
3943        });
3944        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3945        channel_a
3946            .condition(&cx_a, |channel, _| {
3947                channel_messages(channel)
3948                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3949            })
3950            .await;
3951
3952        let channels_b = cx_b
3953            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3954        channels_b
3955            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3956            .await;
3957        channels_b.read_with(&cx_b, |list, _| {
3958            assert_eq!(
3959                list.available_channels().unwrap(),
3960                &[ChannelDetails {
3961                    id: channel_id.to_proto(),
3962                    name: "test-channel".to_string()
3963                }]
3964            )
3965        });
3966
3967        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3968            this.get_channel(channel_id.to_proto(), cx).unwrap()
3969        });
3970        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3971        channel_b
3972            .condition(&cx_b, |channel, _| {
3973                channel_messages(channel)
3974                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3975            })
3976            .await;
3977
3978        // Disconnect client B, ensuring we can still access its cached channel data.
3979        server.forbid_connections();
3980        server.disconnect_client(client_b.current_user_id(&cx_b));
3981        while !matches!(
3982            status_b.next().await,
3983            Some(client::Status::ReconnectionError { .. })
3984        ) {}
3985
3986        channels_b.read_with(&cx_b, |channels, _| {
3987            assert_eq!(
3988                channels.available_channels().unwrap(),
3989                [ChannelDetails {
3990                    id: channel_id.to_proto(),
3991                    name: "test-channel".to_string()
3992                }]
3993            )
3994        });
3995        channel_b.read_with(&cx_b, |channel, _| {
3996            assert_eq!(
3997                channel_messages(channel),
3998                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3999            )
4000        });
4001
4002        // Send a message from client B while it is disconnected.
4003        channel_b
4004            .update(&mut cx_b, |channel, cx| {
4005                let task = channel
4006                    .send_message("can you see this?".to_string(), cx)
4007                    .unwrap();
4008                assert_eq!(
4009                    channel_messages(channel),
4010                    &[
4011                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4012                        ("user_b".to_string(), "can you see this?".to_string(), true)
4013                    ]
4014                );
4015                task
4016            })
4017            .await
4018            .unwrap_err();
4019
4020        // Send a message from client A while B is disconnected.
4021        channel_a
4022            .update(&mut cx_a, |channel, cx| {
4023                channel
4024                    .send_message("oh, hi B.".to_string(), cx)
4025                    .unwrap()
4026                    .detach();
4027                let task = channel.send_message("sup".to_string(), cx).unwrap();
4028                assert_eq!(
4029                    channel_messages(channel),
4030                    &[
4031                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4032                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4033                        ("user_a".to_string(), "sup".to_string(), true)
4034                    ]
4035                );
4036                task
4037            })
4038            .await
4039            .unwrap();
4040
4041        // Give client B a chance to reconnect.
4042        server.allow_connections();
4043        cx_b.foreground().advance_clock(Duration::from_secs(10));
4044
4045        // Verify that B sees the new messages upon reconnection, as well as the message client B
4046        // sent while offline.
4047        channel_b
4048            .condition(&cx_b, |channel, _| {
4049                channel_messages(channel)
4050                    == [
4051                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4052                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4053                        ("user_a".to_string(), "sup".to_string(), false),
4054                        ("user_b".to_string(), "can you see this?".to_string(), false),
4055                    ]
4056            })
4057            .await;
4058
4059        // Ensure client A and B can communicate normally after reconnection.
4060        channel_a
4061            .update(&mut cx_a, |channel, cx| {
4062                channel.send_message("you online?".to_string(), cx).unwrap()
4063            })
4064            .await
4065            .unwrap();
4066        channel_b
4067            .condition(&cx_b, |channel, _| {
4068                channel_messages(channel)
4069                    == [
4070                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4071                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4072                        ("user_a".to_string(), "sup".to_string(), false),
4073                        ("user_b".to_string(), "can you see this?".to_string(), false),
4074                        ("user_a".to_string(), "you online?".to_string(), false),
4075                    ]
4076            })
4077            .await;
4078
4079        channel_b
4080            .update(&mut cx_b, |channel, cx| {
4081                channel.send_message("yep".to_string(), cx).unwrap()
4082            })
4083            .await
4084            .unwrap();
4085        channel_a
4086            .condition(&cx_a, |channel, _| {
4087                channel_messages(channel)
4088                    == [
4089                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4090                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4091                        ("user_a".to_string(), "sup".to_string(), false),
4092                        ("user_b".to_string(), "can you see this?".to_string(), false),
4093                        ("user_a".to_string(), "you online?".to_string(), false),
4094                        ("user_b".to_string(), "yep".to_string(), false),
4095                    ]
4096            })
4097            .await;
4098    }
4099
4100    #[gpui::test(iterations = 10)]
4101    async fn test_contacts(
4102        mut cx_a: TestAppContext,
4103        mut cx_b: TestAppContext,
4104        mut cx_c: TestAppContext,
4105    ) {
4106        cx_a.foreground().forbid_parking();
4107        let lang_registry = Arc::new(LanguageRegistry::new());
4108        let fs = FakeFs::new(cx_a.background());
4109
4110        // Connect to a server as 3 clients.
4111        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4112        let client_a = server.create_client(&mut cx_a, "user_a").await;
4113        let client_b = server.create_client(&mut cx_b, "user_b").await;
4114        let client_c = server.create_client(&mut cx_c, "user_c").await;
4115
4116        // Share a worktree as client A.
4117        fs.insert_tree(
4118            "/a",
4119            json!({
4120                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4121            }),
4122        )
4123        .await;
4124
4125        let project_a = cx_a.update(|cx| {
4126            Project::local(
4127                client_a.clone(),
4128                client_a.user_store.clone(),
4129                lang_registry.clone(),
4130                fs.clone(),
4131                cx,
4132            )
4133        });
4134        let (worktree_a, _) = project_a
4135            .update(&mut cx_a, |p, cx| {
4136                p.find_or_create_local_worktree("/a", false, cx)
4137            })
4138            .await
4139            .unwrap();
4140        worktree_a
4141            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4142            .await;
4143
4144        client_a
4145            .user_store
4146            .condition(&cx_a, |user_store, _| {
4147                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4148            })
4149            .await;
4150        client_b
4151            .user_store
4152            .condition(&cx_b, |user_store, _| {
4153                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4154            })
4155            .await;
4156        client_c
4157            .user_store
4158            .condition(&cx_c, |user_store, _| {
4159                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4160            })
4161            .await;
4162
4163        let project_id = project_a
4164            .update(&mut cx_a, |project, _| project.next_remote_id())
4165            .await;
4166        project_a
4167            .update(&mut cx_a, |project, cx| project.share(cx))
4168            .await
4169            .unwrap();
4170
4171        let _project_b = Project::remote(
4172            project_id,
4173            client_b.clone(),
4174            client_b.user_store.clone(),
4175            lang_registry.clone(),
4176            fs.clone(),
4177            &mut cx_b.to_async(),
4178        )
4179        .await
4180        .unwrap();
4181
4182        client_a
4183            .user_store
4184            .condition(&cx_a, |user_store, _| {
4185                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4186            })
4187            .await;
4188        client_b
4189            .user_store
4190            .condition(&cx_b, |user_store, _| {
4191                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4192            })
4193            .await;
4194        client_c
4195            .user_store
4196            .condition(&cx_c, |user_store, _| {
4197                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4198            })
4199            .await;
4200
4201        project_a
4202            .condition(&cx_a, |project, _| {
4203                project.collaborators().contains_key(&client_b.peer_id)
4204            })
4205            .await;
4206
4207        cx_a.update(move |_| drop(project_a));
4208        client_a
4209            .user_store
4210            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4211            .await;
4212        client_b
4213            .user_store
4214            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4215            .await;
4216        client_c
4217            .user_store
4218            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4219            .await;
4220
4221        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4222            user_store
4223                .contacts()
4224                .iter()
4225                .map(|contact| {
4226                    let worktrees = contact
4227                        .projects
4228                        .iter()
4229                        .map(|p| {
4230                            (
4231                                p.worktree_root_names[0].as_str(),
4232                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4233                            )
4234                        })
4235                        .collect();
4236                    (contact.user.github_login.as_str(), worktrees)
4237                })
4238                .collect()
4239        }
4240    }
4241
4242    #[gpui::test(iterations = 100)]
4243    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4244        cx.foreground().forbid_parking();
4245        let max_peers = env::var("MAX_PEERS")
4246            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4247            .unwrap_or(5);
4248        let max_operations = env::var("OPERATIONS")
4249            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4250            .unwrap_or(10);
4251
4252        let rng = Arc::new(Mutex::new(rng));
4253
4254        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4255        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4256
4257        let fs = FakeFs::new(cx.background());
4258        fs.insert_tree(
4259            "/_collab",
4260            json!({
4261                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4262            }),
4263        )
4264        .await;
4265
4266        let operations = Rc::new(Cell::new(0));
4267        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4268        let mut clients = Vec::new();
4269
4270        let mut next_entity_id = 100000;
4271        let mut host_cx = TestAppContext::new(
4272            cx.foreground_platform(),
4273            cx.platform(),
4274            cx.foreground(),
4275            cx.background(),
4276            cx.font_cache(),
4277            next_entity_id,
4278        );
4279        let host = server.create_client(&mut host_cx, "host").await;
4280        let host_project = host_cx.update(|cx| {
4281            Project::local(
4282                host.client.clone(),
4283                host.user_store.clone(),
4284                Arc::new(LanguageRegistry::new()),
4285                fs.clone(),
4286                cx,
4287            )
4288        });
4289        let host_project_id = host_project
4290            .update(&mut host_cx, |p, _| p.next_remote_id())
4291            .await;
4292
4293        let (collab_worktree, _) = host_project
4294            .update(&mut host_cx, |project, cx| {
4295                project.find_or_create_local_worktree("/_collab", false, cx)
4296            })
4297            .await
4298            .unwrap();
4299        collab_worktree
4300            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4301            .await;
4302        host_project
4303            .update(&mut host_cx, |project, cx| project.share(cx))
4304            .await
4305            .unwrap();
4306
4307        clients.push(cx.foreground().spawn(host.simulate_host(
4308            host_project.clone(),
4309            language_server_config,
4310            operations.clone(),
4311            max_operations,
4312            rng.clone(),
4313            host_cx.clone(),
4314        )));
4315
4316        while operations.get() < max_operations {
4317            cx.background().simulate_random_delay().await;
4318            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4319                operations.set(operations.get() + 1);
4320
4321                let guest_id = clients.len();
4322                log::info!("Adding guest {}", guest_id);
4323                next_entity_id += 100000;
4324                let mut guest_cx = TestAppContext::new(
4325                    cx.foreground_platform(),
4326                    cx.platform(),
4327                    cx.foreground(),
4328                    cx.background(),
4329                    cx.font_cache(),
4330                    next_entity_id,
4331                );
4332                let guest = server
4333                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4334                    .await;
4335                let guest_project = Project::remote(
4336                    host_project_id,
4337                    guest.client.clone(),
4338                    guest.user_store.clone(),
4339                    guest_lang_registry.clone(),
4340                    fs.clone(),
4341                    &mut guest_cx.to_async(),
4342                )
4343                .await
4344                .unwrap();
4345                clients.push(cx.foreground().spawn(guest.simulate_guest(
4346                    guest_id,
4347                    guest_project,
4348                    operations.clone(),
4349                    max_operations,
4350                    rng.clone(),
4351                    guest_cx,
4352                )));
4353
4354                log::info!("Guest {} added", guest_id);
4355            }
4356        }
4357
4358        let clients = futures::future::join_all(clients).await;
4359        cx.foreground().run_until_parked();
4360
4361        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4362            project
4363                .worktrees(cx)
4364                .map(|worktree| {
4365                    let snapshot = worktree.read(cx).snapshot();
4366                    (snapshot.id(), snapshot)
4367                })
4368                .collect::<BTreeMap<_, _>>()
4369        });
4370
4371        for (guest_client, guest_cx) in clients.iter().skip(1) {
4372            let guest_id = guest_client.client.id();
4373            let worktree_snapshots =
4374                guest_client
4375                    .project
4376                    .as_ref()
4377                    .unwrap()
4378                    .read_with(guest_cx, |project, cx| {
4379                        project
4380                            .worktrees(cx)
4381                            .map(|worktree| {
4382                                let worktree = worktree.read(cx);
4383                                assert!(
4384                                    !worktree.as_remote().unwrap().has_pending_updates(),
4385                                    "Guest {} worktree {:?} contains deferred updates",
4386                                    guest_id,
4387                                    worktree.id()
4388                                );
4389                                (worktree.id(), worktree.snapshot())
4390                            })
4391                            .collect::<BTreeMap<_, _>>()
4392                    });
4393
4394            assert_eq!(
4395                worktree_snapshots.keys().collect::<Vec<_>>(),
4396                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4397                "guest {} has different worktrees than the host",
4398                guest_id
4399            );
4400            for (id, host_snapshot) in &host_worktree_snapshots {
4401                let guest_snapshot = &worktree_snapshots[id];
4402                assert_eq!(
4403                    guest_snapshot.root_name(),
4404                    host_snapshot.root_name(),
4405                    "guest {} has different root name than the host for worktree {}",
4406                    guest_id,
4407                    id
4408                );
4409                assert_eq!(
4410                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4411                    host_snapshot.entries(false).collect::<Vec<_>>(),
4412                    "guest {} has different snapshot than the host for worktree {}",
4413                    guest_id,
4414                    id
4415                );
4416            }
4417
4418            guest_client
4419                .project
4420                .as_ref()
4421                .unwrap()
4422                .read_with(guest_cx, |project, _| {
4423                    assert!(
4424                        !project.has_buffered_operations(),
4425                        "guest {} has buffered operations ",
4426                        guest_id,
4427                    );
4428                });
4429
4430            for guest_buffer in &guest_client.buffers {
4431                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4432                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4433                    project
4434                        .shared_buffer(guest_client.peer_id, buffer_id)
4435                        .expect(&format!(
4436                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4437                            guest_id, guest_client.peer_id, buffer_id
4438                        ))
4439                });
4440                assert_eq!(
4441                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4442                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4443                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4444                    guest_id,
4445                    buffer_id,
4446                    host_buffer
4447                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4448                );
4449            }
4450        }
4451    }
4452
4453    struct TestServer {
4454        peer: Arc<Peer>,
4455        app_state: Arc<AppState>,
4456        server: Arc<Server>,
4457        foreground: Rc<executor::Foreground>,
4458        notifications: mpsc::UnboundedReceiver<()>,
4459        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4460        forbid_connections: Arc<AtomicBool>,
4461        _test_db: TestDb,
4462    }
4463
4464    impl TestServer {
4465        async fn start(
4466            foreground: Rc<executor::Foreground>,
4467            background: Arc<executor::Background>,
4468        ) -> Self {
4469            let test_db = TestDb::fake(background);
4470            let app_state = Self::build_app_state(&test_db).await;
4471            let peer = Peer::new();
4472            let notifications = mpsc::unbounded();
4473            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4474            Self {
4475                peer,
4476                app_state,
4477                server,
4478                foreground,
4479                notifications: notifications.1,
4480                connection_killers: Default::default(),
4481                forbid_connections: Default::default(),
4482                _test_db: test_db,
4483            }
4484        }
4485
4486        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4487            let http = FakeHttpClient::with_404_response();
4488            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4489            let client_name = name.to_string();
4490            let mut client = Client::new(http.clone());
4491            let server = self.server.clone();
4492            let connection_killers = self.connection_killers.clone();
4493            let forbid_connections = self.forbid_connections.clone();
4494            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4495
4496            Arc::get_mut(&mut client)
4497                .unwrap()
4498                .override_authenticate(move |cx| {
4499                    cx.spawn(|_| async move {
4500                        let access_token = "the-token".to_string();
4501                        Ok(Credentials {
4502                            user_id: user_id.0 as u64,
4503                            access_token,
4504                        })
4505                    })
4506                })
4507                .override_establish_connection(move |credentials, cx| {
4508                    assert_eq!(credentials.user_id, user_id.0 as u64);
4509                    assert_eq!(credentials.access_token, "the-token");
4510
4511                    let server = server.clone();
4512                    let connection_killers = connection_killers.clone();
4513                    let forbid_connections = forbid_connections.clone();
4514                    let client_name = client_name.clone();
4515                    let connection_id_tx = connection_id_tx.clone();
4516                    cx.spawn(move |cx| async move {
4517                        if forbid_connections.load(SeqCst) {
4518                            Err(EstablishConnectionError::other(anyhow!(
4519                                "server is forbidding connections"
4520                            )))
4521                        } else {
4522                            let (client_conn, server_conn, kill_conn) =
4523                                Connection::in_memory(cx.background());
4524                            connection_killers.lock().insert(user_id, kill_conn);
4525                            cx.background()
4526                                .spawn(server.handle_connection(
4527                                    server_conn,
4528                                    client_name,
4529                                    user_id,
4530                                    Some(connection_id_tx),
4531                                    cx.background(),
4532                                ))
4533                                .detach();
4534                            Ok(client_conn)
4535                        }
4536                    })
4537                });
4538
4539            client
4540                .authenticate_and_connect(&cx.to_async())
4541                .await
4542                .unwrap();
4543
4544            Channel::init(&client);
4545            Project::init(&client);
4546
4547            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4548            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4549            let mut authed_user =
4550                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4551            while authed_user.next().await.unwrap().is_none() {}
4552
4553            TestClient {
4554                client,
4555                peer_id,
4556                user_store,
4557                project: Default::default(),
4558                buffers: Default::default(),
4559            }
4560        }
4561
4562        fn disconnect_client(&self, user_id: UserId) {
4563            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4564                let _ = kill_conn.try_send(Some(()));
4565            }
4566        }
4567
4568        fn forbid_connections(&self) {
4569            self.forbid_connections.store(true, SeqCst);
4570        }
4571
4572        fn allow_connections(&self) {
4573            self.forbid_connections.store(false, SeqCst);
4574        }
4575
4576        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4577            let mut config = Config::default();
4578            config.session_secret = "a".repeat(32);
4579            config.database_url = test_db.url.clone();
4580            let github_client = github::AppClient::test();
4581            Arc::new(AppState {
4582                db: test_db.db().clone(),
4583                handlebars: Default::default(),
4584                auth_client: auth::build_client("", ""),
4585                repo_client: github::RepoClient::test(&github_client),
4586                github_client,
4587                config,
4588            })
4589        }
4590
4591        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4592            self.server.store.read()
4593        }
4594
4595        async fn condition<F>(&mut self, mut predicate: F)
4596        where
4597            F: FnMut(&Store) -> bool,
4598        {
4599            async_std::future::timeout(Duration::from_millis(500), async {
4600                while !(predicate)(&*self.server.store.read()) {
4601                    self.foreground.start_waiting();
4602                    self.notifications.next().await;
4603                    self.foreground.finish_waiting();
4604                }
4605            })
4606            .await
4607            .expect("condition timed out");
4608        }
4609    }
4610
4611    impl Drop for TestServer {
4612        fn drop(&mut self) {
4613            self.peer.reset();
4614        }
4615    }
4616
4617    struct TestClient {
4618        client: Arc<Client>,
4619        pub peer_id: PeerId,
4620        pub user_store: ModelHandle<UserStore>,
4621        project: Option<ModelHandle<Project>>,
4622        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4623    }
4624
4625    impl Deref for TestClient {
4626        type Target = Arc<Client>;
4627
4628        fn deref(&self) -> &Self::Target {
4629            &self.client
4630        }
4631    }
4632
4633    impl TestClient {
4634        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4635            UserId::from_proto(
4636                self.user_store
4637                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4638            )
4639        }
4640
4641        fn simulate_host(
4642            mut self,
4643            project: ModelHandle<Project>,
4644            mut language_server_config: LanguageServerConfig,
4645            operations: Rc<Cell<usize>>,
4646            max_operations: usize,
4647            rng: Arc<Mutex<StdRng>>,
4648            mut cx: TestAppContext,
4649        ) -> impl Future<Output = (Self, TestAppContext)> {
4650            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4651
4652            // Set up a fake language server.
4653            language_server_config.set_fake_initializer({
4654                let rng = rng.clone();
4655                let files = files.clone();
4656                let project = project.clone();
4657                move |fake_server| {
4658                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4659                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4660                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4661                                range: lsp::Range::new(
4662                                    lsp::Position::new(0, 0),
4663                                    lsp::Position::new(0, 0),
4664                                ),
4665                                new_text: "the-new-text".to_string(),
4666                            })),
4667                            ..Default::default()
4668                        }]))
4669                    });
4670
4671                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4672                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4673                            lsp::CodeAction {
4674                                title: "the-code-action".to_string(),
4675                                ..Default::default()
4676                            },
4677                        )])
4678                    });
4679
4680                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4681                        |params, _| {
4682                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4683                                params.position,
4684                                params.position,
4685                            )))
4686                        },
4687                    );
4688
4689                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4690                        let files = files.clone();
4691                        let rng = rng.clone();
4692                        move |_, _| {
4693                            let files = files.lock();
4694                            let mut rng = rng.lock();
4695                            let count = rng.gen_range::<usize, _>(1..3);
4696                            let files = (0..count)
4697                                .map(|_| files.choose(&mut *rng).unwrap())
4698                                .collect::<Vec<_>>();
4699                            log::info!("LSP: Returning definitions in files {:?}", &files);
4700                            Some(lsp::GotoDefinitionResponse::Array(
4701                                files
4702                                    .into_iter()
4703                                    .map(|file| lsp::Location {
4704                                        uri: lsp::Url::from_file_path(file).unwrap(),
4705                                        range: Default::default(),
4706                                    })
4707                                    .collect(),
4708                            ))
4709                        }
4710                    });
4711
4712                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4713                        let rng = rng.clone();
4714                        let project = project.clone();
4715                        move |params, mut cx| {
4716                            project.update(&mut cx, |project, cx| {
4717                                let path = params
4718                                    .text_document_position_params
4719                                    .text_document
4720                                    .uri
4721                                    .to_file_path()
4722                                    .unwrap();
4723                                let (worktree, relative_path) =
4724                                    project.find_local_worktree(&path, cx)?;
4725                                let project_path =
4726                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4727                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4728
4729                                let mut highlights = Vec::new();
4730                                let highlight_count = rng.lock().gen_range(1..=5);
4731                                let mut prev_end = 0;
4732                                for _ in 0..highlight_count {
4733                                    let range =
4734                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4735                                    let start =
4736                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4737                                    let end =
4738                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4739                                    highlights.push(lsp::DocumentHighlight {
4740                                        range: lsp::Range::new(start, end),
4741                                        kind: Some(lsp::DocumentHighlightKind::READ),
4742                                    });
4743                                    prev_end = range.end;
4744                                }
4745                                Some(highlights)
4746                            })
4747                        }
4748                    });
4749                }
4750            });
4751
4752            project.update(&mut cx, |project, _| {
4753                project.languages().add(Arc::new(Language::new(
4754                    LanguageConfig {
4755                        name: "Rust".into(),
4756                        path_suffixes: vec!["rs".to_string()],
4757                        language_server: Some(language_server_config),
4758                        ..Default::default()
4759                    },
4760                    None,
4761                )));
4762            });
4763
4764            async move {
4765                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4766                while operations.get() < max_operations {
4767                    operations.set(operations.get() + 1);
4768
4769                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4770                    match distribution {
4771                        0..=20 if !files.lock().is_empty() => {
4772                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4773                            let mut path = path.as_path();
4774                            while let Some(parent_path) = path.parent() {
4775                                path = parent_path;
4776                                if rng.lock().gen() {
4777                                    break;
4778                                }
4779                            }
4780
4781                            log::info!("Host: find/create local worktree {:?}", path);
4782                            project
4783                                .update(&mut cx, |project, cx| {
4784                                    project.find_or_create_local_worktree(path, false, cx)
4785                                })
4786                                .await
4787                                .unwrap();
4788                        }
4789                        10..=80 if !files.lock().is_empty() => {
4790                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4791                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4792                                let (worktree, path) = project
4793                                    .update(&mut cx, |project, cx| {
4794                                        project.find_or_create_local_worktree(
4795                                            file.clone(),
4796                                            false,
4797                                            cx,
4798                                        )
4799                                    })
4800                                    .await
4801                                    .unwrap();
4802                                let project_path =
4803                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4804                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4805                                let buffer = project
4806                                    .update(&mut cx, |project, cx| {
4807                                        project.open_buffer(project_path, cx)
4808                                    })
4809                                    .await
4810                                    .unwrap();
4811                                self.buffers.insert(buffer.clone());
4812                                buffer
4813                            } else {
4814                                self.buffers
4815                                    .iter()
4816                                    .choose(&mut *rng.lock())
4817                                    .unwrap()
4818                                    .clone()
4819                            };
4820
4821                            if rng.lock().gen_bool(0.1) {
4822                                cx.update(|cx| {
4823                                    log::info!(
4824                                        "Host: dropping buffer {:?}",
4825                                        buffer.read(cx).file().unwrap().full_path(cx)
4826                                    );
4827                                    self.buffers.remove(&buffer);
4828                                    drop(buffer);
4829                                });
4830                            } else {
4831                                buffer.update(&mut cx, |buffer, cx| {
4832                                    log::info!(
4833                                        "Host: updating buffer {:?}",
4834                                        buffer.file().unwrap().full_path(cx)
4835                                    );
4836                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4837                                });
4838                            }
4839                        }
4840                        _ => loop {
4841                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4842                            let mut path = PathBuf::new();
4843                            path.push("/");
4844                            for _ in 0..path_component_count {
4845                                let letter = rng.lock().gen_range(b'a'..=b'z');
4846                                path.push(std::str::from_utf8(&[letter]).unwrap());
4847                            }
4848                            path.set_extension("rs");
4849                            let parent_path = path.parent().unwrap();
4850
4851                            log::info!("Host: creating file {:?}", path,);
4852
4853                            if fs.create_dir(&parent_path).await.is_ok()
4854                                && fs.create_file(&path, Default::default()).await.is_ok()
4855                            {
4856                                files.lock().push(path);
4857                                break;
4858                            } else {
4859                                log::info!("Host: cannot create file");
4860                            }
4861                        },
4862                    }
4863
4864                    cx.background().simulate_random_delay().await;
4865                }
4866
4867                log::info!("Host done");
4868
4869                self.project = Some(project);
4870                (self, cx)
4871            }
4872        }
4873
4874        pub async fn simulate_guest(
4875            mut self,
4876            guest_id: usize,
4877            project: ModelHandle<Project>,
4878            operations: Rc<Cell<usize>>,
4879            max_operations: usize,
4880            rng: Arc<Mutex<StdRng>>,
4881            mut cx: TestAppContext,
4882        ) -> (Self, TestAppContext) {
4883            while operations.get() < max_operations {
4884                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4885                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4886                        project
4887                            .worktrees(&cx)
4888                            .filter(|worktree| {
4889                                worktree.read(cx).entries(false).any(|e| e.is_file())
4890                            })
4891                            .choose(&mut *rng.lock())
4892                    }) {
4893                        worktree
4894                    } else {
4895                        cx.background().simulate_random_delay().await;
4896                        continue;
4897                    };
4898
4899                    operations.set(operations.get() + 1);
4900                    let (worktree_root_name, project_path) =
4901                        worktree.read_with(&cx, |worktree, _| {
4902                            let entry = worktree
4903                                .entries(false)
4904                                .filter(|e| e.is_file())
4905                                .choose(&mut *rng.lock())
4906                                .unwrap();
4907                            (
4908                                worktree.root_name().to_string(),
4909                                (worktree.id(), entry.path.clone()),
4910                            )
4911                        });
4912                    log::info!(
4913                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4914                        guest_id,
4915                        project_path.0,
4916                        worktree_root_name,
4917                        project_path.1
4918                    );
4919                    let buffer = project
4920                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4921                        .await
4922                        .unwrap();
4923                    self.buffers.insert(buffer.clone());
4924                    buffer
4925                } else {
4926                    operations.set(operations.get() + 1);
4927
4928                    self.buffers
4929                        .iter()
4930                        .choose(&mut *rng.lock())
4931                        .unwrap()
4932                        .clone()
4933                };
4934
4935                let choice = rng.lock().gen_range(0..100);
4936                match choice {
4937                    0..=9 => {
4938                        cx.update(|cx| {
4939                            log::info!(
4940                                "Guest {}: dropping buffer {:?}",
4941                                guest_id,
4942                                buffer.read(cx).file().unwrap().full_path(cx)
4943                            );
4944                            self.buffers.remove(&buffer);
4945                            drop(buffer);
4946                        });
4947                    }
4948                    10..=19 => {
4949                        let completions = project.update(&mut cx, |project, cx| {
4950                            log::info!(
4951                                "Guest {}: requesting completions for buffer {:?}",
4952                                guest_id,
4953                                buffer.read(cx).file().unwrap().full_path(cx)
4954                            );
4955                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4956                            project.completions(&buffer, offset, cx)
4957                        });
4958                        let completions = cx.background().spawn(async move {
4959                            completions.await.expect("completions request failed");
4960                        });
4961                        if rng.lock().gen_bool(0.3) {
4962                            log::info!("Guest {}: detaching completions request", guest_id);
4963                            completions.detach();
4964                        } else {
4965                            completions.await;
4966                        }
4967                    }
4968                    20..=29 => {
4969                        let code_actions = project.update(&mut cx, |project, cx| {
4970                            log::info!(
4971                                "Guest {}: requesting code actions for buffer {:?}",
4972                                guest_id,
4973                                buffer.read(cx).file().unwrap().full_path(cx)
4974                            );
4975                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4976                            project.code_actions(&buffer, range, cx)
4977                        });
4978                        let code_actions = cx.background().spawn(async move {
4979                            code_actions.await.expect("code actions request failed");
4980                        });
4981                        if rng.lock().gen_bool(0.3) {
4982                            log::info!("Guest {}: detaching code actions request", guest_id);
4983                            code_actions.detach();
4984                        } else {
4985                            code_actions.await;
4986                        }
4987                    }
4988                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4989                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4990                            log::info!(
4991                                "Guest {}: saving buffer {:?}",
4992                                guest_id,
4993                                buffer.file().unwrap().full_path(cx)
4994                            );
4995                            (buffer.version(), buffer.save(cx))
4996                        });
4997                        let save = cx.spawn(|cx| async move {
4998                            let (saved_version, _) = save.await.expect("save request failed");
4999                            buffer.read_with(&cx, |buffer, _| {
5000                                assert!(buffer.version().observed_all(&saved_version));
5001                                assert!(saved_version.observed_all(&requested_version));
5002                            });
5003                        });
5004                        if rng.lock().gen_bool(0.3) {
5005                            log::info!("Guest {}: detaching save request", guest_id);
5006                            save.detach();
5007                        } else {
5008                            save.await;
5009                        }
5010                    }
5011                    40..=45 => {
5012                        let prepare_rename = project.update(&mut cx, |project, cx| {
5013                            log::info!(
5014                                "Guest {}: preparing rename for buffer {:?}",
5015                                guest_id,
5016                                buffer.read(cx).file().unwrap().full_path(cx)
5017                            );
5018                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5019                            project.prepare_rename(buffer, offset, cx)
5020                        });
5021                        let prepare_rename = cx.background().spawn(async move {
5022                            prepare_rename.await.expect("prepare rename request failed");
5023                        });
5024                        if rng.lock().gen_bool(0.3) {
5025                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5026                            prepare_rename.detach();
5027                        } else {
5028                            prepare_rename.await;
5029                        }
5030                    }
5031                    46..=49 => {
5032                        let definitions = project.update(&mut cx, |project, cx| {
5033                            log::info!(
5034                                "Guest {}: requesting defintions for buffer {:?}",
5035                                guest_id,
5036                                buffer.read(cx).file().unwrap().full_path(cx)
5037                            );
5038                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5039                            project.definition(&buffer, offset, cx)
5040                        });
5041                        let definitions = cx.background().spawn(async move {
5042                            definitions.await.expect("definitions request failed");
5043                        });
5044                        if rng.lock().gen_bool(0.3) {
5045                            log::info!("Guest {}: detaching definitions request", guest_id);
5046                            definitions.detach();
5047                        } else {
5048                            definitions.await;
5049                        }
5050                    }
5051                    50..=55 => {
5052                        let highlights = project.update(&mut cx, |project, cx| {
5053                            log::info!(
5054                                "Guest {}: requesting highlights for buffer {:?}",
5055                                guest_id,
5056                                buffer.read(cx).file().unwrap().full_path(cx)
5057                            );
5058                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5059                            project.document_highlights(&buffer, offset, cx)
5060                        });
5061                        let highlights = cx.background().spawn(async move {
5062                            highlights.await.expect("highlights request failed");
5063                        });
5064                        if rng.lock().gen_bool(0.3) {
5065                            log::info!("Guest {}: detaching highlights request", guest_id);
5066                            highlights.detach();
5067                        } else {
5068                            highlights.await;
5069                        }
5070                    }
5071                    _ => {
5072                        buffer.update(&mut cx, |buffer, cx| {
5073                            log::info!(
5074                                "Guest {}: updating buffer {:?}",
5075                                guest_id,
5076                                buffer.file().unwrap().full_path(cx)
5077                            );
5078                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5079                        });
5080                    }
5081                }
5082                cx.background().simulate_random_delay().await;
5083            }
5084
5085            log::info!("Guest {} done", guest_id);
5086
5087            self.project = Some(project);
5088            (self, cx)
5089        }
5090    }
5091
5092    impl Executor for Arc<gpui::executor::Background> {
5093        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5094            self.spawn(future).detach();
5095        }
5096    }
5097
5098    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5099        channel
5100            .messages()
5101            .cursor::<()>()
5102            .map(|m| {
5103                (
5104                    m.sender.github_login.clone(),
5105                    m.body.clone(),
5106                    m.is_pending(),
5107                )
5108            })
5109            .collect()
5110    }
5111
5112    struct EmptyView;
5113
5114    impl gpui::Entity for EmptyView {
5115        type Event = ();
5116    }
5117
5118    impl gpui::View for EmptyView {
5119        fn ui_name() -> &'static str {
5120            "empty view"
5121        }
5122
5123        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5124            gpui::Element::boxed(gpui::elements::Empty)
5125        }
5126    }
5127}