rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_references)
  83            .add_request_handler(Server::get_document_highlights)
  84            .add_request_handler(Server::get_project_symbols)
  85            .add_request_handler(Server::open_buffer_for_symbol)
  86            .add_request_handler(Server::open_buffer)
  87            .add_message_handler(Server::close_buffer)
  88            .add_request_handler(Server::update_buffer)
  89            .add_message_handler(Server::update_buffer_file)
  90            .add_message_handler(Server::buffer_reloaded)
  91            .add_message_handler(Server::buffer_saved)
  92            .add_request_handler(Server::save_buffer)
  93            .add_request_handler(Server::format_buffers)
  94            .add_request_handler(Server::get_completions)
  95            .add_request_handler(Server::apply_additional_edits_for_completion)
  96            .add_request_handler(Server::get_code_actions)
  97            .add_request_handler(Server::apply_code_action)
  98            .add_request_handler(Server::prepare_rename)
  99            .add_request_handler(Server::perform_rename)
 100            .add_request_handler(Server::get_channels)
 101            .add_request_handler(Server::get_users)
 102            .add_request_handler(Server::join_channel)
 103            .add_message_handler(Server::leave_channel)
 104            .add_request_handler(Server::send_channel_message)
 105            .add_request_handler(Server::get_channel_messages);
 106
 107        Arc::new(server)
 108    }
 109
 110    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 111    where
 112        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 113        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 114        M: EnvelopedMessage,
 115    {
 116        let prev_handler = self.handlers.insert(
 117            TypeId::of::<M>(),
 118            Box::new(move |server, envelope| {
 119                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 120                (handler)(server, *envelope).boxed()
 121            }),
 122        );
 123        if prev_handler.is_some() {
 124            panic!("registered a handler for the same message twice");
 125        }
 126        self
 127    }
 128
 129    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 130    where
 131        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 132        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 133        M: RequestMessage,
 134    {
 135        self.add_message_handler(move |server, envelope| {
 136            let receipt = envelope.receipt();
 137            let response = (handler)(server.clone(), envelope);
 138            async move {
 139                match response.await {
 140                    Ok(response) => {
 141                        server.peer.respond(receipt, response)?;
 142                        Ok(())
 143                    }
 144                    Err(error) => {
 145                        server.peer.respond_with_error(
 146                            receipt,
 147                            proto::Error {
 148                                message: error.to_string(),
 149                            },
 150                        )?;
 151                        Err(error)
 152                    }
 153                }
 154            }
 155        })
 156    }
 157
 158    pub fn handle_connection<E: Executor>(
 159        self: &Arc<Self>,
 160        connection: Connection,
 161        addr: String,
 162        user_id: UserId,
 163        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 164        executor: E,
 165    ) -> impl Future<Output = ()> {
 166        let mut this = self.clone();
 167        async move {
 168            let (connection_id, handle_io, mut incoming_rx) =
 169                this.peer.add_connection(connection).await;
 170
 171            if let Some(send_connection_id) = send_connection_id.as_mut() {
 172                let _ = send_connection_id.send(connection_id).await;
 173            }
 174
 175            this.state_mut().add_connection(connection_id, user_id);
 176            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 177                log::error!("error updating contacts for {:?}: {}", user_id, err);
 178            }
 179
 180            let handle_io = handle_io.fuse();
 181            futures::pin_mut!(handle_io);
 182            loop {
 183                let next_message = incoming_rx.next().fuse();
 184                futures::pin_mut!(next_message);
 185                futures::select_biased! {
 186                    result = handle_io => {
 187                        if let Err(err) = result {
 188                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 189                        }
 190                        break;
 191                    }
 192                    message = next_message => {
 193                        if let Some(message) = message {
 194                            let start_time = Instant::now();
 195                            let type_name = message.payload_type_name();
 196                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 197                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 198                                let notifications = this.notifications.clone();
 199                                let is_background = message.is_background();
 200                                let handle_message = (handler)(this.clone(), message);
 201                                let handle_message = async move {
 202                                    if let Err(err) = handle_message.await {
 203                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 204                                    } else {
 205                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 206                                    }
 207                                    if let Some(mut notifications) = notifications {
 208                                        let _ = notifications.send(()).await;
 209                                    }
 210                                };
 211                                if is_background {
 212                                    executor.spawn_detached(handle_message);
 213                                } else {
 214                                    handle_message.await;
 215                                }
 216                            } else {
 217                                log::warn!("unhandled message: {}", type_name);
 218                            }
 219                        } else {
 220                            log::info!("rpc connection closed {:?}", addr);
 221                            break;
 222                        }
 223                    }
 224                }
 225            }
 226
 227            if let Err(err) = this.sign_out(connection_id).await {
 228                log::error!("error signing out connection {:?} - {:?}", addr, err);
 229            }
 230        }
 231    }
 232
 233    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 234        self.peer.disconnect(connection_id);
 235        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 236
 237        for (project_id, project) in removed_connection.hosted_projects {
 238            if let Some(share) = project.share {
 239                broadcast(
 240                    connection_id,
 241                    share.guests.keys().copied().collect(),
 242                    |conn_id| {
 243                        self.peer
 244                            .send(conn_id, proto::UnshareProject { project_id })
 245                    },
 246                )?;
 247            }
 248        }
 249
 250        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 251            broadcast(connection_id, peer_ids, |conn_id| {
 252                self.peer.send(
 253                    conn_id,
 254                    proto::RemoveProjectCollaborator {
 255                        project_id,
 256                        peer_id: connection_id.0,
 257                    },
 258                )
 259            })?;
 260        }
 261
 262        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 263        Ok(())
 264    }
 265
 266    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 267        Ok(proto::Ack {})
 268    }
 269
 270    async fn register_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::RegisterProject>,
 273    ) -> tide::Result<proto::RegisterProjectResponse> {
 274        let project_id = {
 275            let mut state = self.state_mut();
 276            let user_id = state.user_id_for_connection(request.sender_id)?;
 277            state.register_project(request.sender_id, user_id)
 278        };
 279        Ok(proto::RegisterProjectResponse { project_id })
 280    }
 281
 282    async fn unregister_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::UnregisterProject>,
 285    ) -> tide::Result<()> {
 286        let project = self
 287            .state_mut()
 288            .unregister_project(request.payload.project_id, request.sender_id)?;
 289        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 290        Ok(())
 291    }
 292
 293    async fn share_project(
 294        mut self: Arc<Server>,
 295        request: TypedEnvelope<proto::ShareProject>,
 296    ) -> tide::Result<proto::Ack> {
 297        self.state_mut()
 298            .share_project(request.payload.project_id, request.sender_id);
 299        Ok(proto::Ack {})
 300    }
 301
 302    async fn unshare_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnshareProject>,
 305    ) -> tide::Result<()> {
 306        let project_id = request.payload.project_id;
 307        let project = self
 308            .state_mut()
 309            .unshare_project(project_id, request.sender_id)?;
 310
 311        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 312            self.peer
 313                .send(conn_id, proto::UnshareProject { project_id })
 314        })?;
 315        self.update_contacts_for_users(&project.authorized_user_ids)?;
 316        Ok(())
 317    }
 318
 319    async fn join_project(
 320        mut self: Arc<Server>,
 321        request: TypedEnvelope<proto::JoinProject>,
 322    ) -> tide::Result<proto::JoinProjectResponse> {
 323        let project_id = request.payload.project_id;
 324
 325        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 326        let (response, connection_ids, contact_user_ids) = self
 327            .state_mut()
 328            .join_project(request.sender_id, user_id, project_id)
 329            .and_then(|joined| {
 330                let share = joined.project.share()?;
 331                let peer_count = share.guests.len();
 332                let mut collaborators = Vec::with_capacity(peer_count);
 333                collaborators.push(proto::Collaborator {
 334                    peer_id: joined.project.host_connection_id.0,
 335                    replica_id: 0,
 336                    user_id: joined.project.host_user_id.to_proto(),
 337                });
 338                let worktrees = joined
 339                    .project
 340                    .worktrees
 341                    .iter()
 342                    .filter_map(|(id, worktree)| {
 343                        worktree.share.as_ref().map(|share| proto::Worktree {
 344                            id: *id,
 345                            root_name: worktree.root_name.clone(),
 346                            entries: share.entries.values().cloned().collect(),
 347                            diagnostic_summaries: share
 348                                .diagnostic_summaries
 349                                .values()
 350                                .cloned()
 351                                .collect(),
 352                            weak: worktree.weak,
 353                        })
 354                    })
 355                    .collect();
 356                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 357                    if *peer_conn_id != request.sender_id {
 358                        collaborators.push(proto::Collaborator {
 359                            peer_id: peer_conn_id.0,
 360                            replica_id: *peer_replica_id as u32,
 361                            user_id: peer_user_id.to_proto(),
 362                        });
 363                    }
 364                }
 365                let response = proto::JoinProjectResponse {
 366                    worktrees,
 367                    replica_id: joined.replica_id as u32,
 368                    collaborators,
 369                };
 370                let connection_ids = joined.project.connection_ids();
 371                let contact_user_ids = joined.project.authorized_user_ids();
 372                Ok((response, connection_ids, contact_user_ids))
 373            })?;
 374
 375        broadcast(request.sender_id, connection_ids, |conn_id| {
 376            self.peer.send(
 377                conn_id,
 378                proto::AddProjectCollaborator {
 379                    project_id,
 380                    collaborator: Some(proto::Collaborator {
 381                        peer_id: request.sender_id.0,
 382                        replica_id: response.replica_id,
 383                        user_id: user_id.to_proto(),
 384                    }),
 385                },
 386            )
 387        })?;
 388        self.update_contacts_for_users(&contact_user_ids)?;
 389        Ok(response)
 390    }
 391
 392    async fn leave_project(
 393        mut self: Arc<Server>,
 394        request: TypedEnvelope<proto::LeaveProject>,
 395    ) -> tide::Result<()> {
 396        let sender_id = request.sender_id;
 397        let project_id = request.payload.project_id;
 398        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 399
 400        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 401            self.peer.send(
 402                conn_id,
 403                proto::RemoveProjectCollaborator {
 404                    project_id,
 405                    peer_id: sender_id.0,
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 410
 411        Ok(())
 412    }
 413
 414    async fn register_worktree(
 415        mut self: Arc<Server>,
 416        request: TypedEnvelope<proto::RegisterWorktree>,
 417    ) -> tide::Result<proto::Ack> {
 418        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 419
 420        let mut contact_user_ids = HashSet::default();
 421        contact_user_ids.insert(host_user_id);
 422        for github_login in request.payload.authorized_logins {
 423            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 424            contact_user_ids.insert(contact_user_id);
 425        }
 426
 427        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 428        self.state_mut().register_worktree(
 429            request.payload.project_id,
 430            request.payload.worktree_id,
 431            request.sender_id,
 432            Worktree {
 433                authorized_user_ids: contact_user_ids.clone(),
 434                root_name: request.payload.root_name,
 435                share: None,
 436                weak: false,
 437            },
 438        )?;
 439        self.update_contacts_for_users(&contact_user_ids)?;
 440        Ok(proto::Ack {})
 441    }
 442
 443    async fn unregister_worktree(
 444        mut self: Arc<Server>,
 445        request: TypedEnvelope<proto::UnregisterWorktree>,
 446    ) -> tide::Result<()> {
 447        let project_id = request.payload.project_id;
 448        let worktree_id = request.payload.worktree_id;
 449        let (worktree, guest_connection_ids) =
 450            self.state_mut()
 451                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 452        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 453            self.peer.send(
 454                conn_id,
 455                proto::UnregisterWorktree {
 456                    project_id,
 457                    worktree_id,
 458                },
 459            )
 460        })?;
 461        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 462        Ok(())
 463    }
 464
 465    async fn share_worktree(
 466        mut self: Arc<Server>,
 467        mut request: TypedEnvelope<proto::ShareWorktree>,
 468    ) -> tide::Result<proto::Ack> {
 469        let worktree = request
 470            .payload
 471            .worktree
 472            .as_mut()
 473            .ok_or_else(|| anyhow!("missing worktree"))?;
 474        let entries = worktree
 475            .entries
 476            .iter()
 477            .map(|entry| (entry.id, entry.clone()))
 478            .collect();
 479        let diagnostic_summaries = worktree
 480            .diagnostic_summaries
 481            .iter()
 482            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 483            .collect();
 484
 485        let shared_worktree = self.state_mut().share_worktree(
 486            request.payload.project_id,
 487            worktree.id,
 488            request.sender_id,
 489            entries,
 490            diagnostic_summaries,
 491        )?;
 492
 493        broadcast(
 494            request.sender_id,
 495            shared_worktree.connection_ids,
 496            |connection_id| {
 497                self.peer
 498                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 499            },
 500        )?;
 501        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 502
 503        Ok(proto::Ack {})
 504    }
 505
 506    async fn update_worktree(
 507        mut self: Arc<Server>,
 508        request: TypedEnvelope<proto::UpdateWorktree>,
 509    ) -> tide::Result<proto::Ack> {
 510        let connection_ids = self.state_mut().update_worktree(
 511            request.sender_id,
 512            request.payload.project_id,
 513            request.payload.worktree_id,
 514            &request.payload.removed_entries,
 515            &request.payload.updated_entries,
 516        )?;
 517
 518        broadcast(request.sender_id, connection_ids, |connection_id| {
 519            self.peer
 520                .forward_send(request.sender_id, connection_id, request.payload.clone())
 521        })?;
 522
 523        Ok(proto::Ack {})
 524    }
 525
 526    async fn update_diagnostic_summary(
 527        mut self: Arc<Server>,
 528        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 529    ) -> tide::Result<()> {
 530        let summary = request
 531            .payload
 532            .summary
 533            .clone()
 534            .ok_or_else(|| anyhow!("invalid summary"))?;
 535        let receiver_ids = self.state_mut().update_diagnostic_summary(
 536            request.payload.project_id,
 537            request.payload.worktree_id,
 538            request.sender_id,
 539            summary,
 540        )?;
 541
 542        broadcast(request.sender_id, receiver_ids, |connection_id| {
 543            self.peer
 544                .forward_send(request.sender_id, connection_id, request.payload.clone())
 545        })?;
 546        Ok(())
 547    }
 548
 549    async fn disk_based_diagnostics_updating(
 550        self: Arc<Server>,
 551        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 552    ) -> tide::Result<()> {
 553        let receiver_ids = self
 554            .state()
 555            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })?;
 560        Ok(())
 561    }
 562
 563    async fn disk_based_diagnostics_updated(
 564        self: Arc<Server>,
 565        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 566    ) -> tide::Result<()> {
 567        let receiver_ids = self
 568            .state()
 569            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 570        broadcast(request.sender_id, receiver_ids, |connection_id| {
 571            self.peer
 572                .forward_send(request.sender_id, connection_id, request.payload.clone())
 573        })?;
 574        Ok(())
 575    }
 576
 577    async fn get_definition(
 578        self: Arc<Server>,
 579        request: TypedEnvelope<proto::GetDefinition>,
 580    ) -> tide::Result<proto::GetDefinitionResponse> {
 581        let host_connection_id = self
 582            .state()
 583            .read_project(request.payload.project_id, request.sender_id)?
 584            .host_connection_id;
 585        Ok(self
 586            .peer
 587            .forward_request(request.sender_id, host_connection_id, request.payload)
 588            .await?)
 589    }
 590
 591    async fn get_references(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::GetReferences>,
 594    ) -> tide::Result<proto::GetReferencesResponse> {
 595        let host_connection_id = self
 596            .state()
 597            .read_project(request.payload.project_id, request.sender_id)?
 598            .host_connection_id;
 599        Ok(self
 600            .peer
 601            .forward_request(request.sender_id, host_connection_id, request.payload)
 602            .await?)
 603    }
 604
 605    async fn get_document_highlights(
 606        self: Arc<Server>,
 607        request: TypedEnvelope<proto::GetDocumentHighlights>,
 608    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 609        let host_connection_id = self
 610            .state()
 611            .read_project(request.payload.project_id, request.sender_id)?
 612            .host_connection_id;
 613        Ok(self
 614            .peer
 615            .forward_request(request.sender_id, host_connection_id, request.payload)
 616            .await?)
 617    }
 618
 619    async fn get_project_symbols(
 620        self: Arc<Server>,
 621        request: TypedEnvelope<proto::GetProjectSymbols>,
 622    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 623        let host_connection_id = self
 624            .state()
 625            .read_project(request.payload.project_id, request.sender_id)?
 626            .host_connection_id;
 627        Ok(self
 628            .peer
 629            .forward_request(request.sender_id, host_connection_id, request.payload)
 630            .await?)
 631    }
 632
 633    async fn open_buffer_for_symbol(
 634        self: Arc<Server>,
 635        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 636    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 637        let host_connection_id = self
 638            .state()
 639            .read_project(request.payload.project_id, request.sender_id)?
 640            .host_connection_id;
 641        Ok(self
 642            .peer
 643            .forward_request(request.sender_id, host_connection_id, request.payload)
 644            .await?)
 645    }
 646
 647    async fn open_buffer(
 648        self: Arc<Server>,
 649        request: TypedEnvelope<proto::OpenBuffer>,
 650    ) -> tide::Result<proto::OpenBufferResponse> {
 651        let host_connection_id = self
 652            .state()
 653            .read_project(request.payload.project_id, request.sender_id)?
 654            .host_connection_id;
 655        Ok(self
 656            .peer
 657            .forward_request(request.sender_id, host_connection_id, request.payload)
 658            .await?)
 659    }
 660
 661    async fn close_buffer(
 662        self: Arc<Server>,
 663        request: TypedEnvelope<proto::CloseBuffer>,
 664    ) -> tide::Result<()> {
 665        let host_connection_id = self
 666            .state()
 667            .read_project(request.payload.project_id, request.sender_id)?
 668            .host_connection_id;
 669        self.peer
 670            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 671        Ok(())
 672    }
 673
 674    async fn save_buffer(
 675        self: Arc<Server>,
 676        request: TypedEnvelope<proto::SaveBuffer>,
 677    ) -> tide::Result<proto::BufferSaved> {
 678        let host;
 679        let mut guests;
 680        {
 681            let state = self.state();
 682            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 683            host = project.host_connection_id;
 684            guests = project.guest_connection_ids()
 685        }
 686
 687        let response = self
 688            .peer
 689            .forward_request(request.sender_id, host, request.payload.clone())
 690            .await?;
 691
 692        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 693        broadcast(host, guests, |conn_id| {
 694            self.peer.forward_send(host, conn_id, response.clone())
 695        })?;
 696
 697        Ok(response)
 698    }
 699
 700    async fn format_buffers(
 701        self: Arc<Server>,
 702        request: TypedEnvelope<proto::FormatBuffers>,
 703    ) -> tide::Result<proto::FormatBuffersResponse> {
 704        let host = self
 705            .state()
 706            .read_project(request.payload.project_id, request.sender_id)?
 707            .host_connection_id;
 708        Ok(self
 709            .peer
 710            .forward_request(request.sender_id, host, request.payload.clone())
 711            .await?)
 712    }
 713
 714    async fn get_completions(
 715        self: Arc<Server>,
 716        request: TypedEnvelope<proto::GetCompletions>,
 717    ) -> tide::Result<proto::GetCompletionsResponse> {
 718        let host = self
 719            .state()
 720            .read_project(request.payload.project_id, request.sender_id)?
 721            .host_connection_id;
 722        Ok(self
 723            .peer
 724            .forward_request(request.sender_id, host, request.payload.clone())
 725            .await?)
 726    }
 727
 728    async fn apply_additional_edits_for_completion(
 729        self: Arc<Server>,
 730        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 731    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 732        let host = self
 733            .state()
 734            .read_project(request.payload.project_id, request.sender_id)?
 735            .host_connection_id;
 736        Ok(self
 737            .peer
 738            .forward_request(request.sender_id, host, request.payload.clone())
 739            .await?)
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<proto::GetCodeActionsResponse> {
 746        let host = self
 747            .state()
 748            .read_project(request.payload.project_id, request.sender_id)?
 749            .host_connection_id;
 750        Ok(self
 751            .peer
 752            .forward_request(request.sender_id, host, request.payload.clone())
 753            .await?)
 754    }
 755
 756    async fn apply_code_action(
 757        self: Arc<Server>,
 758        request: TypedEnvelope<proto::ApplyCodeAction>,
 759    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 760        let host = self
 761            .state()
 762            .read_project(request.payload.project_id, request.sender_id)?
 763            .host_connection_id;
 764        Ok(self
 765            .peer
 766            .forward_request(request.sender_id, host, request.payload.clone())
 767            .await?)
 768    }
 769
 770    async fn prepare_rename(
 771        self: Arc<Server>,
 772        request: TypedEnvelope<proto::PrepareRename>,
 773    ) -> tide::Result<proto::PrepareRenameResponse> {
 774        let host = self
 775            .state()
 776            .read_project(request.payload.project_id, request.sender_id)?
 777            .host_connection_id;
 778        Ok(self
 779            .peer
 780            .forward_request(request.sender_id, host, request.payload.clone())
 781            .await?)
 782    }
 783
 784    async fn perform_rename(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::PerformRename>,
 787    ) -> tide::Result<proto::PerformRenameResponse> {
 788        let host = self
 789            .state()
 790            .read_project(request.payload.project_id, request.sender_id)?
 791            .host_connection_id;
 792        Ok(self
 793            .peer
 794            .forward_request(request.sender_id, host, request.payload.clone())
 795            .await?)
 796    }
 797
 798    async fn update_buffer(
 799        self: Arc<Server>,
 800        request: TypedEnvelope<proto::UpdateBuffer>,
 801    ) -> tide::Result<proto::Ack> {
 802        let receiver_ids = self
 803            .state()
 804            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 805        broadcast(request.sender_id, receiver_ids, |connection_id| {
 806            self.peer
 807                .forward_send(request.sender_id, connection_id, request.payload.clone())
 808        })?;
 809        Ok(proto::Ack {})
 810    }
 811
 812    async fn update_buffer_file(
 813        self: Arc<Server>,
 814        request: TypedEnvelope<proto::UpdateBufferFile>,
 815    ) -> tide::Result<()> {
 816        let receiver_ids = self
 817            .state()
 818            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 819        broadcast(request.sender_id, receiver_ids, |connection_id| {
 820            self.peer
 821                .forward_send(request.sender_id, connection_id, request.payload.clone())
 822        })?;
 823        Ok(())
 824    }
 825
 826    async fn buffer_reloaded(
 827        self: Arc<Server>,
 828        request: TypedEnvelope<proto::BufferReloaded>,
 829    ) -> tide::Result<()> {
 830        let receiver_ids = self
 831            .state()
 832            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 833        broadcast(request.sender_id, receiver_ids, |connection_id| {
 834            self.peer
 835                .forward_send(request.sender_id, connection_id, request.payload.clone())
 836        })?;
 837        Ok(())
 838    }
 839
 840    async fn buffer_saved(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::BufferSaved>,
 843    ) -> tide::Result<()> {
 844        let receiver_ids = self
 845            .state()
 846            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 847        broadcast(request.sender_id, receiver_ids, |connection_id| {
 848            self.peer
 849                .forward_send(request.sender_id, connection_id, request.payload.clone())
 850        })?;
 851        Ok(())
 852    }
 853
 854    async fn get_channels(
 855        self: Arc<Server>,
 856        request: TypedEnvelope<proto::GetChannels>,
 857    ) -> tide::Result<proto::GetChannelsResponse> {
 858        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 859        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 860        Ok(proto::GetChannelsResponse {
 861            channels: channels
 862                .into_iter()
 863                .map(|chan| proto::Channel {
 864                    id: chan.id.to_proto(),
 865                    name: chan.name,
 866                })
 867                .collect(),
 868        })
 869    }
 870
 871    async fn get_users(
 872        self: Arc<Server>,
 873        request: TypedEnvelope<proto::GetUsers>,
 874    ) -> tide::Result<proto::GetUsersResponse> {
 875        let user_ids = request
 876            .payload
 877            .user_ids
 878            .into_iter()
 879            .map(UserId::from_proto)
 880            .collect();
 881        let users = self
 882            .app_state
 883            .db
 884            .get_users_by_ids(user_ids)
 885            .await?
 886            .into_iter()
 887            .map(|user| proto::User {
 888                id: user.id.to_proto(),
 889                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 890                github_login: user.github_login,
 891            })
 892            .collect();
 893        Ok(proto::GetUsersResponse { users })
 894    }
 895
 896    fn update_contacts_for_users<'a>(
 897        self: &Arc<Server>,
 898        user_ids: impl IntoIterator<Item = &'a UserId>,
 899    ) -> anyhow::Result<()> {
 900        let mut result = Ok(());
 901        let state = self.state();
 902        for user_id in user_ids {
 903            let contacts = state.contacts_for_user(*user_id);
 904            for connection_id in state.connection_ids_for_user(*user_id) {
 905                if let Err(error) = self.peer.send(
 906                    connection_id,
 907                    proto::UpdateContacts {
 908                        contacts: contacts.clone(),
 909                    },
 910                ) {
 911                    result = Err(error);
 912                }
 913            }
 914        }
 915        result
 916    }
 917
 918    async fn join_channel(
 919        mut self: Arc<Self>,
 920        request: TypedEnvelope<proto::JoinChannel>,
 921    ) -> tide::Result<proto::JoinChannelResponse> {
 922        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 923        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 924        if !self
 925            .app_state
 926            .db
 927            .can_user_access_channel(user_id, channel_id)
 928            .await?
 929        {
 930            Err(anyhow!("access denied"))?;
 931        }
 932
 933        self.state_mut().join_channel(request.sender_id, channel_id);
 934        let messages = self
 935            .app_state
 936            .db
 937            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 938            .await?
 939            .into_iter()
 940            .map(|msg| proto::ChannelMessage {
 941                id: msg.id.to_proto(),
 942                body: msg.body,
 943                timestamp: msg.sent_at.unix_timestamp() as u64,
 944                sender_id: msg.sender_id.to_proto(),
 945                nonce: Some(msg.nonce.as_u128().into()),
 946            })
 947            .collect::<Vec<_>>();
 948        Ok(proto::JoinChannelResponse {
 949            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 950            messages,
 951        })
 952    }
 953
 954    async fn leave_channel(
 955        mut self: Arc<Self>,
 956        request: TypedEnvelope<proto::LeaveChannel>,
 957    ) -> tide::Result<()> {
 958        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 959        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 960        if !self
 961            .app_state
 962            .db
 963            .can_user_access_channel(user_id, channel_id)
 964            .await?
 965        {
 966            Err(anyhow!("access denied"))?;
 967        }
 968
 969        self.state_mut()
 970            .leave_channel(request.sender_id, channel_id);
 971
 972        Ok(())
 973    }
 974
 975    async fn send_channel_message(
 976        self: Arc<Self>,
 977        request: TypedEnvelope<proto::SendChannelMessage>,
 978    ) -> tide::Result<proto::SendChannelMessageResponse> {
 979        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 980        let user_id;
 981        let connection_ids;
 982        {
 983            let state = self.state();
 984            user_id = state.user_id_for_connection(request.sender_id)?;
 985            connection_ids = state.channel_connection_ids(channel_id)?;
 986        }
 987
 988        // Validate the message body.
 989        let body = request.payload.body.trim().to_string();
 990        if body.len() > MAX_MESSAGE_LEN {
 991            return Err(anyhow!("message is too long"))?;
 992        }
 993        if body.is_empty() {
 994            return Err(anyhow!("message can't be blank"))?;
 995        }
 996
 997        let timestamp = OffsetDateTime::now_utc();
 998        let nonce = request
 999            .payload
1000            .nonce
1001            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1002
1003        let message_id = self
1004            .app_state
1005            .db
1006            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1007            .await?
1008            .to_proto();
1009        let message = proto::ChannelMessage {
1010            sender_id: user_id.to_proto(),
1011            id: message_id,
1012            body,
1013            timestamp: timestamp.unix_timestamp() as u64,
1014            nonce: Some(nonce),
1015        };
1016        broadcast(request.sender_id, connection_ids, |conn_id| {
1017            self.peer.send(
1018                conn_id,
1019                proto::ChannelMessageSent {
1020                    channel_id: channel_id.to_proto(),
1021                    message: Some(message.clone()),
1022                },
1023            )
1024        })?;
1025        Ok(proto::SendChannelMessageResponse {
1026            message: Some(message),
1027        })
1028    }
1029
1030    async fn get_channel_messages(
1031        self: Arc<Self>,
1032        request: TypedEnvelope<proto::GetChannelMessages>,
1033    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1034        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1035        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1036        if !self
1037            .app_state
1038            .db
1039            .can_user_access_channel(user_id, channel_id)
1040            .await?
1041        {
1042            Err(anyhow!("access denied"))?;
1043        }
1044
1045        let messages = self
1046            .app_state
1047            .db
1048            .get_channel_messages(
1049                channel_id,
1050                MESSAGE_COUNT_PER_PAGE,
1051                Some(MessageId::from_proto(request.payload.before_message_id)),
1052            )
1053            .await?
1054            .into_iter()
1055            .map(|msg| proto::ChannelMessage {
1056                id: msg.id.to_proto(),
1057                body: msg.body,
1058                timestamp: msg.sent_at.unix_timestamp() as u64,
1059                sender_id: msg.sender_id.to_proto(),
1060                nonce: Some(msg.nonce.as_u128().into()),
1061            })
1062            .collect::<Vec<_>>();
1063
1064        Ok(proto::GetChannelMessagesResponse {
1065            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1066            messages,
1067        })
1068    }
1069
1070    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1071        self.store.read()
1072    }
1073
1074    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1075        self.store.write()
1076    }
1077}
1078
1079impl Executor for RealExecutor {
1080    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1081        task::spawn(future);
1082    }
1083}
1084
1085fn broadcast<F>(
1086    sender_id: ConnectionId,
1087    receiver_ids: Vec<ConnectionId>,
1088    mut f: F,
1089) -> anyhow::Result<()>
1090where
1091    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1092{
1093    let mut result = Ok(());
1094    for receiver_id in receiver_ids {
1095        if receiver_id != sender_id {
1096            if let Err(error) = f(receiver_id) {
1097                if result.is_ok() {
1098                    result = Err(error);
1099                }
1100            }
1101        }
1102    }
1103    result
1104}
1105
1106pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1107    let server = Server::new(app.state().clone(), rpc.clone(), None);
1108    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1109        let server = server.clone();
1110        async move {
1111            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1112
1113            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1114            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1115            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1116            let client_protocol_version: Option<u32> = request
1117                .header("X-Zed-Protocol-Version")
1118                .and_then(|v| v.as_str().parse().ok());
1119
1120            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1121                return Ok(Response::new(StatusCode::UpgradeRequired));
1122            }
1123
1124            let header = match request.header("Sec-Websocket-Key") {
1125                Some(h) => h.as_str(),
1126                None => return Err(anyhow!("expected sec-websocket-key"))?,
1127            };
1128
1129            let user_id = process_auth_header(&request).await?;
1130
1131            let mut response = Response::new(StatusCode::SwitchingProtocols);
1132            response.insert_header(UPGRADE, "websocket");
1133            response.insert_header(CONNECTION, "Upgrade");
1134            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1135            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1136            response.insert_header("Sec-Websocket-Version", "13");
1137
1138            let http_res: &mut tide::http::Response = response.as_mut();
1139            let upgrade_receiver = http_res.recv_upgrade().await;
1140            let addr = request.remote().unwrap_or("unknown").to_string();
1141            task::spawn(async move {
1142                if let Some(stream) = upgrade_receiver.await {
1143                    server
1144                        .handle_connection(
1145                            Connection::new(
1146                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1147                            ),
1148                            addr,
1149                            user_id,
1150                            None,
1151                            RealExecutor,
1152                        )
1153                        .await;
1154                }
1155            });
1156
1157            Ok(response)
1158        }
1159    });
1160}
1161
1162fn header_contains_ignore_case<T>(
1163    request: &tide::Request<T>,
1164    header_name: HeaderName,
1165    value: &str,
1166) -> bool {
1167    request
1168        .header(header_name)
1169        .map(|h| {
1170            h.as_str()
1171                .split(',')
1172                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1173        })
1174        .unwrap_or(false)
1175}
1176
1177#[cfg(test)]
1178mod tests {
1179    use super::*;
1180    use crate::{
1181        auth,
1182        db::{tests::TestDb, UserId},
1183        github, AppState, Config,
1184    };
1185    use ::rpc::Peer;
1186    use collections::BTreeMap;
1187    use gpui::{executor, ModelHandle, TestAppContext};
1188    use parking_lot::Mutex;
1189    use postage::{sink::Sink, watch};
1190    use rand::prelude::*;
1191    use rpc::PeerId;
1192    use serde_json::json;
1193    use sqlx::types::time::OffsetDateTime;
1194    use std::{
1195        cell::Cell,
1196        env,
1197        ops::Deref,
1198        path::Path,
1199        rc::Rc,
1200        sync::{
1201            atomic::{AtomicBool, Ordering::SeqCst},
1202            Arc,
1203        },
1204        time::Duration,
1205    };
1206    use zed::{
1207        client::{
1208            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1209            EstablishConnectionError, UserStore,
1210        },
1211        editor::{
1212            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1213            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1214        },
1215        fs::{FakeFs, Fs as _},
1216        language::{
1217            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1218            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1219        },
1220        lsp,
1221        project::{DiagnosticSummary, Project, ProjectPath},
1222        workspace::{Workspace, WorkspaceParams},
1223    };
1224
1225    #[cfg(test)]
1226    #[ctor::ctor]
1227    fn init_logger() {
1228        if std::env::var("RUST_LOG").is_ok() {
1229            env_logger::init();
1230        }
1231    }
1232
1233    #[gpui::test(iterations = 10)]
1234    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1235        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1236        let lang_registry = Arc::new(LanguageRegistry::new());
1237        let fs = FakeFs::new(cx_a.background());
1238        cx_a.foreground().forbid_parking();
1239
1240        // Connect to a server as 2 clients.
1241        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1242        let client_a = server.create_client(&mut cx_a, "user_a").await;
1243        let client_b = server.create_client(&mut cx_b, "user_b").await;
1244
1245        // Share a project as client A
1246        fs.insert_tree(
1247            "/a",
1248            json!({
1249                ".zed.toml": r#"collaborators = ["user_b"]"#,
1250                "a.txt": "a-contents",
1251                "b.txt": "b-contents",
1252            }),
1253        )
1254        .await;
1255        let project_a = cx_a.update(|cx| {
1256            Project::local(
1257                client_a.clone(),
1258                client_a.user_store.clone(),
1259                lang_registry.clone(),
1260                fs.clone(),
1261                cx,
1262            )
1263        });
1264        let (worktree_a, _) = project_a
1265            .update(&mut cx_a, |p, cx| {
1266                p.find_or_create_local_worktree("/a", false, cx)
1267            })
1268            .await
1269            .unwrap();
1270        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1271        worktree_a
1272            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1273            .await;
1274        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1275        project_a
1276            .update(&mut cx_a, |p, cx| p.share(cx))
1277            .await
1278            .unwrap();
1279
1280        // Join that project as client B
1281        let project_b = Project::remote(
1282            project_id,
1283            client_b.clone(),
1284            client_b.user_store.clone(),
1285            lang_registry.clone(),
1286            fs.clone(),
1287            &mut cx_b.to_async(),
1288        )
1289        .await
1290        .unwrap();
1291
1292        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1293            assert_eq!(
1294                project
1295                    .collaborators()
1296                    .get(&client_a.peer_id)
1297                    .unwrap()
1298                    .user
1299                    .github_login,
1300                "user_a"
1301            );
1302            project.replica_id()
1303        });
1304        project_a
1305            .condition(&cx_a, |tree, _| {
1306                tree.collaborators()
1307                    .get(&client_b.peer_id)
1308                    .map_or(false, |collaborator| {
1309                        collaborator.replica_id == replica_id_b
1310                            && collaborator.user.github_login == "user_b"
1311                    })
1312            })
1313            .await;
1314
1315        // Open the same file as client B and client A.
1316        let buffer_b = project_b
1317            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1318            .await
1319            .unwrap();
1320        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1321        buffer_b.read_with(&cx_b, |buf, cx| {
1322            assert_eq!(buf.read(cx).text(), "b-contents")
1323        });
1324        project_a.read_with(&cx_a, |project, cx| {
1325            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1326        });
1327        let buffer_a = project_a
1328            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1329            .await
1330            .unwrap();
1331
1332        let editor_b = cx_b.add_view(window_b, |cx| {
1333            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1334        });
1335
1336        // TODO
1337        // // Create a selection set as client B and see that selection set as client A.
1338        // buffer_a
1339        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1340        //     .await;
1341
1342        // Edit the buffer as client B and see that edit as client A.
1343        editor_b.update(&mut cx_b, |editor, cx| {
1344            editor.handle_input(&Input("ok, ".into()), cx)
1345        });
1346        buffer_a
1347            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1348            .await;
1349
1350        // TODO
1351        // // Remove the selection set as client B, see those selections disappear as client A.
1352        cx_b.update(move |_| drop(editor_b));
1353        // buffer_a
1354        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1355        //     .await;
1356
1357        // Close the buffer as client A, see that the buffer is closed.
1358        cx_a.update(move |_| drop(buffer_a));
1359        project_a
1360            .condition(&cx_a, |project, cx| {
1361                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1362            })
1363            .await;
1364
1365        // Dropping the client B's project removes client B from client A's collaborators.
1366        cx_b.update(move |_| drop(project_b));
1367        project_a
1368            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1369            .await;
1370    }
1371
1372    #[gpui::test(iterations = 10)]
1373    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1374        let lang_registry = Arc::new(LanguageRegistry::new());
1375        let fs = FakeFs::new(cx_a.background());
1376        cx_a.foreground().forbid_parking();
1377
1378        // Connect to a server as 2 clients.
1379        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1380        let client_a = server.create_client(&mut cx_a, "user_a").await;
1381        let client_b = server.create_client(&mut cx_b, "user_b").await;
1382
1383        // Share a project as client A
1384        fs.insert_tree(
1385            "/a",
1386            json!({
1387                ".zed.toml": r#"collaborators = ["user_b"]"#,
1388                "a.txt": "a-contents",
1389                "b.txt": "b-contents",
1390            }),
1391        )
1392        .await;
1393        let project_a = cx_a.update(|cx| {
1394            Project::local(
1395                client_a.clone(),
1396                client_a.user_store.clone(),
1397                lang_registry.clone(),
1398                fs.clone(),
1399                cx,
1400            )
1401        });
1402        let (worktree_a, _) = project_a
1403            .update(&mut cx_a, |p, cx| {
1404                p.find_or_create_local_worktree("/a", false, cx)
1405            })
1406            .await
1407            .unwrap();
1408        worktree_a
1409            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1410            .await;
1411        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1412        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1413        project_a
1414            .update(&mut cx_a, |p, cx| p.share(cx))
1415            .await
1416            .unwrap();
1417        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1418
1419        // Join that project as client B
1420        let project_b = Project::remote(
1421            project_id,
1422            client_b.clone(),
1423            client_b.user_store.clone(),
1424            lang_registry.clone(),
1425            fs.clone(),
1426            &mut cx_b.to_async(),
1427        )
1428        .await
1429        .unwrap();
1430        project_b
1431            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1432            .await
1433            .unwrap();
1434
1435        // Unshare the project as client A
1436        project_a
1437            .update(&mut cx_a, |project, cx| project.unshare(cx))
1438            .await
1439            .unwrap();
1440        project_b
1441            .condition(&mut cx_b, |project, _| project.is_read_only())
1442            .await;
1443        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1444        drop(project_b);
1445
1446        // Share the project again and ensure guests can still join.
1447        project_a
1448            .update(&mut cx_a, |project, cx| project.share(cx))
1449            .await
1450            .unwrap();
1451        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1452
1453        let project_c = Project::remote(
1454            project_id,
1455            client_b.clone(),
1456            client_b.user_store.clone(),
1457            lang_registry.clone(),
1458            fs.clone(),
1459            &mut cx_b.to_async(),
1460        )
1461        .await
1462        .unwrap();
1463        project_c
1464            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1465            .await
1466            .unwrap();
1467    }
1468
1469    #[gpui::test(iterations = 10)]
1470    async fn test_propagate_saves_and_fs_changes(
1471        mut cx_a: TestAppContext,
1472        mut cx_b: TestAppContext,
1473        mut cx_c: TestAppContext,
1474    ) {
1475        let lang_registry = Arc::new(LanguageRegistry::new());
1476        let fs = FakeFs::new(cx_a.background());
1477        cx_a.foreground().forbid_parking();
1478
1479        // Connect to a server as 3 clients.
1480        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1481        let client_a = server.create_client(&mut cx_a, "user_a").await;
1482        let client_b = server.create_client(&mut cx_b, "user_b").await;
1483        let client_c = server.create_client(&mut cx_c, "user_c").await;
1484
1485        // Share a worktree as client A.
1486        fs.insert_tree(
1487            "/a",
1488            json!({
1489                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1490                "file1": "",
1491                "file2": ""
1492            }),
1493        )
1494        .await;
1495        let project_a = cx_a.update(|cx| {
1496            Project::local(
1497                client_a.clone(),
1498                client_a.user_store.clone(),
1499                lang_registry.clone(),
1500                fs.clone(),
1501                cx,
1502            )
1503        });
1504        let (worktree_a, _) = project_a
1505            .update(&mut cx_a, |p, cx| {
1506                p.find_or_create_local_worktree("/a", false, cx)
1507            })
1508            .await
1509            .unwrap();
1510        worktree_a
1511            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1512            .await;
1513        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1514        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1515        project_a
1516            .update(&mut cx_a, |p, cx| p.share(cx))
1517            .await
1518            .unwrap();
1519
1520        // Join that worktree as clients B and C.
1521        let project_b = Project::remote(
1522            project_id,
1523            client_b.clone(),
1524            client_b.user_store.clone(),
1525            lang_registry.clone(),
1526            fs.clone(),
1527            &mut cx_b.to_async(),
1528        )
1529        .await
1530        .unwrap();
1531        let project_c = Project::remote(
1532            project_id,
1533            client_c.clone(),
1534            client_c.user_store.clone(),
1535            lang_registry.clone(),
1536            fs.clone(),
1537            &mut cx_c.to_async(),
1538        )
1539        .await
1540        .unwrap();
1541        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1542        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1543
1544        // Open and edit a buffer as both guests B and C.
1545        let buffer_b = project_b
1546            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1547            .await
1548            .unwrap();
1549        let buffer_c = project_c
1550            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1551            .await
1552            .unwrap();
1553        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1554        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1555
1556        // Open and edit that buffer as the host.
1557        let buffer_a = project_a
1558            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1559            .await
1560            .unwrap();
1561
1562        buffer_a
1563            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1564            .await;
1565        buffer_a.update(&mut cx_a, |buf, cx| {
1566            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1567        });
1568
1569        // Wait for edits to propagate
1570        buffer_a
1571            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1572            .await;
1573        buffer_b
1574            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1575            .await;
1576        buffer_c
1577            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1578            .await;
1579
1580        // Edit the buffer as the host and concurrently save as guest B.
1581        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1582        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1583        save_b.await.unwrap();
1584        assert_eq!(
1585            fs.load("/a/file1".as_ref()).await.unwrap(),
1586            "hi-a, i-am-c, i-am-b, i-am-a"
1587        );
1588        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1589        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1590        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1591
1592        // Make changes on host's file system, see those changes on guest worktrees.
1593        fs.rename(
1594            "/a/file1".as_ref(),
1595            "/a/file1-renamed".as_ref(),
1596            Default::default(),
1597        )
1598        .await
1599        .unwrap();
1600
1601        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1602            .await
1603            .unwrap();
1604        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1605
1606        worktree_a
1607            .condition(&cx_a, |tree, _| {
1608                tree.paths()
1609                    .map(|p| p.to_string_lossy())
1610                    .collect::<Vec<_>>()
1611                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1612            })
1613            .await;
1614        worktree_b
1615            .condition(&cx_b, |tree, _| {
1616                tree.paths()
1617                    .map(|p| p.to_string_lossy())
1618                    .collect::<Vec<_>>()
1619                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1620            })
1621            .await;
1622        worktree_c
1623            .condition(&cx_c, |tree, _| {
1624                tree.paths()
1625                    .map(|p| p.to_string_lossy())
1626                    .collect::<Vec<_>>()
1627                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1628            })
1629            .await;
1630
1631        // Ensure buffer files are updated as well.
1632        buffer_a
1633            .condition(&cx_a, |buf, _| {
1634                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1635            })
1636            .await;
1637        buffer_b
1638            .condition(&cx_b, |buf, _| {
1639                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1640            })
1641            .await;
1642        buffer_c
1643            .condition(&cx_c, |buf, _| {
1644                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1645            })
1646            .await;
1647    }
1648
1649    #[gpui::test(iterations = 10)]
1650    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1651        cx_a.foreground().forbid_parking();
1652        let lang_registry = Arc::new(LanguageRegistry::new());
1653        let fs = FakeFs::new(cx_a.background());
1654
1655        // Connect to a server as 2 clients.
1656        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1657        let client_a = server.create_client(&mut cx_a, "user_a").await;
1658        let client_b = server.create_client(&mut cx_b, "user_b").await;
1659
1660        // Share a project as client A
1661        fs.insert_tree(
1662            "/dir",
1663            json!({
1664                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1665                "a.txt": "a-contents",
1666            }),
1667        )
1668        .await;
1669
1670        let project_a = cx_a.update(|cx| {
1671            Project::local(
1672                client_a.clone(),
1673                client_a.user_store.clone(),
1674                lang_registry.clone(),
1675                fs.clone(),
1676                cx,
1677            )
1678        });
1679        let (worktree_a, _) = project_a
1680            .update(&mut cx_a, |p, cx| {
1681                p.find_or_create_local_worktree("/dir", false, cx)
1682            })
1683            .await
1684            .unwrap();
1685        worktree_a
1686            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1687            .await;
1688        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1689        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1690        project_a
1691            .update(&mut cx_a, |p, cx| p.share(cx))
1692            .await
1693            .unwrap();
1694
1695        // Join that project as client B
1696        let project_b = Project::remote(
1697            project_id,
1698            client_b.clone(),
1699            client_b.user_store.clone(),
1700            lang_registry.clone(),
1701            fs.clone(),
1702            &mut cx_b.to_async(),
1703        )
1704        .await
1705        .unwrap();
1706
1707        // Open a buffer as client B
1708        let buffer_b = project_b
1709            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1710            .await
1711            .unwrap();
1712
1713        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1714        buffer_b.read_with(&cx_b, |buf, _| {
1715            assert!(buf.is_dirty());
1716            assert!(!buf.has_conflict());
1717        });
1718
1719        buffer_b
1720            .update(&mut cx_b, |buf, cx| buf.save(cx))
1721            .await
1722            .unwrap();
1723        buffer_b
1724            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1725            .await;
1726        buffer_b.read_with(&cx_b, |buf, _| {
1727            assert!(!buf.has_conflict());
1728        });
1729
1730        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1731        buffer_b.read_with(&cx_b, |buf, _| {
1732            assert!(buf.is_dirty());
1733            assert!(!buf.has_conflict());
1734        });
1735    }
1736
1737    #[gpui::test(iterations = 10)]
1738    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1739        cx_a.foreground().forbid_parking();
1740        let lang_registry = Arc::new(LanguageRegistry::new());
1741        let fs = FakeFs::new(cx_a.background());
1742
1743        // Connect to a server as 2 clients.
1744        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1745        let client_a = server.create_client(&mut cx_a, "user_a").await;
1746        let client_b = server.create_client(&mut cx_b, "user_b").await;
1747
1748        // Share a project as client A
1749        fs.insert_tree(
1750            "/dir",
1751            json!({
1752                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1753                "a.txt": "a-contents",
1754            }),
1755        )
1756        .await;
1757
1758        let project_a = cx_a.update(|cx| {
1759            Project::local(
1760                client_a.clone(),
1761                client_a.user_store.clone(),
1762                lang_registry.clone(),
1763                fs.clone(),
1764                cx,
1765            )
1766        });
1767        let (worktree_a, _) = project_a
1768            .update(&mut cx_a, |p, cx| {
1769                p.find_or_create_local_worktree("/dir", false, cx)
1770            })
1771            .await
1772            .unwrap();
1773        worktree_a
1774            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1775            .await;
1776        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1777        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1778        project_a
1779            .update(&mut cx_a, |p, cx| p.share(cx))
1780            .await
1781            .unwrap();
1782
1783        // Join that project as client B
1784        let project_b = Project::remote(
1785            project_id,
1786            client_b.clone(),
1787            client_b.user_store.clone(),
1788            lang_registry.clone(),
1789            fs.clone(),
1790            &mut cx_b.to_async(),
1791        )
1792        .await
1793        .unwrap();
1794        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1795
1796        // Open a buffer as client B
1797        let buffer_b = project_b
1798            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1799            .await
1800            .unwrap();
1801        buffer_b.read_with(&cx_b, |buf, _| {
1802            assert!(!buf.is_dirty());
1803            assert!(!buf.has_conflict());
1804        });
1805
1806        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1807            .await
1808            .unwrap();
1809        buffer_b
1810            .condition(&cx_b, |buf, _| {
1811                buf.text() == "new contents" && !buf.is_dirty()
1812            })
1813            .await;
1814        buffer_b.read_with(&cx_b, |buf, _| {
1815            assert!(!buf.has_conflict());
1816        });
1817    }
1818
1819    #[gpui::test(iterations = 10)]
1820    async fn test_editing_while_guest_opens_buffer(
1821        mut cx_a: TestAppContext,
1822        mut cx_b: TestAppContext,
1823    ) {
1824        cx_a.foreground().forbid_parking();
1825        let lang_registry = Arc::new(LanguageRegistry::new());
1826        let fs = FakeFs::new(cx_a.background());
1827
1828        // Connect to a server as 2 clients.
1829        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1830        let client_a = server.create_client(&mut cx_a, "user_a").await;
1831        let client_b = server.create_client(&mut cx_b, "user_b").await;
1832
1833        // Share a project as client A
1834        fs.insert_tree(
1835            "/dir",
1836            json!({
1837                ".zed.toml": r#"collaborators = ["user_b"]"#,
1838                "a.txt": "a-contents",
1839            }),
1840        )
1841        .await;
1842        let project_a = cx_a.update(|cx| {
1843            Project::local(
1844                client_a.clone(),
1845                client_a.user_store.clone(),
1846                lang_registry.clone(),
1847                fs.clone(),
1848                cx,
1849            )
1850        });
1851        let (worktree_a, _) = project_a
1852            .update(&mut cx_a, |p, cx| {
1853                p.find_or_create_local_worktree("/dir", false, cx)
1854            })
1855            .await
1856            .unwrap();
1857        worktree_a
1858            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1859            .await;
1860        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1861        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1862        project_a
1863            .update(&mut cx_a, |p, cx| p.share(cx))
1864            .await
1865            .unwrap();
1866
1867        // Join that project as client B
1868        let project_b = Project::remote(
1869            project_id,
1870            client_b.clone(),
1871            client_b.user_store.clone(),
1872            lang_registry.clone(),
1873            fs.clone(),
1874            &mut cx_b.to_async(),
1875        )
1876        .await
1877        .unwrap();
1878
1879        // Open a buffer as client A
1880        let buffer_a = project_a
1881            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1882            .await
1883            .unwrap();
1884
1885        // Start opening the same buffer as client B
1886        let buffer_b = cx_b
1887            .background()
1888            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1889
1890        // Edit the buffer as client A while client B is still opening it.
1891        cx_b.background().simulate_random_delay().await;
1892        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1893        cx_b.background().simulate_random_delay().await;
1894        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1895
1896        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1897        let buffer_b = buffer_b.await.unwrap();
1898        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1899    }
1900
1901    #[gpui::test(iterations = 10)]
1902    async fn test_leaving_worktree_while_opening_buffer(
1903        mut cx_a: TestAppContext,
1904        mut cx_b: TestAppContext,
1905    ) {
1906        cx_a.foreground().forbid_parking();
1907        let lang_registry = Arc::new(LanguageRegistry::new());
1908        let fs = FakeFs::new(cx_a.background());
1909
1910        // Connect to a server as 2 clients.
1911        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1912        let client_a = server.create_client(&mut cx_a, "user_a").await;
1913        let client_b = server.create_client(&mut cx_b, "user_b").await;
1914
1915        // Share a project as client A
1916        fs.insert_tree(
1917            "/dir",
1918            json!({
1919                ".zed.toml": r#"collaborators = ["user_b"]"#,
1920                "a.txt": "a-contents",
1921            }),
1922        )
1923        .await;
1924        let project_a = cx_a.update(|cx| {
1925            Project::local(
1926                client_a.clone(),
1927                client_a.user_store.clone(),
1928                lang_registry.clone(),
1929                fs.clone(),
1930                cx,
1931            )
1932        });
1933        let (worktree_a, _) = project_a
1934            .update(&mut cx_a, |p, cx| {
1935                p.find_or_create_local_worktree("/dir", false, cx)
1936            })
1937            .await
1938            .unwrap();
1939        worktree_a
1940            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1941            .await;
1942        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1943        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1944        project_a
1945            .update(&mut cx_a, |p, cx| p.share(cx))
1946            .await
1947            .unwrap();
1948
1949        // Join that project as client B
1950        let project_b = Project::remote(
1951            project_id,
1952            client_b.clone(),
1953            client_b.user_store.clone(),
1954            lang_registry.clone(),
1955            fs.clone(),
1956            &mut cx_b.to_async(),
1957        )
1958        .await
1959        .unwrap();
1960
1961        // See that a guest has joined as client A.
1962        project_a
1963            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1964            .await;
1965
1966        // Begin opening a buffer as client B, but leave the project before the open completes.
1967        let buffer_b = cx_b
1968            .background()
1969            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1970        cx_b.update(|_| drop(project_b));
1971        drop(buffer_b);
1972
1973        // See that the guest has left.
1974        project_a
1975            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1976            .await;
1977    }
1978
1979    #[gpui::test(iterations = 10)]
1980    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1981        cx_a.foreground().forbid_parking();
1982        let lang_registry = Arc::new(LanguageRegistry::new());
1983        let fs = FakeFs::new(cx_a.background());
1984
1985        // Connect to a server as 2 clients.
1986        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1987        let client_a = server.create_client(&mut cx_a, "user_a").await;
1988        let client_b = server.create_client(&mut cx_b, "user_b").await;
1989
1990        // Share a project as client A
1991        fs.insert_tree(
1992            "/a",
1993            json!({
1994                ".zed.toml": r#"collaborators = ["user_b"]"#,
1995                "a.txt": "a-contents",
1996                "b.txt": "b-contents",
1997            }),
1998        )
1999        .await;
2000        let project_a = cx_a.update(|cx| {
2001            Project::local(
2002                client_a.clone(),
2003                client_a.user_store.clone(),
2004                lang_registry.clone(),
2005                fs.clone(),
2006                cx,
2007            )
2008        });
2009        let (worktree_a, _) = project_a
2010            .update(&mut cx_a, |p, cx| {
2011                p.find_or_create_local_worktree("/a", false, cx)
2012            })
2013            .await
2014            .unwrap();
2015        worktree_a
2016            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2017            .await;
2018        let project_id = project_a
2019            .update(&mut cx_a, |project, _| project.next_remote_id())
2020            .await;
2021        project_a
2022            .update(&mut cx_a, |project, cx| project.share(cx))
2023            .await
2024            .unwrap();
2025
2026        // Join that project as client B
2027        let _project_b = Project::remote(
2028            project_id,
2029            client_b.clone(),
2030            client_b.user_store.clone(),
2031            lang_registry.clone(),
2032            fs.clone(),
2033            &mut cx_b.to_async(),
2034        )
2035        .await
2036        .unwrap();
2037
2038        // See that a guest has joined as client A.
2039        project_a
2040            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2041            .await;
2042
2043        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2044        client_b.disconnect(&cx_b.to_async()).unwrap();
2045        project_a
2046            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2047            .await;
2048    }
2049
2050    #[gpui::test(iterations = 10)]
2051    async fn test_collaborating_with_diagnostics(
2052        mut cx_a: TestAppContext,
2053        mut cx_b: TestAppContext,
2054    ) {
2055        cx_a.foreground().forbid_parking();
2056        let mut lang_registry = Arc::new(LanguageRegistry::new());
2057        let fs = FakeFs::new(cx_a.background());
2058
2059        // Set up a fake language server.
2060        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2061        Arc::get_mut(&mut lang_registry)
2062            .unwrap()
2063            .add(Arc::new(Language::new(
2064                LanguageConfig {
2065                    name: "Rust".into(),
2066                    path_suffixes: vec!["rs".to_string()],
2067                    language_server: Some(language_server_config),
2068                    ..Default::default()
2069                },
2070                Some(tree_sitter_rust::language()),
2071            )));
2072
2073        // Connect to a server as 2 clients.
2074        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2075        let client_a = server.create_client(&mut cx_a, "user_a").await;
2076        let client_b = server.create_client(&mut cx_b, "user_b").await;
2077
2078        // Share a project as client A
2079        fs.insert_tree(
2080            "/a",
2081            json!({
2082                ".zed.toml": r#"collaborators = ["user_b"]"#,
2083                "a.rs": "let one = two",
2084                "other.rs": "",
2085            }),
2086        )
2087        .await;
2088        let project_a = cx_a.update(|cx| {
2089            Project::local(
2090                client_a.clone(),
2091                client_a.user_store.clone(),
2092                lang_registry.clone(),
2093                fs.clone(),
2094                cx,
2095            )
2096        });
2097        let (worktree_a, _) = project_a
2098            .update(&mut cx_a, |p, cx| {
2099                p.find_or_create_local_worktree("/a", false, cx)
2100            })
2101            .await
2102            .unwrap();
2103        worktree_a
2104            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2105            .await;
2106        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2107        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2108        project_a
2109            .update(&mut cx_a, |p, cx| p.share(cx))
2110            .await
2111            .unwrap();
2112
2113        // Cause the language server to start.
2114        let _ = cx_a
2115            .background()
2116            .spawn(project_a.update(&mut cx_a, |project, cx| {
2117                project.open_buffer(
2118                    ProjectPath {
2119                        worktree_id,
2120                        path: Path::new("other.rs").into(),
2121                    },
2122                    cx,
2123                )
2124            }))
2125            .await
2126            .unwrap();
2127
2128        // Simulate a language server reporting errors for a file.
2129        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2130        fake_language_server
2131            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2132                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2133                version: None,
2134                diagnostics: vec![lsp::Diagnostic {
2135                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2136                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2137                    message: "message 1".to_string(),
2138                    ..Default::default()
2139                }],
2140            })
2141            .await;
2142
2143        // Wait for server to see the diagnostics update.
2144        server
2145            .condition(|store| {
2146                let worktree = store
2147                    .project(project_id)
2148                    .unwrap()
2149                    .worktrees
2150                    .get(&worktree_id.to_proto())
2151                    .unwrap();
2152
2153                !worktree
2154                    .share
2155                    .as_ref()
2156                    .unwrap()
2157                    .diagnostic_summaries
2158                    .is_empty()
2159            })
2160            .await;
2161
2162        // Join the worktree as client B.
2163        let project_b = Project::remote(
2164            project_id,
2165            client_b.clone(),
2166            client_b.user_store.clone(),
2167            lang_registry.clone(),
2168            fs.clone(),
2169            &mut cx_b.to_async(),
2170        )
2171        .await
2172        .unwrap();
2173
2174        project_b.read_with(&cx_b, |project, cx| {
2175            assert_eq!(
2176                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2177                &[(
2178                    ProjectPath {
2179                        worktree_id,
2180                        path: Arc::from(Path::new("a.rs")),
2181                    },
2182                    DiagnosticSummary {
2183                        error_count: 1,
2184                        warning_count: 0,
2185                        ..Default::default()
2186                    },
2187                )]
2188            )
2189        });
2190
2191        // Simulate a language server reporting more errors for a file.
2192        fake_language_server
2193            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2194                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2195                version: None,
2196                diagnostics: vec![
2197                    lsp::Diagnostic {
2198                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2199                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2200                        message: "message 1".to_string(),
2201                        ..Default::default()
2202                    },
2203                    lsp::Diagnostic {
2204                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2205                        range: lsp::Range::new(
2206                            lsp::Position::new(0, 10),
2207                            lsp::Position::new(0, 13),
2208                        ),
2209                        message: "message 2".to_string(),
2210                        ..Default::default()
2211                    },
2212                ],
2213            })
2214            .await;
2215
2216        // Client b gets the updated summaries
2217        project_b
2218            .condition(&cx_b, |project, cx| {
2219                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2220                    == &[(
2221                        ProjectPath {
2222                            worktree_id,
2223                            path: Arc::from(Path::new("a.rs")),
2224                        },
2225                        DiagnosticSummary {
2226                            error_count: 1,
2227                            warning_count: 1,
2228                            ..Default::default()
2229                        },
2230                    )]
2231            })
2232            .await;
2233
2234        // Open the file with the errors on client B. They should be present.
2235        let buffer_b = cx_b
2236            .background()
2237            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2238            .await
2239            .unwrap();
2240
2241        buffer_b.read_with(&cx_b, |buffer, _| {
2242            assert_eq!(
2243                buffer
2244                    .snapshot()
2245                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2246                    .map(|entry| entry)
2247                    .collect::<Vec<_>>(),
2248                &[
2249                    DiagnosticEntry {
2250                        range: Point::new(0, 4)..Point::new(0, 7),
2251                        diagnostic: Diagnostic {
2252                            group_id: 0,
2253                            message: "message 1".to_string(),
2254                            severity: lsp::DiagnosticSeverity::ERROR,
2255                            is_primary: true,
2256                            ..Default::default()
2257                        }
2258                    },
2259                    DiagnosticEntry {
2260                        range: Point::new(0, 10)..Point::new(0, 13),
2261                        diagnostic: Diagnostic {
2262                            group_id: 1,
2263                            severity: lsp::DiagnosticSeverity::WARNING,
2264                            message: "message 2".to_string(),
2265                            is_primary: true,
2266                            ..Default::default()
2267                        }
2268                    }
2269                ]
2270            );
2271        });
2272    }
2273
2274    #[gpui::test(iterations = 10)]
2275    async fn test_collaborating_with_completion(
2276        mut cx_a: TestAppContext,
2277        mut cx_b: TestAppContext,
2278    ) {
2279        cx_a.foreground().forbid_parking();
2280        let mut lang_registry = Arc::new(LanguageRegistry::new());
2281        let fs = FakeFs::new(cx_a.background());
2282
2283        // Set up a fake language server.
2284        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2285        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2286            completion_provider: Some(lsp::CompletionOptions {
2287                trigger_characters: Some(vec![".".to_string()]),
2288                ..Default::default()
2289            }),
2290            ..Default::default()
2291        });
2292        Arc::get_mut(&mut lang_registry)
2293            .unwrap()
2294            .add(Arc::new(Language::new(
2295                LanguageConfig {
2296                    name: "Rust".into(),
2297                    path_suffixes: vec!["rs".to_string()],
2298                    language_server: Some(language_server_config),
2299                    ..Default::default()
2300                },
2301                Some(tree_sitter_rust::language()),
2302            )));
2303
2304        // Connect to a server as 2 clients.
2305        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2306        let client_a = server.create_client(&mut cx_a, "user_a").await;
2307        let client_b = server.create_client(&mut cx_b, "user_b").await;
2308
2309        // Share a project as client A
2310        fs.insert_tree(
2311            "/a",
2312            json!({
2313                ".zed.toml": r#"collaborators = ["user_b"]"#,
2314                "main.rs": "fn main() { a }",
2315                "other.rs": "",
2316            }),
2317        )
2318        .await;
2319        let project_a = cx_a.update(|cx| {
2320            Project::local(
2321                client_a.clone(),
2322                client_a.user_store.clone(),
2323                lang_registry.clone(),
2324                fs.clone(),
2325                cx,
2326            )
2327        });
2328        let (worktree_a, _) = project_a
2329            .update(&mut cx_a, |p, cx| {
2330                p.find_or_create_local_worktree("/a", false, cx)
2331            })
2332            .await
2333            .unwrap();
2334        worktree_a
2335            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2336            .await;
2337        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2338        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2339        project_a
2340            .update(&mut cx_a, |p, cx| p.share(cx))
2341            .await
2342            .unwrap();
2343
2344        // Join the worktree as client B.
2345        let project_b = Project::remote(
2346            project_id,
2347            client_b.clone(),
2348            client_b.user_store.clone(),
2349            lang_registry.clone(),
2350            fs.clone(),
2351            &mut cx_b.to_async(),
2352        )
2353        .await
2354        .unwrap();
2355
2356        // Open a file in an editor as the guest.
2357        let buffer_b = project_b
2358            .update(&mut cx_b, |p, cx| {
2359                p.open_buffer((worktree_id, "main.rs"), cx)
2360            })
2361            .await
2362            .unwrap();
2363        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2364        let editor_b = cx_b.add_view(window_b, |cx| {
2365            Editor::for_buffer(
2366                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2367                Arc::new(|cx| EditorSettings::test(cx)),
2368                Some(project_b.clone()),
2369                cx,
2370            )
2371        });
2372
2373        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2374        buffer_b
2375            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2376            .await;
2377
2378        // Type a completion trigger character as the guest.
2379        editor_b.update(&mut cx_b, |editor, cx| {
2380            editor.select_ranges([13..13], None, cx);
2381            editor.handle_input(&Input(".".into()), cx);
2382            cx.focus(&editor_b);
2383        });
2384
2385        // Receive a completion request as the host's language server.
2386        // Return some completions from the host's language server.
2387        cx_a.foreground().start_waiting();
2388        fake_language_server
2389            .handle_request::<lsp::request::Completion, _>(|params, _| {
2390                assert_eq!(
2391                    params.text_document_position.text_document.uri,
2392                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2393                );
2394                assert_eq!(
2395                    params.text_document_position.position,
2396                    lsp::Position::new(0, 14),
2397                );
2398
2399                Some(lsp::CompletionResponse::Array(vec![
2400                    lsp::CompletionItem {
2401                        label: "first_method(…)".into(),
2402                        detail: Some("fn(&mut self, B) -> C".into()),
2403                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2404                            new_text: "first_method($1)".to_string(),
2405                            range: lsp::Range::new(
2406                                lsp::Position::new(0, 14),
2407                                lsp::Position::new(0, 14),
2408                            ),
2409                        })),
2410                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2411                        ..Default::default()
2412                    },
2413                    lsp::CompletionItem {
2414                        label: "second_method(…)".into(),
2415                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2416                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2417                            new_text: "second_method()".to_string(),
2418                            range: lsp::Range::new(
2419                                lsp::Position::new(0, 14),
2420                                lsp::Position::new(0, 14),
2421                            ),
2422                        })),
2423                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2424                        ..Default::default()
2425                    },
2426                ]))
2427            })
2428            .next()
2429            .await
2430            .unwrap();
2431        cx_a.foreground().finish_waiting();
2432
2433        // Open the buffer on the host.
2434        let buffer_a = project_a
2435            .update(&mut cx_a, |p, cx| {
2436                p.open_buffer((worktree_id, "main.rs"), cx)
2437            })
2438            .await
2439            .unwrap();
2440        buffer_a
2441            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2442            .await;
2443
2444        // Confirm a completion on the guest.
2445        editor_b
2446            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2447            .await;
2448        editor_b.update(&mut cx_b, |editor, cx| {
2449            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2450            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2451        });
2452
2453        // Return a resolved completion from the host's language server.
2454        // The resolved completion has an additional text edit.
2455        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2456            |params, _| {
2457                assert_eq!(params.label, "first_method(…)");
2458                lsp::CompletionItem {
2459                    label: "first_method(…)".into(),
2460                    detail: Some("fn(&mut self, B) -> C".into()),
2461                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2462                        new_text: "first_method($1)".to_string(),
2463                        range: lsp::Range::new(
2464                            lsp::Position::new(0, 14),
2465                            lsp::Position::new(0, 14),
2466                        ),
2467                    })),
2468                    additional_text_edits: Some(vec![lsp::TextEdit {
2469                        new_text: "use d::SomeTrait;\n".to_string(),
2470                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2471                    }]),
2472                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2473                    ..Default::default()
2474                }
2475            },
2476        );
2477
2478        // The additional edit is applied.
2479        buffer_a
2480            .condition(&cx_a, |buffer, _| {
2481                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2482            })
2483            .await;
2484        buffer_b
2485            .condition(&cx_b, |buffer, _| {
2486                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2487            })
2488            .await;
2489    }
2490
2491    #[gpui::test(iterations = 10)]
2492    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2493        cx_a.foreground().forbid_parking();
2494        let mut lang_registry = Arc::new(LanguageRegistry::new());
2495        let fs = FakeFs::new(cx_a.background());
2496
2497        // Set up a fake language server.
2498        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2499        Arc::get_mut(&mut lang_registry)
2500            .unwrap()
2501            .add(Arc::new(Language::new(
2502                LanguageConfig {
2503                    name: "Rust".into(),
2504                    path_suffixes: vec!["rs".to_string()],
2505                    language_server: Some(language_server_config),
2506                    ..Default::default()
2507                },
2508                Some(tree_sitter_rust::language()),
2509            )));
2510
2511        // Connect to a server as 2 clients.
2512        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2513        let client_a = server.create_client(&mut cx_a, "user_a").await;
2514        let client_b = server.create_client(&mut cx_b, "user_b").await;
2515
2516        // Share a project as client A
2517        fs.insert_tree(
2518            "/a",
2519            json!({
2520                ".zed.toml": r#"collaborators = ["user_b"]"#,
2521                "a.rs": "let one = two",
2522            }),
2523        )
2524        .await;
2525        let project_a = cx_a.update(|cx| {
2526            Project::local(
2527                client_a.clone(),
2528                client_a.user_store.clone(),
2529                lang_registry.clone(),
2530                fs.clone(),
2531                cx,
2532            )
2533        });
2534        let (worktree_a, _) = project_a
2535            .update(&mut cx_a, |p, cx| {
2536                p.find_or_create_local_worktree("/a", false, cx)
2537            })
2538            .await
2539            .unwrap();
2540        worktree_a
2541            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2542            .await;
2543        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2544        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2545        project_a
2546            .update(&mut cx_a, |p, cx| p.share(cx))
2547            .await
2548            .unwrap();
2549
2550        // Join the worktree as client B.
2551        let project_b = Project::remote(
2552            project_id,
2553            client_b.clone(),
2554            client_b.user_store.clone(),
2555            lang_registry.clone(),
2556            fs.clone(),
2557            &mut cx_b.to_async(),
2558        )
2559        .await
2560        .unwrap();
2561
2562        let buffer_b = cx_b
2563            .background()
2564            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2565            .await
2566            .unwrap();
2567
2568        let format = project_b.update(&mut cx_b, |project, cx| {
2569            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2570        });
2571
2572        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2573        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2574            Some(vec![
2575                lsp::TextEdit {
2576                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2577                    new_text: "h".to_string(),
2578                },
2579                lsp::TextEdit {
2580                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2581                    new_text: "y".to_string(),
2582                },
2583            ])
2584        });
2585
2586        format.await.unwrap();
2587        assert_eq!(
2588            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2589            "let honey = two"
2590        );
2591    }
2592
2593    #[gpui::test(iterations = 10)]
2594    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2595        cx_a.foreground().forbid_parking();
2596        let mut lang_registry = Arc::new(LanguageRegistry::new());
2597        let fs = FakeFs::new(cx_a.background());
2598        fs.insert_tree(
2599            "/root-1",
2600            json!({
2601                ".zed.toml": r#"collaborators = ["user_b"]"#,
2602                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2603            }),
2604        )
2605        .await;
2606        fs.insert_tree(
2607            "/root-2",
2608            json!({
2609                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2610            }),
2611        )
2612        .await;
2613
2614        // Set up a fake language server.
2615        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2616        Arc::get_mut(&mut lang_registry)
2617            .unwrap()
2618            .add(Arc::new(Language::new(
2619                LanguageConfig {
2620                    name: "Rust".into(),
2621                    path_suffixes: vec!["rs".to_string()],
2622                    language_server: Some(language_server_config),
2623                    ..Default::default()
2624                },
2625                Some(tree_sitter_rust::language()),
2626            )));
2627
2628        // Connect to a server as 2 clients.
2629        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2630        let client_a = server.create_client(&mut cx_a, "user_a").await;
2631        let client_b = server.create_client(&mut cx_b, "user_b").await;
2632
2633        // Share a project as client A
2634        let project_a = cx_a.update(|cx| {
2635            Project::local(
2636                client_a.clone(),
2637                client_a.user_store.clone(),
2638                lang_registry.clone(),
2639                fs.clone(),
2640                cx,
2641            )
2642        });
2643        let (worktree_a, _) = project_a
2644            .update(&mut cx_a, |p, cx| {
2645                p.find_or_create_local_worktree("/root-1", false, cx)
2646            })
2647            .await
2648            .unwrap();
2649        worktree_a
2650            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2651            .await;
2652        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2653        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2654        project_a
2655            .update(&mut cx_a, |p, cx| p.share(cx))
2656            .await
2657            .unwrap();
2658
2659        // Join the worktree as client B.
2660        let project_b = Project::remote(
2661            project_id,
2662            client_b.clone(),
2663            client_b.user_store.clone(),
2664            lang_registry.clone(),
2665            fs.clone(),
2666            &mut cx_b.to_async(),
2667        )
2668        .await
2669        .unwrap();
2670
2671        // Open the file on client B.
2672        let buffer_b = cx_b
2673            .background()
2674            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2675            .await
2676            .unwrap();
2677
2678        // Request the definition of a symbol as the guest.
2679        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2680
2681        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2682        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2683            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2684                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2685                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2686            )))
2687        });
2688
2689        let definitions_1 = definitions_1.await.unwrap();
2690        cx_b.read(|cx| {
2691            assert_eq!(definitions_1.len(), 1);
2692            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2693            let target_buffer = definitions_1[0].buffer.read(cx);
2694            assert_eq!(
2695                target_buffer.text(),
2696                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2697            );
2698            assert_eq!(
2699                definitions_1[0].range.to_point(target_buffer),
2700                Point::new(0, 6)..Point::new(0, 9)
2701            );
2702        });
2703
2704        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2705        // the previous call to `definition`.
2706        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2707        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2708            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2709                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2710                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2711            )))
2712        });
2713
2714        let definitions_2 = definitions_2.await.unwrap();
2715        cx_b.read(|cx| {
2716            assert_eq!(definitions_2.len(), 1);
2717            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2718            let target_buffer = definitions_2[0].buffer.read(cx);
2719            assert_eq!(
2720                target_buffer.text(),
2721                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2722            );
2723            assert_eq!(
2724                definitions_2[0].range.to_point(target_buffer),
2725                Point::new(1, 6)..Point::new(1, 11)
2726            );
2727        });
2728        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2729
2730        cx_b.update(|_| {
2731            drop(definitions_1);
2732            drop(definitions_2);
2733        });
2734        project_b
2735            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2736            .await;
2737    }
2738
2739    #[gpui::test(iterations = 10)]
2740    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2741        cx_a.foreground().forbid_parking();
2742        let mut lang_registry = Arc::new(LanguageRegistry::new());
2743        let fs = FakeFs::new(cx_a.background());
2744        fs.insert_tree(
2745            "/root-1",
2746            json!({
2747                ".zed.toml": r#"collaborators = ["user_b"]"#,
2748                "one.rs": "const ONE: usize = 1;",
2749                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2750            }),
2751        )
2752        .await;
2753        fs.insert_tree(
2754            "/root-2",
2755            json!({
2756                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2757            }),
2758        )
2759        .await;
2760
2761        // Set up a fake language server.
2762        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2763        Arc::get_mut(&mut lang_registry)
2764            .unwrap()
2765            .add(Arc::new(Language::new(
2766                LanguageConfig {
2767                    name: "Rust".into(),
2768                    path_suffixes: vec!["rs".to_string()],
2769                    language_server: Some(language_server_config),
2770                    ..Default::default()
2771                },
2772                Some(tree_sitter_rust::language()),
2773            )));
2774
2775        // Connect to a server as 2 clients.
2776        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2777        let client_a = server.create_client(&mut cx_a, "user_a").await;
2778        let client_b = server.create_client(&mut cx_b, "user_b").await;
2779
2780        // Share a project as client A
2781        let project_a = cx_a.update(|cx| {
2782            Project::local(
2783                client_a.clone(),
2784                client_a.user_store.clone(),
2785                lang_registry.clone(),
2786                fs.clone(),
2787                cx,
2788            )
2789        });
2790        let (worktree_a, _) = project_a
2791            .update(&mut cx_a, |p, cx| {
2792                p.find_or_create_local_worktree("/root-1", false, cx)
2793            })
2794            .await
2795            .unwrap();
2796        worktree_a
2797            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2798            .await;
2799        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2800        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2801        project_a
2802            .update(&mut cx_a, |p, cx| p.share(cx))
2803            .await
2804            .unwrap();
2805
2806        // Join the worktree as client B.
2807        let project_b = Project::remote(
2808            project_id,
2809            client_b.clone(),
2810            client_b.user_store.clone(),
2811            lang_registry.clone(),
2812            fs.clone(),
2813            &mut cx_b.to_async(),
2814        )
2815        .await
2816        .unwrap();
2817
2818        // Open the file on client B.
2819        let buffer_b = cx_b
2820            .background()
2821            .spawn(project_b.update(&mut cx_b, |p, cx| {
2822                p.open_buffer((worktree_id, "one.rs"), cx)
2823            }))
2824            .await
2825            .unwrap();
2826
2827        // Request references to a symbol as the guest.
2828        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2829
2830        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2831        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2832            assert_eq!(
2833                params.text_document_position.text_document.uri.as_str(),
2834                "file:///root-1/one.rs"
2835            );
2836            Some(vec![
2837                lsp::Location {
2838                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2839                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2840                },
2841                lsp::Location {
2842                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2843                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2844                },
2845                lsp::Location {
2846                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2847                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2848                },
2849            ])
2850        });
2851
2852        let references = references.await.unwrap();
2853        cx_b.read(|cx| {
2854            assert_eq!(references.len(), 3);
2855            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2856
2857            let two_buffer = references[0].buffer.read(cx);
2858            let three_buffer = references[2].buffer.read(cx);
2859            assert_eq!(
2860                two_buffer.file().unwrap().path().as_ref(),
2861                Path::new("two.rs")
2862            );
2863            assert_eq!(references[1].buffer, references[0].buffer);
2864            assert_eq!(
2865                three_buffer.file().unwrap().full_path(cx),
2866                Path::new("three.rs")
2867            );
2868
2869            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2870            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2871            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2872        });
2873    }
2874
2875    #[gpui::test(iterations = 10)]
2876    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2877        cx_a.foreground().forbid_parking();
2878        let lang_registry = Arc::new(LanguageRegistry::new());
2879        let fs = FakeFs::new(cx_a.background());
2880        fs.insert_tree(
2881            "/root-1",
2882            json!({
2883                ".zed.toml": r#"collaborators = ["user_b"]"#,
2884                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2885            }),
2886        )
2887        .await;
2888
2889        // Set up a fake language server.
2890        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2891        lang_registry.add(Arc::new(Language::new(
2892            LanguageConfig {
2893                name: "Rust".into(),
2894                path_suffixes: vec!["rs".to_string()],
2895                language_server: Some(language_server_config),
2896                ..Default::default()
2897            },
2898            Some(tree_sitter_rust::language()),
2899        )));
2900
2901        // Connect to a server as 2 clients.
2902        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2903        let client_a = server.create_client(&mut cx_a, "user_a").await;
2904        let client_b = server.create_client(&mut cx_b, "user_b").await;
2905
2906        // Share a project as client A
2907        let project_a = cx_a.update(|cx| {
2908            Project::local(
2909                client_a.clone(),
2910                client_a.user_store.clone(),
2911                lang_registry.clone(),
2912                fs.clone(),
2913                cx,
2914            )
2915        });
2916        let (worktree_a, _) = project_a
2917            .update(&mut cx_a, |p, cx| {
2918                p.find_or_create_local_worktree("/root-1", false, cx)
2919            })
2920            .await
2921            .unwrap();
2922        worktree_a
2923            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2924            .await;
2925        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2926        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2927        project_a
2928            .update(&mut cx_a, |p, cx| p.share(cx))
2929            .await
2930            .unwrap();
2931
2932        // Join the worktree as client B.
2933        let project_b = Project::remote(
2934            project_id,
2935            client_b.clone(),
2936            client_b.user_store.clone(),
2937            lang_registry.clone(),
2938            fs.clone(),
2939            &mut cx_b.to_async(),
2940        )
2941        .await
2942        .unwrap();
2943
2944        // Open the file on client B.
2945        let buffer_b = cx_b
2946            .background()
2947            .spawn(project_b.update(&mut cx_b, |p, cx| {
2948                p.open_buffer((worktree_id, "main.rs"), cx)
2949            }))
2950            .await
2951            .unwrap();
2952
2953        // Request document highlights as the guest.
2954        let highlights =
2955            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2956
2957        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2958        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2959            |params, _| {
2960                assert_eq!(
2961                    params
2962                        .text_document_position_params
2963                        .text_document
2964                        .uri
2965                        .as_str(),
2966                    "file:///root-1/main.rs"
2967                );
2968                assert_eq!(
2969                    params.text_document_position_params.position,
2970                    lsp::Position::new(0, 34)
2971                );
2972                Some(vec![
2973                    lsp::DocumentHighlight {
2974                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2975                        range: lsp::Range::new(
2976                            lsp::Position::new(0, 10),
2977                            lsp::Position::new(0, 16),
2978                        ),
2979                    },
2980                    lsp::DocumentHighlight {
2981                        kind: Some(lsp::DocumentHighlightKind::READ),
2982                        range: lsp::Range::new(
2983                            lsp::Position::new(0, 32),
2984                            lsp::Position::new(0, 38),
2985                        ),
2986                    },
2987                    lsp::DocumentHighlight {
2988                        kind: Some(lsp::DocumentHighlightKind::READ),
2989                        range: lsp::Range::new(
2990                            lsp::Position::new(0, 41),
2991                            lsp::Position::new(0, 47),
2992                        ),
2993                    },
2994                ])
2995            },
2996        );
2997
2998        let highlights = highlights.await.unwrap();
2999        buffer_b.read_with(&cx_b, |buffer, _| {
3000            let snapshot = buffer.snapshot();
3001
3002            let highlights = highlights
3003                .into_iter()
3004                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3005                .collect::<Vec<_>>();
3006            assert_eq!(
3007                highlights,
3008                &[
3009                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3010                    (lsp::DocumentHighlightKind::READ, 32..38),
3011                    (lsp::DocumentHighlightKind::READ, 41..47)
3012                ]
3013            )
3014        });
3015    }
3016
3017    #[gpui::test(iterations = 10)]
3018    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3019        cx_a.foreground().forbid_parking();
3020        let mut lang_registry = Arc::new(LanguageRegistry::new());
3021        let fs = FakeFs::new(cx_a.background());
3022        fs.insert_tree(
3023            "/code",
3024            json!({
3025                "crate-1": {
3026                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3027                    "one.rs": "const ONE: usize = 1;",
3028                },
3029                "crate-2": {
3030                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3031                },
3032                "private": {
3033                    "passwords.txt": "the-password",
3034                }
3035            }),
3036        )
3037        .await;
3038
3039        // Set up a fake language server.
3040        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3041        Arc::get_mut(&mut lang_registry)
3042            .unwrap()
3043            .add(Arc::new(Language::new(
3044                LanguageConfig {
3045                    name: "Rust".into(),
3046                    path_suffixes: vec!["rs".to_string()],
3047                    language_server: Some(language_server_config),
3048                    ..Default::default()
3049                },
3050                Some(tree_sitter_rust::language()),
3051            )));
3052
3053        // Connect to a server as 2 clients.
3054        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3055        let client_a = server.create_client(&mut cx_a, "user_a").await;
3056        let client_b = server.create_client(&mut cx_b, "user_b").await;
3057
3058        // Share a project as client A
3059        let project_a = cx_a.update(|cx| {
3060            Project::local(
3061                client_a.clone(),
3062                client_a.user_store.clone(),
3063                lang_registry.clone(),
3064                fs.clone(),
3065                cx,
3066            )
3067        });
3068        let (worktree_a, _) = project_a
3069            .update(&mut cx_a, |p, cx| {
3070                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3071            })
3072            .await
3073            .unwrap();
3074        worktree_a
3075            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3076            .await;
3077        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3078        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3079        project_a
3080            .update(&mut cx_a, |p, cx| p.share(cx))
3081            .await
3082            .unwrap();
3083
3084        // Join the worktree as client B.
3085        let project_b = Project::remote(
3086            project_id,
3087            client_b.clone(),
3088            client_b.user_store.clone(),
3089            lang_registry.clone(),
3090            fs.clone(),
3091            &mut cx_b.to_async(),
3092        )
3093        .await
3094        .unwrap();
3095
3096        // Cause the language server to start.
3097        let _buffer = cx_b
3098            .background()
3099            .spawn(project_b.update(&mut cx_b, |p, cx| {
3100                p.open_buffer((worktree_id, "one.rs"), cx)
3101            }))
3102            .await
3103            .unwrap();
3104
3105        // Request the definition of a symbol as the guest.
3106        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3107        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3108        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3109            #[allow(deprecated)]
3110            Some(vec![lsp::SymbolInformation {
3111                name: "TWO".into(),
3112                location: lsp::Location {
3113                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3114                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3115                },
3116                kind: lsp::SymbolKind::CONSTANT,
3117                tags: None,
3118                container_name: None,
3119                deprecated: None,
3120            }])
3121        });
3122
3123        let symbols = symbols.await.unwrap();
3124        assert_eq!(symbols.len(), 1);
3125        assert_eq!(symbols[0].name, "TWO");
3126
3127        // Open one of the returned symbols.
3128        let buffer_b_2 = project_b
3129            .update(&mut cx_b, |project, cx| {
3130                project.open_buffer_for_symbol(&symbols[0], cx)
3131            })
3132            .await
3133            .unwrap();
3134        buffer_b_2.read_with(&cx_b, |buffer, _| {
3135            assert_eq!(
3136                buffer.file().unwrap().path().as_ref(),
3137                Path::new("../crate-2/two.rs")
3138            );
3139        });
3140
3141        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3142        let mut fake_symbol = symbols[0].clone();
3143        fake_symbol.path = Path::new("/code/secrets").into();
3144        let error = project_b
3145            .update(&mut cx_b, |project, cx| {
3146                project.open_buffer_for_symbol(&fake_symbol, cx)
3147            })
3148            .await
3149            .unwrap_err();
3150        assert!(error.to_string().contains("invalid symbol signature"));
3151    }
3152
3153    #[gpui::test(iterations = 10)]
3154    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3155        mut cx_a: TestAppContext,
3156        mut cx_b: TestAppContext,
3157        mut rng: StdRng,
3158    ) {
3159        cx_a.foreground().forbid_parking();
3160        let mut lang_registry = Arc::new(LanguageRegistry::new());
3161        let fs = FakeFs::new(cx_a.background());
3162        fs.insert_tree(
3163            "/root",
3164            json!({
3165                ".zed.toml": r#"collaborators = ["user_b"]"#,
3166                "a.rs": "const ONE: usize = b::TWO;",
3167                "b.rs": "const TWO: usize = 2",
3168            }),
3169        )
3170        .await;
3171
3172        // Set up a fake language server.
3173        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3174
3175        Arc::get_mut(&mut lang_registry)
3176            .unwrap()
3177            .add(Arc::new(Language::new(
3178                LanguageConfig {
3179                    name: "Rust".into(),
3180                    path_suffixes: vec!["rs".to_string()],
3181                    language_server: Some(language_server_config),
3182                    ..Default::default()
3183                },
3184                Some(tree_sitter_rust::language()),
3185            )));
3186
3187        // Connect to a server as 2 clients.
3188        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3189        let client_a = server.create_client(&mut cx_a, "user_a").await;
3190        let client_b = server.create_client(&mut cx_b, "user_b").await;
3191
3192        // Share a project as client A
3193        let project_a = cx_a.update(|cx| {
3194            Project::local(
3195                client_a.clone(),
3196                client_a.user_store.clone(),
3197                lang_registry.clone(),
3198                fs.clone(),
3199                cx,
3200            )
3201        });
3202
3203        let (worktree_a, _) = project_a
3204            .update(&mut cx_a, |p, cx| {
3205                p.find_or_create_local_worktree("/root", false, cx)
3206            })
3207            .await
3208            .unwrap();
3209        worktree_a
3210            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211            .await;
3212        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3213        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3214        project_a
3215            .update(&mut cx_a, |p, cx| p.share(cx))
3216            .await
3217            .unwrap();
3218
3219        // Join the worktree as client B.
3220        let project_b = Project::remote(
3221            project_id,
3222            client_b.clone(),
3223            client_b.user_store.clone(),
3224            lang_registry.clone(),
3225            fs.clone(),
3226            &mut cx_b.to_async(),
3227        )
3228        .await
3229        .unwrap();
3230
3231        let buffer_b1 = cx_b
3232            .background()
3233            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3234            .await
3235            .unwrap();
3236
3237        let definitions;
3238        let buffer_b2;
3239        if rng.gen() {
3240            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3241            buffer_b2 =
3242                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3243        } else {
3244            buffer_b2 =
3245                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3246            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3247        }
3248
3249        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3250        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3251            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3252                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3253                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3254            )))
3255        });
3256
3257        let buffer_b2 = buffer_b2.await.unwrap();
3258        let definitions = definitions.await.unwrap();
3259        assert_eq!(definitions.len(), 1);
3260        assert_eq!(definitions[0].buffer, buffer_b2);
3261    }
3262
3263    #[gpui::test(iterations = 10)]
3264    async fn test_collaborating_with_code_actions(
3265        mut cx_a: TestAppContext,
3266        mut cx_b: TestAppContext,
3267    ) {
3268        cx_a.foreground().forbid_parking();
3269        let mut lang_registry = Arc::new(LanguageRegistry::new());
3270        let fs = FakeFs::new(cx_a.background());
3271        let mut path_openers_b = Vec::new();
3272        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3273
3274        // Set up a fake language server.
3275        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3276        Arc::get_mut(&mut lang_registry)
3277            .unwrap()
3278            .add(Arc::new(Language::new(
3279                LanguageConfig {
3280                    name: "Rust".into(),
3281                    path_suffixes: vec!["rs".to_string()],
3282                    language_server: Some(language_server_config),
3283                    ..Default::default()
3284                },
3285                Some(tree_sitter_rust::language()),
3286            )));
3287
3288        // Connect to a server as 2 clients.
3289        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3290        let client_a = server.create_client(&mut cx_a, "user_a").await;
3291        let client_b = server.create_client(&mut cx_b, "user_b").await;
3292
3293        // Share a project as client A
3294        fs.insert_tree(
3295            "/a",
3296            json!({
3297                ".zed.toml": r#"collaborators = ["user_b"]"#,
3298                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3299                "other.rs": "pub fn foo() -> usize { 4 }",
3300            }),
3301        )
3302        .await;
3303        let project_a = cx_a.update(|cx| {
3304            Project::local(
3305                client_a.clone(),
3306                client_a.user_store.clone(),
3307                lang_registry.clone(),
3308                fs.clone(),
3309                cx,
3310            )
3311        });
3312        let (worktree_a, _) = project_a
3313            .update(&mut cx_a, |p, cx| {
3314                p.find_or_create_local_worktree("/a", false, cx)
3315            })
3316            .await
3317            .unwrap();
3318        worktree_a
3319            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3320            .await;
3321        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3322        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3323        project_a
3324            .update(&mut cx_a, |p, cx| p.share(cx))
3325            .await
3326            .unwrap();
3327
3328        // Join the worktree as client B.
3329        let project_b = Project::remote(
3330            project_id,
3331            client_b.clone(),
3332            client_b.user_store.clone(),
3333            lang_registry.clone(),
3334            fs.clone(),
3335            &mut cx_b.to_async(),
3336        )
3337        .await
3338        .unwrap();
3339        let mut params = cx_b.update(WorkspaceParams::test);
3340        params.languages = lang_registry.clone();
3341        params.client = client_b.client.clone();
3342        params.user_store = client_b.user_store.clone();
3343        params.project = project_b;
3344        params.path_openers = path_openers_b.into();
3345
3346        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3347        let editor_b = workspace_b
3348            .update(&mut cx_b, |workspace, cx| {
3349                workspace.open_path((worktree_id, "main.rs").into(), cx)
3350            })
3351            .await
3352            .unwrap()
3353            .downcast::<Editor>()
3354            .unwrap();
3355
3356        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3357        fake_language_server
3358            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3359                assert_eq!(
3360                    params.text_document.uri,
3361                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3362                );
3363                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3364                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3365                None
3366            })
3367            .next()
3368            .await;
3369
3370        // Move cursor to a location that contains code actions.
3371        editor_b.update(&mut cx_b, |editor, cx| {
3372            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3373            cx.focus(&editor_b);
3374        });
3375
3376        fake_language_server
3377            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3378                assert_eq!(
3379                    params.text_document.uri,
3380                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3381                );
3382                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3383                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3384
3385                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3386                    lsp::CodeAction {
3387                        title: "Inline into all callers".to_string(),
3388                        edit: Some(lsp::WorkspaceEdit {
3389                            changes: Some(
3390                                [
3391                                    (
3392                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3393                                        vec![lsp::TextEdit::new(
3394                                            lsp::Range::new(
3395                                                lsp::Position::new(1, 22),
3396                                                lsp::Position::new(1, 34),
3397                                            ),
3398                                            "4".to_string(),
3399                                        )],
3400                                    ),
3401                                    (
3402                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3403                                        vec![lsp::TextEdit::new(
3404                                            lsp::Range::new(
3405                                                lsp::Position::new(0, 0),
3406                                                lsp::Position::new(0, 27),
3407                                            ),
3408                                            "".to_string(),
3409                                        )],
3410                                    ),
3411                                ]
3412                                .into_iter()
3413                                .collect(),
3414                            ),
3415                            ..Default::default()
3416                        }),
3417                        data: Some(json!({
3418                            "codeActionParams": {
3419                                "range": {
3420                                    "start": {"line": 1, "column": 31},
3421                                    "end": {"line": 1, "column": 31},
3422                                }
3423                            }
3424                        })),
3425                        ..Default::default()
3426                    },
3427                )])
3428            })
3429            .next()
3430            .await;
3431
3432        // Toggle code actions and wait for them to display.
3433        editor_b.update(&mut cx_b, |editor, cx| {
3434            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3435        });
3436        editor_b
3437            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3438            .await;
3439
3440        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3441
3442        // Confirming the code action will trigger a resolve request.
3443        let confirm_action = workspace_b
3444            .update(&mut cx_b, |workspace, cx| {
3445                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3446            })
3447            .unwrap();
3448        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3449            lsp::CodeAction {
3450                title: "Inline into all callers".to_string(),
3451                edit: Some(lsp::WorkspaceEdit {
3452                    changes: Some(
3453                        [
3454                            (
3455                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3456                                vec![lsp::TextEdit::new(
3457                                    lsp::Range::new(
3458                                        lsp::Position::new(1, 22),
3459                                        lsp::Position::new(1, 34),
3460                                    ),
3461                                    "4".to_string(),
3462                                )],
3463                            ),
3464                            (
3465                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3466                                vec![lsp::TextEdit::new(
3467                                    lsp::Range::new(
3468                                        lsp::Position::new(0, 0),
3469                                        lsp::Position::new(0, 27),
3470                                    ),
3471                                    "".to_string(),
3472                                )],
3473                            ),
3474                        ]
3475                        .into_iter()
3476                        .collect(),
3477                    ),
3478                    ..Default::default()
3479                }),
3480                ..Default::default()
3481            }
3482        });
3483
3484        // After the action is confirmed, an editor containing both modified files is opened.
3485        confirm_action.await.unwrap();
3486        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3487            workspace
3488                .active_item(cx)
3489                .unwrap()
3490                .downcast::<Editor>()
3491                .unwrap()
3492        });
3493        code_action_editor.update(&mut cx_b, |editor, cx| {
3494            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3495            editor.undo(&Undo, cx);
3496            assert_eq!(
3497                editor.text(cx),
3498                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3499            );
3500            editor.redo(&Redo, cx);
3501            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3502        });
3503    }
3504
3505    #[gpui::test(iterations = 10)]
3506    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3507        cx_a.foreground().forbid_parking();
3508        let mut lang_registry = Arc::new(LanguageRegistry::new());
3509        let fs = FakeFs::new(cx_a.background());
3510        let mut path_openers_b = Vec::new();
3511        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3512
3513        // Set up a fake language server.
3514        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3515        Arc::get_mut(&mut lang_registry)
3516            .unwrap()
3517            .add(Arc::new(Language::new(
3518                LanguageConfig {
3519                    name: "Rust".into(),
3520                    path_suffixes: vec!["rs".to_string()],
3521                    language_server: Some(language_server_config),
3522                    ..Default::default()
3523                },
3524                Some(tree_sitter_rust::language()),
3525            )));
3526
3527        // Connect to a server as 2 clients.
3528        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3529        let client_a = server.create_client(&mut cx_a, "user_a").await;
3530        let client_b = server.create_client(&mut cx_b, "user_b").await;
3531
3532        // Share a project as client A
3533        fs.insert_tree(
3534            "/dir",
3535            json!({
3536                ".zed.toml": r#"collaborators = ["user_b"]"#,
3537                "one.rs": "const ONE: usize = 1;",
3538                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3539            }),
3540        )
3541        .await;
3542        let project_a = cx_a.update(|cx| {
3543            Project::local(
3544                client_a.clone(),
3545                client_a.user_store.clone(),
3546                lang_registry.clone(),
3547                fs.clone(),
3548                cx,
3549            )
3550        });
3551        let (worktree_a, _) = project_a
3552            .update(&mut cx_a, |p, cx| {
3553                p.find_or_create_local_worktree("/dir", false, cx)
3554            })
3555            .await
3556            .unwrap();
3557        worktree_a
3558            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3559            .await;
3560        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3561        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3562        project_a
3563            .update(&mut cx_a, |p, cx| p.share(cx))
3564            .await
3565            .unwrap();
3566
3567        // Join the worktree as client B.
3568        let project_b = Project::remote(
3569            project_id,
3570            client_b.clone(),
3571            client_b.user_store.clone(),
3572            lang_registry.clone(),
3573            fs.clone(),
3574            &mut cx_b.to_async(),
3575        )
3576        .await
3577        .unwrap();
3578        let mut params = cx_b.update(WorkspaceParams::test);
3579        params.languages = lang_registry.clone();
3580        params.client = client_b.client.clone();
3581        params.user_store = client_b.user_store.clone();
3582        params.project = project_b;
3583        params.path_openers = path_openers_b.into();
3584
3585        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3586        let editor_b = workspace_b
3587            .update(&mut cx_b, |workspace, cx| {
3588                workspace.open_path((worktree_id, "one.rs").into(), cx)
3589            })
3590            .await
3591            .unwrap()
3592            .downcast::<Editor>()
3593            .unwrap();
3594        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3595
3596        // Move cursor to a location that can be renamed.
3597        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3598            editor.select_ranges([7..7], None, cx);
3599            editor.rename(&Rename, cx).unwrap()
3600        });
3601
3602        fake_language_server
3603            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3604                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3605                assert_eq!(params.position, lsp::Position::new(0, 7));
3606                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3607                    lsp::Position::new(0, 6),
3608                    lsp::Position::new(0, 9),
3609                )))
3610            })
3611            .next()
3612            .await
3613            .unwrap();
3614        prepare_rename.await.unwrap();
3615        editor_b.update(&mut cx_b, |editor, cx| {
3616            let rename = editor.pending_rename().unwrap();
3617            let buffer = editor.buffer().read(cx).snapshot(cx);
3618            assert_eq!(
3619                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3620                6..9
3621            );
3622            rename.editor.update(cx, |rename_editor, cx| {
3623                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3624                    rename_buffer.edit([0..3], "THREE", cx);
3625                });
3626            });
3627        });
3628
3629        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3630            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3631        });
3632        fake_language_server
3633            .handle_request::<lsp::request::Rename, _>(|params, _| {
3634                assert_eq!(
3635                    params.text_document_position.text_document.uri.as_str(),
3636                    "file:///dir/one.rs"
3637                );
3638                assert_eq!(
3639                    params.text_document_position.position,
3640                    lsp::Position::new(0, 6)
3641                );
3642                assert_eq!(params.new_name, "THREE");
3643                Some(lsp::WorkspaceEdit {
3644                    changes: Some(
3645                        [
3646                            (
3647                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3648                                vec![lsp::TextEdit::new(
3649                                    lsp::Range::new(
3650                                        lsp::Position::new(0, 6),
3651                                        lsp::Position::new(0, 9),
3652                                    ),
3653                                    "THREE".to_string(),
3654                                )],
3655                            ),
3656                            (
3657                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3658                                vec![
3659                                    lsp::TextEdit::new(
3660                                        lsp::Range::new(
3661                                            lsp::Position::new(0, 24),
3662                                            lsp::Position::new(0, 27),
3663                                        ),
3664                                        "THREE".to_string(),
3665                                    ),
3666                                    lsp::TextEdit::new(
3667                                        lsp::Range::new(
3668                                            lsp::Position::new(0, 35),
3669                                            lsp::Position::new(0, 38),
3670                                        ),
3671                                        "THREE".to_string(),
3672                                    ),
3673                                ],
3674                            ),
3675                        ]
3676                        .into_iter()
3677                        .collect(),
3678                    ),
3679                    ..Default::default()
3680                })
3681            })
3682            .next()
3683            .await
3684            .unwrap();
3685        confirm_rename.await.unwrap();
3686
3687        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3688            workspace
3689                .active_item(cx)
3690                .unwrap()
3691                .downcast::<Editor>()
3692                .unwrap()
3693        });
3694        rename_editor.update(&mut cx_b, |editor, cx| {
3695            assert_eq!(
3696                editor.text(cx),
3697                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3698            );
3699            editor.undo(&Undo, cx);
3700            assert_eq!(
3701                editor.text(cx),
3702                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3703            );
3704            editor.redo(&Redo, cx);
3705            assert_eq!(
3706                editor.text(cx),
3707                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3708            );
3709        });
3710
3711        // Ensure temporary rename edits cannot be undone/redone.
3712        editor_b.update(&mut cx_b, |editor, cx| {
3713            editor.undo(&Undo, cx);
3714            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3715            editor.undo(&Undo, cx);
3716            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3717            editor.redo(&Redo, cx);
3718            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3719        })
3720    }
3721
3722    #[gpui::test(iterations = 10)]
3723    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3724        cx_a.foreground().forbid_parking();
3725
3726        // Connect to a server as 2 clients.
3727        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3728        let client_a = server.create_client(&mut cx_a, "user_a").await;
3729        let client_b = server.create_client(&mut cx_b, "user_b").await;
3730
3731        // Create an org that includes these 2 users.
3732        let db = &server.app_state.db;
3733        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3734        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3735            .await
3736            .unwrap();
3737        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3738            .await
3739            .unwrap();
3740
3741        // Create a channel that includes all the users.
3742        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3743        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3744            .await
3745            .unwrap();
3746        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3747            .await
3748            .unwrap();
3749        db.create_channel_message(
3750            channel_id,
3751            client_b.current_user_id(&cx_b),
3752            "hello A, it's B.",
3753            OffsetDateTime::now_utc(),
3754            1,
3755        )
3756        .await
3757        .unwrap();
3758
3759        let channels_a = cx_a
3760            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3761        channels_a
3762            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3763            .await;
3764        channels_a.read_with(&cx_a, |list, _| {
3765            assert_eq!(
3766                list.available_channels().unwrap(),
3767                &[ChannelDetails {
3768                    id: channel_id.to_proto(),
3769                    name: "test-channel".to_string()
3770                }]
3771            )
3772        });
3773        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3774            this.get_channel(channel_id.to_proto(), cx).unwrap()
3775        });
3776        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3777        channel_a
3778            .condition(&cx_a, |channel, _| {
3779                channel_messages(channel)
3780                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3781            })
3782            .await;
3783
3784        let channels_b = cx_b
3785            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3786        channels_b
3787            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3788            .await;
3789        channels_b.read_with(&cx_b, |list, _| {
3790            assert_eq!(
3791                list.available_channels().unwrap(),
3792                &[ChannelDetails {
3793                    id: channel_id.to_proto(),
3794                    name: "test-channel".to_string()
3795                }]
3796            )
3797        });
3798
3799        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3800            this.get_channel(channel_id.to_proto(), cx).unwrap()
3801        });
3802        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3803        channel_b
3804            .condition(&cx_b, |channel, _| {
3805                channel_messages(channel)
3806                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3807            })
3808            .await;
3809
3810        channel_a
3811            .update(&mut cx_a, |channel, cx| {
3812                channel
3813                    .send_message("oh, hi B.".to_string(), cx)
3814                    .unwrap()
3815                    .detach();
3816                let task = channel.send_message("sup".to_string(), cx).unwrap();
3817                assert_eq!(
3818                    channel_messages(channel),
3819                    &[
3820                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3821                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3822                        ("user_a".to_string(), "sup".to_string(), true)
3823                    ]
3824                );
3825                task
3826            })
3827            .await
3828            .unwrap();
3829
3830        channel_b
3831            .condition(&cx_b, |channel, _| {
3832                channel_messages(channel)
3833                    == [
3834                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3835                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3836                        ("user_a".to_string(), "sup".to_string(), false),
3837                    ]
3838            })
3839            .await;
3840
3841        assert_eq!(
3842            server
3843                .state()
3844                .await
3845                .channel(channel_id)
3846                .unwrap()
3847                .connection_ids
3848                .len(),
3849            2
3850        );
3851        cx_b.update(|_| drop(channel_b));
3852        server
3853            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3854            .await;
3855
3856        cx_a.update(|_| drop(channel_a));
3857        server
3858            .condition(|state| state.channel(channel_id).is_none())
3859            .await;
3860    }
3861
3862    #[gpui::test(iterations = 10)]
3863    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3864        cx_a.foreground().forbid_parking();
3865
3866        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3867        let client_a = server.create_client(&mut cx_a, "user_a").await;
3868
3869        let db = &server.app_state.db;
3870        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3871        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3872        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3873            .await
3874            .unwrap();
3875        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3876            .await
3877            .unwrap();
3878
3879        let channels_a = cx_a
3880            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3881        channels_a
3882            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3883            .await;
3884        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3885            this.get_channel(channel_id.to_proto(), cx).unwrap()
3886        });
3887
3888        // Messages aren't allowed to be too long.
3889        channel_a
3890            .update(&mut cx_a, |channel, cx| {
3891                let long_body = "this is long.\n".repeat(1024);
3892                channel.send_message(long_body, cx).unwrap()
3893            })
3894            .await
3895            .unwrap_err();
3896
3897        // Messages aren't allowed to be blank.
3898        channel_a.update(&mut cx_a, |channel, cx| {
3899            channel.send_message(String::new(), cx).unwrap_err()
3900        });
3901
3902        // Leading and trailing whitespace are trimmed.
3903        channel_a
3904            .update(&mut cx_a, |channel, cx| {
3905                channel
3906                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3907                    .unwrap()
3908            })
3909            .await
3910            .unwrap();
3911        assert_eq!(
3912            db.get_channel_messages(channel_id, 10, None)
3913                .await
3914                .unwrap()
3915                .iter()
3916                .map(|m| &m.body)
3917                .collect::<Vec<_>>(),
3918            &["surrounded by whitespace"]
3919        );
3920    }
3921
3922    #[gpui::test(iterations = 10)]
3923    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3924        cx_a.foreground().forbid_parking();
3925
3926        // Connect to a server as 2 clients.
3927        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3928        let client_a = server.create_client(&mut cx_a, "user_a").await;
3929        let client_b = server.create_client(&mut cx_b, "user_b").await;
3930        let mut status_b = client_b.status();
3931
3932        // Create an org that includes these 2 users.
3933        let db = &server.app_state.db;
3934        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3935        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3936            .await
3937            .unwrap();
3938        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3939            .await
3940            .unwrap();
3941
3942        // Create a channel that includes all the users.
3943        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3944        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3945            .await
3946            .unwrap();
3947        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3948            .await
3949            .unwrap();
3950        db.create_channel_message(
3951            channel_id,
3952            client_b.current_user_id(&cx_b),
3953            "hello A, it's B.",
3954            OffsetDateTime::now_utc(),
3955            2,
3956        )
3957        .await
3958        .unwrap();
3959
3960        let channels_a = cx_a
3961            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3962        channels_a
3963            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3964            .await;
3965
3966        channels_a.read_with(&cx_a, |list, _| {
3967            assert_eq!(
3968                list.available_channels().unwrap(),
3969                &[ChannelDetails {
3970                    id: channel_id.to_proto(),
3971                    name: "test-channel".to_string()
3972                }]
3973            )
3974        });
3975        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3976            this.get_channel(channel_id.to_proto(), cx).unwrap()
3977        });
3978        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3979        channel_a
3980            .condition(&cx_a, |channel, _| {
3981                channel_messages(channel)
3982                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3983            })
3984            .await;
3985
3986        let channels_b = cx_b
3987            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3988        channels_b
3989            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3990            .await;
3991        channels_b.read_with(&cx_b, |list, _| {
3992            assert_eq!(
3993                list.available_channels().unwrap(),
3994                &[ChannelDetails {
3995                    id: channel_id.to_proto(),
3996                    name: "test-channel".to_string()
3997                }]
3998            )
3999        });
4000
4001        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
4002            this.get_channel(channel_id.to_proto(), cx).unwrap()
4003        });
4004        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
4005        channel_b
4006            .condition(&cx_b, |channel, _| {
4007                channel_messages(channel)
4008                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4009            })
4010            .await;
4011
4012        // Disconnect client B, ensuring we can still access its cached channel data.
4013        server.forbid_connections();
4014        server.disconnect_client(client_b.current_user_id(&cx_b));
4015        while !matches!(
4016            status_b.next().await,
4017            Some(client::Status::ReconnectionError { .. })
4018        ) {}
4019
4020        channels_b.read_with(&cx_b, |channels, _| {
4021            assert_eq!(
4022                channels.available_channels().unwrap(),
4023                [ChannelDetails {
4024                    id: channel_id.to_proto(),
4025                    name: "test-channel".to_string()
4026                }]
4027            )
4028        });
4029        channel_b.read_with(&cx_b, |channel, _| {
4030            assert_eq!(
4031                channel_messages(channel),
4032                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4033            )
4034        });
4035
4036        // Send a message from client B while it is disconnected.
4037        channel_b
4038            .update(&mut cx_b, |channel, cx| {
4039                let task = channel
4040                    .send_message("can you see this?".to_string(), cx)
4041                    .unwrap();
4042                assert_eq!(
4043                    channel_messages(channel),
4044                    &[
4045                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4046                        ("user_b".to_string(), "can you see this?".to_string(), true)
4047                    ]
4048                );
4049                task
4050            })
4051            .await
4052            .unwrap_err();
4053
4054        // Send a message from client A while B is disconnected.
4055        channel_a
4056            .update(&mut cx_a, |channel, cx| {
4057                channel
4058                    .send_message("oh, hi B.".to_string(), cx)
4059                    .unwrap()
4060                    .detach();
4061                let task = channel.send_message("sup".to_string(), cx).unwrap();
4062                assert_eq!(
4063                    channel_messages(channel),
4064                    &[
4065                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4066                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4067                        ("user_a".to_string(), "sup".to_string(), true)
4068                    ]
4069                );
4070                task
4071            })
4072            .await
4073            .unwrap();
4074
4075        // Give client B a chance to reconnect.
4076        server.allow_connections();
4077        cx_b.foreground().advance_clock(Duration::from_secs(10));
4078
4079        // Verify that B sees the new messages upon reconnection, as well as the message client B
4080        // sent while offline.
4081        channel_b
4082            .condition(&cx_b, |channel, _| {
4083                channel_messages(channel)
4084                    == [
4085                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4086                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4087                        ("user_a".to_string(), "sup".to_string(), false),
4088                        ("user_b".to_string(), "can you see this?".to_string(), false),
4089                    ]
4090            })
4091            .await;
4092
4093        // Ensure client A and B can communicate normally after reconnection.
4094        channel_a
4095            .update(&mut cx_a, |channel, cx| {
4096                channel.send_message("you online?".to_string(), cx).unwrap()
4097            })
4098            .await
4099            .unwrap();
4100        channel_b
4101            .condition(&cx_b, |channel, _| {
4102                channel_messages(channel)
4103                    == [
4104                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4105                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4106                        ("user_a".to_string(), "sup".to_string(), false),
4107                        ("user_b".to_string(), "can you see this?".to_string(), false),
4108                        ("user_a".to_string(), "you online?".to_string(), false),
4109                    ]
4110            })
4111            .await;
4112
4113        channel_b
4114            .update(&mut cx_b, |channel, cx| {
4115                channel.send_message("yep".to_string(), cx).unwrap()
4116            })
4117            .await
4118            .unwrap();
4119        channel_a
4120            .condition(&cx_a, |channel, _| {
4121                channel_messages(channel)
4122                    == [
4123                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4124                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4125                        ("user_a".to_string(), "sup".to_string(), false),
4126                        ("user_b".to_string(), "can you see this?".to_string(), false),
4127                        ("user_a".to_string(), "you online?".to_string(), false),
4128                        ("user_b".to_string(), "yep".to_string(), false),
4129                    ]
4130            })
4131            .await;
4132    }
4133
4134    #[gpui::test(iterations = 10)]
4135    async fn test_contacts(
4136        mut cx_a: TestAppContext,
4137        mut cx_b: TestAppContext,
4138        mut cx_c: TestAppContext,
4139    ) {
4140        cx_a.foreground().forbid_parking();
4141        let lang_registry = Arc::new(LanguageRegistry::new());
4142        let fs = FakeFs::new(cx_a.background());
4143
4144        // Connect to a server as 3 clients.
4145        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4146        let client_a = server.create_client(&mut cx_a, "user_a").await;
4147        let client_b = server.create_client(&mut cx_b, "user_b").await;
4148        let client_c = server.create_client(&mut cx_c, "user_c").await;
4149
4150        // Share a worktree as client A.
4151        fs.insert_tree(
4152            "/a",
4153            json!({
4154                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4155            }),
4156        )
4157        .await;
4158
4159        let project_a = cx_a.update(|cx| {
4160            Project::local(
4161                client_a.clone(),
4162                client_a.user_store.clone(),
4163                lang_registry.clone(),
4164                fs.clone(),
4165                cx,
4166            )
4167        });
4168        let (worktree_a, _) = project_a
4169            .update(&mut cx_a, |p, cx| {
4170                p.find_or_create_local_worktree("/a", false, cx)
4171            })
4172            .await
4173            .unwrap();
4174        worktree_a
4175            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4176            .await;
4177
4178        client_a
4179            .user_store
4180            .condition(&cx_a, |user_store, _| {
4181                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4182            })
4183            .await;
4184        client_b
4185            .user_store
4186            .condition(&cx_b, |user_store, _| {
4187                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4188            })
4189            .await;
4190        client_c
4191            .user_store
4192            .condition(&cx_c, |user_store, _| {
4193                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4194            })
4195            .await;
4196
4197        let project_id = project_a
4198            .update(&mut cx_a, |project, _| project.next_remote_id())
4199            .await;
4200        project_a
4201            .update(&mut cx_a, |project, cx| project.share(cx))
4202            .await
4203            .unwrap();
4204
4205        let _project_b = Project::remote(
4206            project_id,
4207            client_b.clone(),
4208            client_b.user_store.clone(),
4209            lang_registry.clone(),
4210            fs.clone(),
4211            &mut cx_b.to_async(),
4212        )
4213        .await
4214        .unwrap();
4215
4216        client_a
4217            .user_store
4218            .condition(&cx_a, |user_store, _| {
4219                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4220            })
4221            .await;
4222        client_b
4223            .user_store
4224            .condition(&cx_b, |user_store, _| {
4225                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4226            })
4227            .await;
4228        client_c
4229            .user_store
4230            .condition(&cx_c, |user_store, _| {
4231                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4232            })
4233            .await;
4234
4235        project_a
4236            .condition(&cx_a, |project, _| {
4237                project.collaborators().contains_key(&client_b.peer_id)
4238            })
4239            .await;
4240
4241        cx_a.update(move |_| drop(project_a));
4242        client_a
4243            .user_store
4244            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4245            .await;
4246        client_b
4247            .user_store
4248            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4249            .await;
4250        client_c
4251            .user_store
4252            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4253            .await;
4254
4255        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4256            user_store
4257                .contacts()
4258                .iter()
4259                .map(|contact| {
4260                    let worktrees = contact
4261                        .projects
4262                        .iter()
4263                        .map(|p| {
4264                            (
4265                                p.worktree_root_names[0].as_str(),
4266                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4267                            )
4268                        })
4269                        .collect();
4270                    (contact.user.github_login.as_str(), worktrees)
4271                })
4272                .collect()
4273        }
4274    }
4275
4276    #[gpui::test(iterations = 100)]
4277    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4278        cx.foreground().forbid_parking();
4279        let max_peers = env::var("MAX_PEERS")
4280            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4281            .unwrap_or(5);
4282        let max_operations = env::var("OPERATIONS")
4283            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4284            .unwrap_or(10);
4285
4286        let rng = Arc::new(Mutex::new(rng));
4287
4288        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4289        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4290
4291        let fs = FakeFs::new(cx.background());
4292        fs.insert_tree(
4293            "/_collab",
4294            json!({
4295                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4296            }),
4297        )
4298        .await;
4299
4300        let operations = Rc::new(Cell::new(0));
4301        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4302        let mut clients = Vec::new();
4303
4304        let mut next_entity_id = 100000;
4305        let mut host_cx = TestAppContext::new(
4306            cx.foreground_platform(),
4307            cx.platform(),
4308            cx.foreground(),
4309            cx.background(),
4310            cx.font_cache(),
4311            next_entity_id,
4312        );
4313        let host = server.create_client(&mut host_cx, "host").await;
4314        let host_project = host_cx.update(|cx| {
4315            Project::local(
4316                host.client.clone(),
4317                host.user_store.clone(),
4318                Arc::new(LanguageRegistry::new()),
4319                fs.clone(),
4320                cx,
4321            )
4322        });
4323        let host_project_id = host_project
4324            .update(&mut host_cx, |p, _| p.next_remote_id())
4325            .await;
4326
4327        let (collab_worktree, _) = host_project
4328            .update(&mut host_cx, |project, cx| {
4329                project.find_or_create_local_worktree("/_collab", false, cx)
4330            })
4331            .await
4332            .unwrap();
4333        collab_worktree
4334            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4335            .await;
4336        host_project
4337            .update(&mut host_cx, |project, cx| project.share(cx))
4338            .await
4339            .unwrap();
4340
4341        clients.push(cx.foreground().spawn(host.simulate_host(
4342            host_project.clone(),
4343            language_server_config,
4344            operations.clone(),
4345            max_operations,
4346            rng.clone(),
4347            host_cx.clone(),
4348        )));
4349
4350        while operations.get() < max_operations {
4351            cx.background().simulate_random_delay().await;
4352            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4353                operations.set(operations.get() + 1);
4354
4355                let guest_id = clients.len();
4356                log::info!("Adding guest {}", guest_id);
4357                next_entity_id += 100000;
4358                let mut guest_cx = TestAppContext::new(
4359                    cx.foreground_platform(),
4360                    cx.platform(),
4361                    cx.foreground(),
4362                    cx.background(),
4363                    cx.font_cache(),
4364                    next_entity_id,
4365                );
4366                let guest = server
4367                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4368                    .await;
4369                let guest_project = Project::remote(
4370                    host_project_id,
4371                    guest.client.clone(),
4372                    guest.user_store.clone(),
4373                    guest_lang_registry.clone(),
4374                    fs.clone(),
4375                    &mut guest_cx.to_async(),
4376                )
4377                .await
4378                .unwrap();
4379                clients.push(cx.foreground().spawn(guest.simulate_guest(
4380                    guest_id,
4381                    guest_project,
4382                    operations.clone(),
4383                    max_operations,
4384                    rng.clone(),
4385                    guest_cx,
4386                )));
4387
4388                log::info!("Guest {} added", guest_id);
4389            }
4390        }
4391
4392        let clients = futures::future::join_all(clients).await;
4393        cx.foreground().run_until_parked();
4394
4395        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4396            project
4397                .worktrees(cx)
4398                .map(|worktree| {
4399                    let snapshot = worktree.read(cx).snapshot();
4400                    (snapshot.id(), snapshot)
4401                })
4402                .collect::<BTreeMap<_, _>>()
4403        });
4404
4405        for (guest_client, guest_cx) in clients.iter().skip(1) {
4406            let guest_id = guest_client.client.id();
4407            let worktree_snapshots =
4408                guest_client
4409                    .project
4410                    .as_ref()
4411                    .unwrap()
4412                    .read_with(guest_cx, |project, cx| {
4413                        project
4414                            .worktrees(cx)
4415                            .map(|worktree| {
4416                                let worktree = worktree.read(cx);
4417                                assert!(
4418                                    !worktree.as_remote().unwrap().has_pending_updates(),
4419                                    "Guest {} worktree {:?} contains deferred updates",
4420                                    guest_id,
4421                                    worktree.id()
4422                                );
4423                                (worktree.id(), worktree.snapshot())
4424                            })
4425                            .collect::<BTreeMap<_, _>>()
4426                    });
4427
4428            assert_eq!(
4429                worktree_snapshots.keys().collect::<Vec<_>>(),
4430                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4431                "guest {} has different worktrees than the host",
4432                guest_id
4433            );
4434            for (id, host_snapshot) in &host_worktree_snapshots {
4435                let guest_snapshot = &worktree_snapshots[id];
4436                assert_eq!(
4437                    guest_snapshot.root_name(),
4438                    host_snapshot.root_name(),
4439                    "guest {} has different root name than the host for worktree {}",
4440                    guest_id,
4441                    id
4442                );
4443                assert_eq!(
4444                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4445                    host_snapshot.entries(false).collect::<Vec<_>>(),
4446                    "guest {} has different snapshot than the host for worktree {}",
4447                    guest_id,
4448                    id
4449                );
4450            }
4451
4452            guest_client
4453                .project
4454                .as_ref()
4455                .unwrap()
4456                .read_with(guest_cx, |project, _| {
4457                    assert!(
4458                        !project.has_buffered_operations(),
4459                        "guest {} has buffered operations ",
4460                        guest_id,
4461                    );
4462                });
4463
4464            for guest_buffer in &guest_client.buffers {
4465                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4466                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4467                    project
4468                        .shared_buffer(guest_client.peer_id, buffer_id)
4469                        .expect(&format!(
4470                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4471                            guest_id, guest_client.peer_id, buffer_id
4472                        ))
4473                });
4474                assert_eq!(
4475                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4476                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4477                    "guest {} buffer {} differs from the host's buffer",
4478                    guest_id,
4479                    buffer_id,
4480                );
4481            }
4482        }
4483    }
4484
4485    struct TestServer {
4486        peer: Arc<Peer>,
4487        app_state: Arc<AppState>,
4488        server: Arc<Server>,
4489        foreground: Rc<executor::Foreground>,
4490        notifications: mpsc::UnboundedReceiver<()>,
4491        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4492        forbid_connections: Arc<AtomicBool>,
4493        _test_db: TestDb,
4494    }
4495
4496    impl TestServer {
4497        async fn start(
4498            foreground: Rc<executor::Foreground>,
4499            background: Arc<executor::Background>,
4500        ) -> Self {
4501            let test_db = TestDb::fake(background);
4502            let app_state = Self::build_app_state(&test_db).await;
4503            let peer = Peer::new();
4504            let notifications = mpsc::unbounded();
4505            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4506            Self {
4507                peer,
4508                app_state,
4509                server,
4510                foreground,
4511                notifications: notifications.1,
4512                connection_killers: Default::default(),
4513                forbid_connections: Default::default(),
4514                _test_db: test_db,
4515            }
4516        }
4517
4518        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4519            let http = FakeHttpClient::with_404_response();
4520            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4521            let client_name = name.to_string();
4522            let mut client = Client::new(http.clone());
4523            let server = self.server.clone();
4524            let connection_killers = self.connection_killers.clone();
4525            let forbid_connections = self.forbid_connections.clone();
4526            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4527
4528            Arc::get_mut(&mut client)
4529                .unwrap()
4530                .override_authenticate(move |cx| {
4531                    cx.spawn(|_| async move {
4532                        let access_token = "the-token".to_string();
4533                        Ok(Credentials {
4534                            user_id: user_id.0 as u64,
4535                            access_token,
4536                        })
4537                    })
4538                })
4539                .override_establish_connection(move |credentials, cx| {
4540                    assert_eq!(credentials.user_id, user_id.0 as u64);
4541                    assert_eq!(credentials.access_token, "the-token");
4542
4543                    let server = server.clone();
4544                    let connection_killers = connection_killers.clone();
4545                    let forbid_connections = forbid_connections.clone();
4546                    let client_name = client_name.clone();
4547                    let connection_id_tx = connection_id_tx.clone();
4548                    cx.spawn(move |cx| async move {
4549                        if forbid_connections.load(SeqCst) {
4550                            Err(EstablishConnectionError::other(anyhow!(
4551                                "server is forbidding connections"
4552                            )))
4553                        } else {
4554                            let (client_conn, server_conn, kill_conn) =
4555                                Connection::in_memory(cx.background());
4556                            connection_killers.lock().insert(user_id, kill_conn);
4557                            cx.background()
4558                                .spawn(server.handle_connection(
4559                                    server_conn,
4560                                    client_name,
4561                                    user_id,
4562                                    Some(connection_id_tx),
4563                                    cx.background(),
4564                                ))
4565                                .detach();
4566                            Ok(client_conn)
4567                        }
4568                    })
4569                });
4570
4571            client
4572                .authenticate_and_connect(&cx.to_async())
4573                .await
4574                .unwrap();
4575
4576            Channel::init(&client);
4577            Project::init(&client);
4578
4579            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4580            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4581            let mut authed_user =
4582                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4583            while authed_user.next().await.unwrap().is_none() {}
4584
4585            TestClient {
4586                client,
4587                peer_id,
4588                user_store,
4589                project: Default::default(),
4590                buffers: Default::default(),
4591            }
4592        }
4593
4594        fn disconnect_client(&self, user_id: UserId) {
4595            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4596                let _ = kill_conn.try_send(Some(()));
4597            }
4598        }
4599
4600        fn forbid_connections(&self) {
4601            self.forbid_connections.store(true, SeqCst);
4602        }
4603
4604        fn allow_connections(&self) {
4605            self.forbid_connections.store(false, SeqCst);
4606        }
4607
4608        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4609            let mut config = Config::default();
4610            config.session_secret = "a".repeat(32);
4611            config.database_url = test_db.url.clone();
4612            let github_client = github::AppClient::test();
4613            Arc::new(AppState {
4614                db: test_db.db().clone(),
4615                handlebars: Default::default(),
4616                auth_client: auth::build_client("", ""),
4617                repo_client: github::RepoClient::test(&github_client),
4618                github_client,
4619                config,
4620            })
4621        }
4622
4623        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4624            self.server.store.read()
4625        }
4626
4627        async fn condition<F>(&mut self, mut predicate: F)
4628        where
4629            F: FnMut(&Store) -> bool,
4630        {
4631            async_std::future::timeout(Duration::from_millis(500), async {
4632                while !(predicate)(&*self.server.store.read()) {
4633                    self.foreground.start_waiting();
4634                    self.notifications.next().await;
4635                    self.foreground.finish_waiting();
4636                }
4637            })
4638            .await
4639            .expect("condition timed out");
4640        }
4641    }
4642
4643    impl Drop for TestServer {
4644        fn drop(&mut self) {
4645            self.peer.reset();
4646        }
4647    }
4648
4649    struct TestClient {
4650        client: Arc<Client>,
4651        pub peer_id: PeerId,
4652        pub user_store: ModelHandle<UserStore>,
4653        project: Option<ModelHandle<Project>>,
4654        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4655    }
4656
4657    impl Deref for TestClient {
4658        type Target = Arc<Client>;
4659
4660        fn deref(&self) -> &Self::Target {
4661            &self.client
4662        }
4663    }
4664
4665    impl TestClient {
4666        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4667            UserId::from_proto(
4668                self.user_store
4669                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4670            )
4671        }
4672
4673        fn simulate_host(
4674            mut self,
4675            project: ModelHandle<Project>,
4676            mut language_server_config: LanguageServerConfig,
4677            operations: Rc<Cell<usize>>,
4678            max_operations: usize,
4679            rng: Arc<Mutex<StdRng>>,
4680            mut cx: TestAppContext,
4681        ) -> impl Future<Output = (Self, TestAppContext)> {
4682            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4683
4684            // Set up a fake language server.
4685            language_server_config.set_fake_initializer({
4686                let rng = rng.clone();
4687                let files = files.clone();
4688                let project = project.clone();
4689                move |fake_server| {
4690                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4691                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4692                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4693                                range: lsp::Range::new(
4694                                    lsp::Position::new(0, 0),
4695                                    lsp::Position::new(0, 0),
4696                                ),
4697                                new_text: "the-new-text".to_string(),
4698                            })),
4699                            ..Default::default()
4700                        }]))
4701                    });
4702
4703                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4704                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4705                            lsp::CodeAction {
4706                                title: "the-code-action".to_string(),
4707                                ..Default::default()
4708                            },
4709                        )])
4710                    });
4711
4712                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4713                        |params, _| {
4714                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4715                                params.position,
4716                                params.position,
4717                            )))
4718                        },
4719                    );
4720
4721                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4722                        let files = files.clone();
4723                        let rng = rng.clone();
4724                        move |_, _| {
4725                            let files = files.lock();
4726                            let mut rng = rng.lock();
4727                            let count = rng.gen_range::<usize, _>(1..3);
4728                            Some(lsp::GotoDefinitionResponse::Array(
4729                                (0..count)
4730                                    .map(|_| {
4731                                        let file = files.choose(&mut *rng).unwrap().as_path();
4732                                        lsp::Location {
4733                                            uri: lsp::Url::from_file_path(file).unwrap(),
4734                                            range: Default::default(),
4735                                        }
4736                                    })
4737                                    .collect(),
4738                            ))
4739                        }
4740                    });
4741
4742                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4743                        let rng = rng.clone();
4744                        let project = project.clone();
4745                        move |params, mut cx| {
4746                            project.update(&mut cx, |project, cx| {
4747                                let path = params
4748                                    .text_document_position_params
4749                                    .text_document
4750                                    .uri
4751                                    .to_file_path()
4752                                    .unwrap();
4753                                let (worktree, relative_path) =
4754                                    project.find_local_worktree(&path, cx)?;
4755                                let project_path =
4756                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4757                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4758
4759                                let mut highlights = Vec::new();
4760                                let highlight_count = rng.lock().gen_range(1..=5);
4761                                let mut prev_end = 0;
4762                                for _ in 0..highlight_count {
4763                                    let range =
4764                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4765                                    let start =
4766                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4767                                    let end =
4768                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4769                                    highlights.push(lsp::DocumentHighlight {
4770                                        range: lsp::Range::new(start, end),
4771                                        kind: Some(lsp::DocumentHighlightKind::READ),
4772                                    });
4773                                    prev_end = range.end;
4774                                }
4775                                Some(highlights)
4776                            })
4777                        }
4778                    });
4779                }
4780            });
4781
4782            project.update(&mut cx, |project, _| {
4783                project.languages().add(Arc::new(Language::new(
4784                    LanguageConfig {
4785                        name: "Rust".into(),
4786                        path_suffixes: vec!["rs".to_string()],
4787                        language_server: Some(language_server_config),
4788                        ..Default::default()
4789                    },
4790                    None,
4791                )));
4792            });
4793
4794            async move {
4795                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4796                while operations.get() < max_operations {
4797                    operations.set(operations.get() + 1);
4798
4799                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4800                    match distribution {
4801                        0..=20 if !files.lock().is_empty() => {
4802                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4803                            let mut path = path.as_path();
4804                            while let Some(parent_path) = path.parent() {
4805                                path = parent_path;
4806                                if rng.lock().gen() {
4807                                    break;
4808                                }
4809                            }
4810
4811                            log::info!("Host: find/create local worktree {:?}", path);
4812                            project
4813                                .update(&mut cx, |project, cx| {
4814                                    project.find_or_create_local_worktree(path, false, cx)
4815                                })
4816                                .await
4817                                .unwrap();
4818                        }
4819                        10..=80 if !files.lock().is_empty() => {
4820                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4821                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4822                                let (worktree, path) = project
4823                                    .update(&mut cx, |project, cx| {
4824                                        project.find_or_create_local_worktree(file, false, cx)
4825                                    })
4826                                    .await
4827                                    .unwrap();
4828                                let project_path =
4829                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4830                                log::info!("Host: opening path {:?}", project_path);
4831                                let buffer = project
4832                                    .update(&mut cx, |project, cx| {
4833                                        project.open_buffer(project_path, cx)
4834                                    })
4835                                    .await
4836                                    .unwrap();
4837                                self.buffers.insert(buffer.clone());
4838                                buffer
4839                            } else {
4840                                self.buffers
4841                                    .iter()
4842                                    .choose(&mut *rng.lock())
4843                                    .unwrap()
4844                                    .clone()
4845                            };
4846
4847                            if rng.lock().gen_bool(0.1) {
4848                                cx.update(|cx| {
4849                                    log::info!(
4850                                        "Host: dropping buffer {:?}",
4851                                        buffer.read(cx).file().unwrap().full_path(cx)
4852                                    );
4853                                    self.buffers.remove(&buffer);
4854                                    drop(buffer);
4855                                });
4856                            } else {
4857                                buffer.update(&mut cx, |buffer, cx| {
4858                                    log::info!(
4859                                        "Host: updating buffer {:?}",
4860                                        buffer.file().unwrap().full_path(cx)
4861                                    );
4862                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4863                                });
4864                            }
4865                        }
4866                        _ => loop {
4867                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4868                            let mut path = PathBuf::new();
4869                            path.push("/");
4870                            for _ in 0..path_component_count {
4871                                let letter = rng.lock().gen_range(b'a'..=b'z');
4872                                path.push(std::str::from_utf8(&[letter]).unwrap());
4873                            }
4874                            path.set_extension("rs");
4875                            let parent_path = path.parent().unwrap();
4876
4877                            log::info!("Host: creating file {:?}", path,);
4878
4879                            if fs.create_dir(&parent_path).await.is_ok()
4880                                && fs.create_file(&path, Default::default()).await.is_ok()
4881                            {
4882                                files.lock().push(path);
4883                                break;
4884                            } else {
4885                                log::info!("Host: cannot create file");
4886                            }
4887                        },
4888                    }
4889
4890                    cx.background().simulate_random_delay().await;
4891                }
4892
4893                self.project = Some(project);
4894                (self, cx)
4895            }
4896        }
4897
4898        pub async fn simulate_guest(
4899            mut self,
4900            guest_id: usize,
4901            project: ModelHandle<Project>,
4902            operations: Rc<Cell<usize>>,
4903            max_operations: usize,
4904            rng: Arc<Mutex<StdRng>>,
4905            mut cx: TestAppContext,
4906        ) -> (Self, TestAppContext) {
4907            while operations.get() < max_operations {
4908                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4909                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4910                        project
4911                            .worktrees(&cx)
4912                            .filter(|worktree| {
4913                                worktree.read(cx).entries(false).any(|e| e.is_file())
4914                            })
4915                            .choose(&mut *rng.lock())
4916                    }) {
4917                        worktree
4918                    } else {
4919                        cx.background().simulate_random_delay().await;
4920                        continue;
4921                    };
4922
4923                    operations.set(operations.get() + 1);
4924                    let project_path = worktree.read_with(&cx, |worktree, _| {
4925                        let entry = worktree
4926                            .entries(false)
4927                            .filter(|e| e.is_file())
4928                            .choose(&mut *rng.lock())
4929                            .unwrap();
4930                        (worktree.id(), entry.path.clone())
4931                    });
4932                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4933                    let buffer = project
4934                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4935                        .await
4936                        .unwrap();
4937                    self.buffers.insert(buffer.clone());
4938                    buffer
4939                } else {
4940                    operations.set(operations.get() + 1);
4941
4942                    self.buffers
4943                        .iter()
4944                        .choose(&mut *rng.lock())
4945                        .unwrap()
4946                        .clone()
4947                };
4948
4949                let choice = rng.lock().gen_range(0..100);
4950                match choice {
4951                    0..=9 => {
4952                        cx.update(|cx| {
4953                            log::info!(
4954                                "Guest {}: dropping buffer {:?}",
4955                                guest_id,
4956                                buffer.read(cx).file().unwrap().full_path(cx)
4957                            );
4958                            self.buffers.remove(&buffer);
4959                            drop(buffer);
4960                        });
4961                    }
4962                    10..=19 => {
4963                        let completions = project.update(&mut cx, |project, cx| {
4964                            log::info!(
4965                                "Guest {}: requesting completions for buffer {:?}",
4966                                guest_id,
4967                                buffer.read(cx).file().unwrap().full_path(cx)
4968                            );
4969                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4970                            project.completions(&buffer, offset, cx)
4971                        });
4972                        let completions = cx.background().spawn(async move {
4973                            completions.await.expect("completions request failed");
4974                        });
4975                        if rng.lock().gen_bool(0.3) {
4976                            log::info!("Guest {}: detaching completions request", guest_id);
4977                            completions.detach();
4978                        } else {
4979                            completions.await;
4980                        }
4981                    }
4982                    20..=29 => {
4983                        let code_actions = project.update(&mut cx, |project, cx| {
4984                            log::info!(
4985                                "Guest {}: requesting code actions for buffer {:?}",
4986                                guest_id,
4987                                buffer.read(cx).file().unwrap().full_path(cx)
4988                            );
4989                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4990                            project.code_actions(&buffer, range, cx)
4991                        });
4992                        let code_actions = cx.background().spawn(async move {
4993                            code_actions.await.expect("code actions request failed");
4994                        });
4995                        if rng.lock().gen_bool(0.3) {
4996                            log::info!("Guest {}: detaching code actions request", guest_id);
4997                            code_actions.detach();
4998                        } else {
4999                            code_actions.await;
5000                        }
5001                    }
5002                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5003                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5004                            log::info!(
5005                                "Guest {}: saving buffer {:?}",
5006                                guest_id,
5007                                buffer.file().unwrap().full_path(cx)
5008                            );
5009                            (buffer.version(), buffer.save(cx))
5010                        });
5011                        let save = cx.spawn(|cx| async move {
5012                            let (saved_version, _) = save.await.expect("save request failed");
5013                            buffer.read_with(&cx, |buffer, _| {
5014                                assert!(buffer.version().observed_all(&saved_version));
5015                                assert!(saved_version.observed_all(&requested_version));
5016                            });
5017                        });
5018                        if rng.lock().gen_bool(0.3) {
5019                            log::info!("Guest {}: detaching save request", guest_id);
5020                            save.detach();
5021                        } else {
5022                            save.await;
5023                        }
5024                    }
5025                    40..=45 => {
5026                        let prepare_rename = project.update(&mut cx, |project, cx| {
5027                            log::info!(
5028                                "Guest {}: preparing rename for buffer {:?}",
5029                                guest_id,
5030                                buffer.read(cx).file().unwrap().full_path(cx)
5031                            );
5032                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5033                            project.prepare_rename(buffer, offset, cx)
5034                        });
5035                        let prepare_rename = cx.background().spawn(async move {
5036                            prepare_rename.await.expect("prepare rename request failed");
5037                        });
5038                        if rng.lock().gen_bool(0.3) {
5039                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5040                            prepare_rename.detach();
5041                        } else {
5042                            prepare_rename.await;
5043                        }
5044                    }
5045                    46..=49 => {
5046                        let definitions = project.update(&mut cx, |project, cx| {
5047                            log::info!(
5048                                "Guest {}: requesting defintions for buffer {:?}",
5049                                guest_id,
5050                                buffer.read(cx).file().unwrap().full_path(cx)
5051                            );
5052                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5053                            project.definition(&buffer, offset, cx)
5054                        });
5055                        let definitions = cx.background().spawn(async move {
5056                            definitions.await.expect("definitions request failed");
5057                        });
5058                        if rng.lock().gen_bool(0.3) {
5059                            log::info!("Guest {}: detaching definitions request", guest_id);
5060                            definitions.detach();
5061                        } else {
5062                            definitions.await;
5063                        }
5064                    }
5065                    50..=55 => {
5066                        let highlights = project.update(&mut cx, |project, cx| {
5067                            log::info!(
5068                                "Guest {}: requesting highlights for buffer {:?}",
5069                                guest_id,
5070                                buffer.read(cx).file().unwrap().full_path(cx)
5071                            );
5072                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5073                            project.document_highlights(&buffer, offset, cx)
5074                        });
5075                        let highlights = cx.background().spawn(async move {
5076                            highlights.await.expect("highlights request failed");
5077                        });
5078                        if rng.lock().gen_bool(0.3) {
5079                            log::info!("Guest {}: detaching highlights request", guest_id);
5080                            highlights.detach();
5081                        } else {
5082                            highlights.await;
5083                        }
5084                    }
5085                    _ => {
5086                        buffer.update(&mut cx, |buffer, cx| {
5087                            log::info!(
5088                                "Guest {}: updating buffer {:?}",
5089                                guest_id,
5090                                buffer.file().unwrap().full_path(cx)
5091                            );
5092                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5093                        });
5094                    }
5095                }
5096                cx.background().simulate_random_delay().await;
5097            }
5098
5099            self.project = Some(project);
5100            (self, cx)
5101        }
5102    }
5103
5104    impl Executor for Arc<gpui::executor::Background> {
5105        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5106            self.spawn(future).detach();
5107        }
5108    }
5109
5110    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5111        channel
5112            .messages()
5113            .cursor::<()>()
5114            .map(|m| {
5115                (
5116                    m.sender.github_login.clone(),
5117                    m.body.clone(),
5118                    m.is_pending(),
5119                )
5120            })
5121            .collect()
5122    }
5123
5124    struct EmptyView;
5125
5126    impl gpui::Entity for EmptyView {
5127        type Event = ();
5128    }
5129
5130    impl gpui::View for EmptyView {
5131        fn ui_name() -> &'static str {
5132            "empty view"
5133        }
5134
5135        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5136            gpui::Element::boxed(gpui::elements::Empty)
5137        }
5138    }
5139}