rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    /// Peer used to add/disconnect connections and to send, forward and answer messages.
    peer: Arc<Peer>,
    /// In-memory store of connections, projects and worktrees, guarded by a read/write lock.
    store: RwLock<Store>,
    /// Application state; the handlers in this file use it to reach the database.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// When set, `handle_connection` sends one `()` on this channel after each handled message.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
/// Unit struct with no state.
// NOTE(review): presumably the production `Executor` implementation; its `impl` is not visible
// in this part of the file — confirm.
pub struct RealExecutor;
  48
// NOTE(review): neither constant is used in the part of the file visible here; the meanings
// below are inferred from the names only — confirm at the use sites.
/// Presumably the number of channel messages returned per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Presumably the maximum accepted length of a channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_references)
  83            .add_request_handler(Server::get_document_highlights)
  84            .add_request_handler(Server::get_project_symbols)
  85            .add_request_handler(Server::open_buffer_for_symbol)
  86            .add_request_handler(Server::open_buffer)
  87            .add_message_handler(Server::close_buffer)
  88            .add_request_handler(Server::update_buffer)
  89            .add_message_handler(Server::update_buffer_file)
  90            .add_message_handler(Server::buffer_reloaded)
  91            .add_message_handler(Server::buffer_saved)
  92            .add_request_handler(Server::save_buffer)
  93            .add_request_handler(Server::format_buffers)
  94            .add_request_handler(Server::get_completions)
  95            .add_request_handler(Server::apply_additional_edits_for_completion)
  96            .add_request_handler(Server::get_code_actions)
  97            .add_request_handler(Server::apply_code_action)
  98            .add_request_handler(Server::prepare_rename)
  99            .add_request_handler(Server::perform_rename)
 100            .add_request_handler(Server::get_channels)
 101            .add_request_handler(Server::get_users)
 102            .add_request_handler(Server::join_channel)
 103            .add_message_handler(Server::leave_channel)
 104            .add_request_handler(Server::send_channel_message)
 105            .add_request_handler(Server::get_channel_messages);
 106
 107        Arc::new(server)
 108    }
 109
 110    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 111    where
 112        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 113        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 114        M: EnvelopedMessage,
 115    {
 116        let prev_handler = self.handlers.insert(
 117            TypeId::of::<M>(),
 118            Box::new(move |server, envelope| {
 119                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 120                (handler)(server, *envelope).boxed()
 121            }),
 122        );
 123        if prev_handler.is_some() {
 124            panic!("registered a handler for the same message twice");
 125        }
 126        self
 127    }
 128
 129    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 130    where
 131        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 132        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 133        M: RequestMessage,
 134    {
 135        self.add_message_handler(move |server, envelope| {
 136            let receipt = envelope.receipt();
 137            let response = (handler)(server.clone(), envelope);
 138            async move {
 139                match response.await {
 140                    Ok(response) => {
 141                        server.peer.respond(receipt, response)?;
 142                        Ok(())
 143                    }
 144                    Err(error) => {
 145                        server.peer.respond_with_error(
 146                            receipt,
 147                            proto::Error {
 148                                message: error.to_string(),
 149                            },
 150                        )?;
 151                        Err(error)
 152                    }
 153                }
 154            }
 155        })
 156    }
 157
    /// Returns a future that serves `connection` until it closes, then signs it out.
    ///
    /// The future registers the connection with the peer and the store, optionally reports the
    /// assigned `ConnectionId` through `send_connection_id`, and then dispatches each incoming
    /// message to its registered handler while also driving the connection's I/O future.
    ///
    /// * `addr` is only used in log messages.
    /// * `user_id` is the user the connection is recorded under in the store.
    /// * `executor` runs the handlers of messages flagged as background.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failure to report the id is deliberately ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            // A contacts update failure is logged but does not stop the connection.
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so the I/O future
                // is always polled before the next incoming message is taken.
                futures::select_biased! {
                    result = handle_io => {
                        // The I/O future finished (with or without an error): stop serving.
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so its outcome is logged and one `()` is
                                // sent on the notifications channel (if any) afterwards.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                // Background messages run detached; all others are awaited
                                // here, so the next message is not read until they finish.
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            // Runs on both exit paths of the loop above.
            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 232
 233    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 234        self.peer.disconnect(connection_id);
 235        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 236
 237        for (project_id, project) in removed_connection.hosted_projects {
 238            if let Some(share) = project.share {
 239                broadcast(
 240                    connection_id,
 241                    share.guests.keys().copied().collect(),
 242                    |conn_id| {
 243                        self.peer
 244                            .send(conn_id, proto::UnshareProject { project_id })
 245                    },
 246                )?;
 247            }
 248        }
 249
 250        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 251            broadcast(connection_id, peer_ids, |conn_id| {
 252                self.peer.send(
 253                    conn_id,
 254                    proto::RemoveProjectCollaborator {
 255                        project_id,
 256                        peer_id: connection_id.0,
 257                    },
 258                )
 259            })?;
 260        }
 261
 262        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 263        Ok(())
 264    }
 265
 266    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 267        Ok(proto::Ack {})
 268    }
 269
 270    async fn register_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::RegisterProject>,
 273    ) -> tide::Result<proto::RegisterProjectResponse> {
 274        let project_id = {
 275            let mut state = self.state_mut();
 276            let user_id = state.user_id_for_connection(request.sender_id)?;
 277            state.register_project(request.sender_id, user_id)
 278        };
 279        Ok(proto::RegisterProjectResponse { project_id })
 280    }
 281
 282    async fn unregister_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::UnregisterProject>,
 285    ) -> tide::Result<()> {
 286        let project = self
 287            .state_mut()
 288            .unregister_project(request.payload.project_id, request.sender_id)?;
 289        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 290        Ok(())
 291    }
 292
 293    async fn share_project(
 294        mut self: Arc<Server>,
 295        request: TypedEnvelope<proto::ShareProject>,
 296    ) -> tide::Result<proto::Ack> {
 297        self.state_mut()
 298            .share_project(request.payload.project_id, request.sender_id);
 299        Ok(proto::Ack {})
 300    }
 301
 302    async fn unshare_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnshareProject>,
 305    ) -> tide::Result<()> {
 306        let project_id = request.payload.project_id;
 307        let project = self
 308            .state_mut()
 309            .unshare_project(project_id, request.sender_id)?;
 310
 311        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 312            self.peer
 313                .send(conn_id, proto::UnshareProject { project_id })
 314        })?;
 315        self.update_contacts_for_users(&project.authorized_user_ids)?;
 316        Ok(())
 317    }
 318
 319    async fn join_project(
 320        mut self: Arc<Server>,
 321        request: TypedEnvelope<proto::JoinProject>,
 322    ) -> tide::Result<proto::JoinProjectResponse> {
 323        let project_id = request.payload.project_id;
 324
 325        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 326        let (response, connection_ids, contact_user_ids) = self
 327            .state_mut()
 328            .join_project(request.sender_id, user_id, project_id)
 329            .and_then(|joined| {
 330                let share = joined.project.share()?;
 331                let peer_count = share.guests.len();
 332                let mut collaborators = Vec::with_capacity(peer_count);
 333                collaborators.push(proto::Collaborator {
 334                    peer_id: joined.project.host_connection_id.0,
 335                    replica_id: 0,
 336                    user_id: joined.project.host_user_id.to_proto(),
 337                });
 338                let worktrees = joined
 339                    .project
 340                    .worktrees
 341                    .iter()
 342                    .filter_map(|(id, worktree)| {
 343                        worktree.share.as_ref().map(|share| proto::Worktree {
 344                            id: *id,
 345                            root_name: worktree.root_name.clone(),
 346                            entries: share.entries.values().cloned().collect(),
 347                            diagnostic_summaries: share
 348                                .diagnostic_summaries
 349                                .values()
 350                                .cloned()
 351                                .collect(),
 352                            weak: worktree.weak,
 353                            next_update_id: share.next_update_id as u64,
 354                        })
 355                    })
 356                    .collect();
 357                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 358                    if *peer_conn_id != request.sender_id {
 359                        collaborators.push(proto::Collaborator {
 360                            peer_id: peer_conn_id.0,
 361                            replica_id: *peer_replica_id as u32,
 362                            user_id: peer_user_id.to_proto(),
 363                        });
 364                    }
 365                }
 366                let response = proto::JoinProjectResponse {
 367                    worktrees,
 368                    replica_id: joined.replica_id as u32,
 369                    collaborators,
 370                };
 371                let connection_ids = joined.project.connection_ids();
 372                let contact_user_ids = joined.project.authorized_user_ids();
 373                Ok((response, connection_ids, contact_user_ids))
 374            })?;
 375
 376        broadcast(request.sender_id, connection_ids, |conn_id| {
 377            self.peer.send(
 378                conn_id,
 379                proto::AddProjectCollaborator {
 380                    project_id,
 381                    collaborator: Some(proto::Collaborator {
 382                        peer_id: request.sender_id.0,
 383                        replica_id: response.replica_id,
 384                        user_id: user_id.to_proto(),
 385                    }),
 386                },
 387            )
 388        })?;
 389        self.update_contacts_for_users(&contact_user_ids)?;
 390        Ok(response)
 391    }
 392
 393    async fn leave_project(
 394        mut self: Arc<Server>,
 395        request: TypedEnvelope<proto::LeaveProject>,
 396    ) -> tide::Result<()> {
 397        let sender_id = request.sender_id;
 398        let project_id = request.payload.project_id;
 399        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 400
 401        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 402            self.peer.send(
 403                conn_id,
 404                proto::RemoveProjectCollaborator {
 405                    project_id,
 406                    peer_id: sender_id.0,
 407                },
 408            )
 409        })?;
 410        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 411
 412        Ok(())
 413    }
 414
 415    async fn register_worktree(
 416        mut self: Arc<Server>,
 417        request: TypedEnvelope<proto::RegisterWorktree>,
 418    ) -> tide::Result<proto::Ack> {
 419        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 420
 421        let mut contact_user_ids = HashSet::default();
 422        contact_user_ids.insert(host_user_id);
 423        for github_login in request.payload.authorized_logins {
 424            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 425            contact_user_ids.insert(contact_user_id);
 426        }
 427
 428        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 429        self.state_mut().register_worktree(
 430            request.payload.project_id,
 431            request.payload.worktree_id,
 432            request.sender_id,
 433            Worktree {
 434                authorized_user_ids: contact_user_ids.clone(),
 435                root_name: request.payload.root_name,
 436                share: None,
 437                weak: false,
 438            },
 439        )?;
 440        self.update_contacts_for_users(&contact_user_ids)?;
 441        Ok(proto::Ack {})
 442    }
 443
 444    async fn unregister_worktree(
 445        mut self: Arc<Server>,
 446        request: TypedEnvelope<proto::UnregisterWorktree>,
 447    ) -> tide::Result<()> {
 448        let project_id = request.payload.project_id;
 449        let worktree_id = request.payload.worktree_id;
 450        let (worktree, guest_connection_ids) =
 451            self.state_mut()
 452                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 453        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 454            self.peer.send(
 455                conn_id,
 456                proto::UnregisterWorktree {
 457                    project_id,
 458                    worktree_id,
 459                },
 460            )
 461        })?;
 462        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 463        Ok(())
 464    }
 465
 466    async fn share_worktree(
 467        mut self: Arc<Server>,
 468        mut request: TypedEnvelope<proto::ShareWorktree>,
 469    ) -> tide::Result<proto::Ack> {
 470        let worktree = request
 471            .payload
 472            .worktree
 473            .as_mut()
 474            .ok_or_else(|| anyhow!("missing worktree"))?;
 475        let entries = worktree
 476            .entries
 477            .iter()
 478            .map(|entry| (entry.id, entry.clone()))
 479            .collect();
 480        let diagnostic_summaries = worktree
 481            .diagnostic_summaries
 482            .iter()
 483            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 484            .collect();
 485
 486        let shared_worktree = self.state_mut().share_worktree(
 487            request.payload.project_id,
 488            worktree.id,
 489            request.sender_id,
 490            entries,
 491            diagnostic_summaries,
 492            worktree.next_update_id,
 493        )?;
 494
 495        broadcast(
 496            request.sender_id,
 497            shared_worktree.connection_ids,
 498            |connection_id| {
 499                self.peer
 500                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 501            },
 502        )?;
 503        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 504
 505        Ok(proto::Ack {})
 506    }
 507
 508    async fn update_worktree(
 509        mut self: Arc<Server>,
 510        request: TypedEnvelope<proto::UpdateWorktree>,
 511    ) -> tide::Result<proto::Ack> {
 512        let connection_ids = self.state_mut().update_worktree(
 513            request.sender_id,
 514            request.payload.project_id,
 515            request.payload.worktree_id,
 516            request.payload.id,
 517            &request.payload.removed_entries,
 518            &request.payload.updated_entries,
 519        )?;
 520
 521        broadcast(request.sender_id, connection_ids, |connection_id| {
 522            self.peer
 523                .forward_send(request.sender_id, connection_id, request.payload.clone())
 524        })?;
 525
 526        Ok(proto::Ack {})
 527    }
 528
 529    async fn update_diagnostic_summary(
 530        mut self: Arc<Server>,
 531        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 532    ) -> tide::Result<()> {
 533        let summary = request
 534            .payload
 535            .summary
 536            .clone()
 537            .ok_or_else(|| anyhow!("invalid summary"))?;
 538        let receiver_ids = self.state_mut().update_diagnostic_summary(
 539            request.payload.project_id,
 540            request.payload.worktree_id,
 541            request.sender_id,
 542            summary,
 543        )?;
 544
 545        broadcast(request.sender_id, receiver_ids, |connection_id| {
 546            self.peer
 547                .forward_send(request.sender_id, connection_id, request.payload.clone())
 548        })?;
 549        Ok(())
 550    }
 551
 552    async fn disk_based_diagnostics_updating(
 553        self: Arc<Server>,
 554        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 555    ) -> tide::Result<()> {
 556        let receiver_ids = self
 557            .state()
 558            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 559        broadcast(request.sender_id, receiver_ids, |connection_id| {
 560            self.peer
 561                .forward_send(request.sender_id, connection_id, request.payload.clone())
 562        })?;
 563        Ok(())
 564    }
 565
 566    async fn disk_based_diagnostics_updated(
 567        self: Arc<Server>,
 568        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 569    ) -> tide::Result<()> {
 570        let receiver_ids = self
 571            .state()
 572            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 573        broadcast(request.sender_id, receiver_ids, |connection_id| {
 574            self.peer
 575                .forward_send(request.sender_id, connection_id, request.payload.clone())
 576        })?;
 577        Ok(())
 578    }
 579
 580    async fn get_definition(
 581        self: Arc<Server>,
 582        request: TypedEnvelope<proto::GetDefinition>,
 583    ) -> tide::Result<proto::GetDefinitionResponse> {
 584        let host_connection_id = self
 585            .state()
 586            .read_project(request.payload.project_id, request.sender_id)?
 587            .host_connection_id;
 588        Ok(self
 589            .peer
 590            .forward_request(request.sender_id, host_connection_id, request.payload)
 591            .await?)
 592    }
 593
 594    async fn get_references(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::GetReferences>,
 597    ) -> tide::Result<proto::GetReferencesResponse> {
 598        let host_connection_id = self
 599            .state()
 600            .read_project(request.payload.project_id, request.sender_id)?
 601            .host_connection_id;
 602        Ok(self
 603            .peer
 604            .forward_request(request.sender_id, host_connection_id, request.payload)
 605            .await?)
 606    }
 607
 608    async fn get_document_highlights(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::GetDocumentHighlights>,
 611    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 612        let host_connection_id = self
 613            .state()
 614            .read_project(request.payload.project_id, request.sender_id)?
 615            .host_connection_id;
 616        Ok(self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?)
 620    }
 621
 622    async fn get_project_symbols(
 623        self: Arc<Server>,
 624        request: TypedEnvelope<proto::GetProjectSymbols>,
 625    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 626        let host_connection_id = self
 627            .state()
 628            .read_project(request.payload.project_id, request.sender_id)?
 629            .host_connection_id;
 630        Ok(self
 631            .peer
 632            .forward_request(request.sender_id, host_connection_id, request.payload)
 633            .await?)
 634    }
 635
 636    async fn open_buffer_for_symbol(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 639    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 640        let host_connection_id = self
 641            .state()
 642            .read_project(request.payload.project_id, request.sender_id)?
 643            .host_connection_id;
 644        Ok(self
 645            .peer
 646            .forward_request(request.sender_id, host_connection_id, request.payload)
 647            .await?)
 648    }
 649
 650    async fn open_buffer(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::OpenBuffer>,
 653    ) -> tide::Result<proto::OpenBufferResponse> {
 654        let host_connection_id = self
 655            .state()
 656            .read_project(request.payload.project_id, request.sender_id)?
 657            .host_connection_id;
 658        Ok(self
 659            .peer
 660            .forward_request(request.sender_id, host_connection_id, request.payload)
 661            .await?)
 662    }
 663
 664    async fn close_buffer(
 665        self: Arc<Server>,
 666        request: TypedEnvelope<proto::CloseBuffer>,
 667    ) -> tide::Result<()> {
 668        let host_connection_id = self
 669            .state()
 670            .read_project(request.payload.project_id, request.sender_id)?
 671            .host_connection_id;
 672        self.peer
 673            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 674        Ok(())
 675    }
 676
 677    async fn save_buffer(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::SaveBuffer>,
 680    ) -> tide::Result<proto::BufferSaved> {
 681        let host;
 682        let mut guests;
 683        {
 684            let state = self.state();
 685            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 686            host = project.host_connection_id;
 687            guests = project.guest_connection_ids()
 688        }
 689
 690        let response = self
 691            .peer
 692            .forward_request(request.sender_id, host, request.payload.clone())
 693            .await?;
 694
 695        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 696        broadcast(host, guests, |conn_id| {
 697            self.peer.forward_send(host, conn_id, response.clone())
 698        })?;
 699
 700        Ok(response)
 701    }
 702
 703    async fn format_buffers(
 704        self: Arc<Server>,
 705        request: TypedEnvelope<proto::FormatBuffers>,
 706    ) -> tide::Result<proto::FormatBuffersResponse> {
 707        let host = self
 708            .state()
 709            .read_project(request.payload.project_id, request.sender_id)?
 710            .host_connection_id;
 711        Ok(self
 712            .peer
 713            .forward_request(request.sender_id, host, request.payload.clone())
 714            .await?)
 715    }
 716
 717    async fn get_completions(
 718        self: Arc<Server>,
 719        request: TypedEnvelope<proto::GetCompletions>,
 720    ) -> tide::Result<proto::GetCompletionsResponse> {
 721        let host = self
 722            .state()
 723            .read_project(request.payload.project_id, request.sender_id)?
 724            .host_connection_id;
 725        Ok(self
 726            .peer
 727            .forward_request(request.sender_id, host, request.payload.clone())
 728            .await?)
 729    }
 730
 731    async fn apply_additional_edits_for_completion(
 732        self: Arc<Server>,
 733        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 734    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 735        let host = self
 736            .state()
 737            .read_project(request.payload.project_id, request.sender_id)?
 738            .host_connection_id;
 739        Ok(self
 740            .peer
 741            .forward_request(request.sender_id, host, request.payload.clone())
 742            .await?)
 743    }
 744
 745    async fn get_code_actions(
 746        self: Arc<Server>,
 747        request: TypedEnvelope<proto::GetCodeActions>,
 748    ) -> tide::Result<proto::GetCodeActionsResponse> {
 749        let host = self
 750            .state()
 751            .read_project(request.payload.project_id, request.sender_id)?
 752            .host_connection_id;
 753        Ok(self
 754            .peer
 755            .forward_request(request.sender_id, host, request.payload.clone())
 756            .await?)
 757    }
 758
 759    async fn apply_code_action(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::ApplyCodeAction>,
 762    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 763        let host = self
 764            .state()
 765            .read_project(request.payload.project_id, request.sender_id)?
 766            .host_connection_id;
 767        Ok(self
 768            .peer
 769            .forward_request(request.sender_id, host, request.payload.clone())
 770            .await?)
 771    }
 772
 773    async fn prepare_rename(
 774        self: Arc<Server>,
 775        request: TypedEnvelope<proto::PrepareRename>,
 776    ) -> tide::Result<proto::PrepareRenameResponse> {
 777        let host = self
 778            .state()
 779            .read_project(request.payload.project_id, request.sender_id)?
 780            .host_connection_id;
 781        Ok(self
 782            .peer
 783            .forward_request(request.sender_id, host, request.payload.clone())
 784            .await?)
 785    }
 786
 787    async fn perform_rename(
 788        self: Arc<Server>,
 789        request: TypedEnvelope<proto::PerformRename>,
 790    ) -> tide::Result<proto::PerformRenameResponse> {
 791        let host = self
 792            .state()
 793            .read_project(request.payload.project_id, request.sender_id)?
 794            .host_connection_id;
 795        Ok(self
 796            .peer
 797            .forward_request(request.sender_id, host, request.payload.clone())
 798            .await?)
 799    }
 800
 801    async fn update_buffer(
 802        self: Arc<Server>,
 803        request: TypedEnvelope<proto::UpdateBuffer>,
 804    ) -> tide::Result<proto::Ack> {
 805        let receiver_ids = self
 806            .state()
 807            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 808        broadcast(request.sender_id, receiver_ids, |connection_id| {
 809            self.peer
 810                .forward_send(request.sender_id, connection_id, request.payload.clone())
 811        })?;
 812        Ok(proto::Ack {})
 813    }
 814
 815    async fn update_buffer_file(
 816        self: Arc<Server>,
 817        request: TypedEnvelope<proto::UpdateBufferFile>,
 818    ) -> tide::Result<()> {
 819        let receiver_ids = self
 820            .state()
 821            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 822        broadcast(request.sender_id, receiver_ids, |connection_id| {
 823            self.peer
 824                .forward_send(request.sender_id, connection_id, request.payload.clone())
 825        })?;
 826        Ok(())
 827    }
 828
 829    async fn buffer_reloaded(
 830        self: Arc<Server>,
 831        request: TypedEnvelope<proto::BufferReloaded>,
 832    ) -> tide::Result<()> {
 833        let receiver_ids = self
 834            .state()
 835            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 836        broadcast(request.sender_id, receiver_ids, |connection_id| {
 837            self.peer
 838                .forward_send(request.sender_id, connection_id, request.payload.clone())
 839        })?;
 840        Ok(())
 841    }
 842
 843    async fn buffer_saved(
 844        self: Arc<Server>,
 845        request: TypedEnvelope<proto::BufferSaved>,
 846    ) -> tide::Result<()> {
 847        let receiver_ids = self
 848            .state()
 849            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 850        broadcast(request.sender_id, receiver_ids, |connection_id| {
 851            self.peer
 852                .forward_send(request.sender_id, connection_id, request.payload.clone())
 853        })?;
 854        Ok(())
 855    }
 856
 857    async fn get_channels(
 858        self: Arc<Server>,
 859        request: TypedEnvelope<proto::GetChannels>,
 860    ) -> tide::Result<proto::GetChannelsResponse> {
 861        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 862        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 863        Ok(proto::GetChannelsResponse {
 864            channels: channels
 865                .into_iter()
 866                .map(|chan| proto::Channel {
 867                    id: chan.id.to_proto(),
 868                    name: chan.name,
 869                })
 870                .collect(),
 871        })
 872    }
 873
 874    async fn get_users(
 875        self: Arc<Server>,
 876        request: TypedEnvelope<proto::GetUsers>,
 877    ) -> tide::Result<proto::GetUsersResponse> {
 878        let user_ids = request
 879            .payload
 880            .user_ids
 881            .into_iter()
 882            .map(UserId::from_proto)
 883            .collect();
 884        let users = self
 885            .app_state
 886            .db
 887            .get_users_by_ids(user_ids)
 888            .await?
 889            .into_iter()
 890            .map(|user| proto::User {
 891                id: user.id.to_proto(),
 892                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 893                github_login: user.github_login,
 894            })
 895            .collect();
 896        Ok(proto::GetUsersResponse { users })
 897    }
 898
 899    fn update_contacts_for_users<'a>(
 900        self: &Arc<Server>,
 901        user_ids: impl IntoIterator<Item = &'a UserId>,
 902    ) -> anyhow::Result<()> {
 903        let mut result = Ok(());
 904        let state = self.state();
 905        for user_id in user_ids {
 906            let contacts = state.contacts_for_user(*user_id);
 907            for connection_id in state.connection_ids_for_user(*user_id) {
 908                if let Err(error) = self.peer.send(
 909                    connection_id,
 910                    proto::UpdateContacts {
 911                        contacts: contacts.clone(),
 912                    },
 913                ) {
 914                    result = Err(error);
 915                }
 916            }
 917        }
 918        result
 919    }
 920
 921    async fn join_channel(
 922        mut self: Arc<Self>,
 923        request: TypedEnvelope<proto::JoinChannel>,
 924    ) -> tide::Result<proto::JoinChannelResponse> {
 925        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 926        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 927        if !self
 928            .app_state
 929            .db
 930            .can_user_access_channel(user_id, channel_id)
 931            .await?
 932        {
 933            Err(anyhow!("access denied"))?;
 934        }
 935
 936        self.state_mut().join_channel(request.sender_id, channel_id);
 937        let messages = self
 938            .app_state
 939            .db
 940            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 941            .await?
 942            .into_iter()
 943            .map(|msg| proto::ChannelMessage {
 944                id: msg.id.to_proto(),
 945                body: msg.body,
 946                timestamp: msg.sent_at.unix_timestamp() as u64,
 947                sender_id: msg.sender_id.to_proto(),
 948                nonce: Some(msg.nonce.as_u128().into()),
 949            })
 950            .collect::<Vec<_>>();
 951        Ok(proto::JoinChannelResponse {
 952            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 953            messages,
 954        })
 955    }
 956
 957    async fn leave_channel(
 958        mut self: Arc<Self>,
 959        request: TypedEnvelope<proto::LeaveChannel>,
 960    ) -> tide::Result<()> {
 961        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 962        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 963        if !self
 964            .app_state
 965            .db
 966            .can_user_access_channel(user_id, channel_id)
 967            .await?
 968        {
 969            Err(anyhow!("access denied"))?;
 970        }
 971
 972        self.state_mut()
 973            .leave_channel(request.sender_id, channel_id);
 974
 975        Ok(())
 976    }
 977
 978    async fn send_channel_message(
 979        self: Arc<Self>,
 980        request: TypedEnvelope<proto::SendChannelMessage>,
 981    ) -> tide::Result<proto::SendChannelMessageResponse> {
 982        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 983        let user_id;
 984        let connection_ids;
 985        {
 986            let state = self.state();
 987            user_id = state.user_id_for_connection(request.sender_id)?;
 988            connection_ids = state.channel_connection_ids(channel_id)?;
 989        }
 990
 991        // Validate the message body.
 992        let body = request.payload.body.trim().to_string();
 993        if body.len() > MAX_MESSAGE_LEN {
 994            return Err(anyhow!("message is too long"))?;
 995        }
 996        if body.is_empty() {
 997            return Err(anyhow!("message can't be blank"))?;
 998        }
 999
1000        let timestamp = OffsetDateTime::now_utc();
1001        let nonce = request
1002            .payload
1003            .nonce
1004            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
1005
1006        let message_id = self
1007            .app_state
1008            .db
1009            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1010            .await?
1011            .to_proto();
1012        let message = proto::ChannelMessage {
1013            sender_id: user_id.to_proto(),
1014            id: message_id,
1015            body,
1016            timestamp: timestamp.unix_timestamp() as u64,
1017            nonce: Some(nonce),
1018        };
1019        broadcast(request.sender_id, connection_ids, |conn_id| {
1020            self.peer.send(
1021                conn_id,
1022                proto::ChannelMessageSent {
1023                    channel_id: channel_id.to_proto(),
1024                    message: Some(message.clone()),
1025                },
1026            )
1027        })?;
1028        Ok(proto::SendChannelMessageResponse {
1029            message: Some(message),
1030        })
1031    }
1032
1033    async fn get_channel_messages(
1034        self: Arc<Self>,
1035        request: TypedEnvelope<proto::GetChannelMessages>,
1036    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1037        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1038        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1039        if !self
1040            .app_state
1041            .db
1042            .can_user_access_channel(user_id, channel_id)
1043            .await?
1044        {
1045            Err(anyhow!("access denied"))?;
1046        }
1047
1048        let messages = self
1049            .app_state
1050            .db
1051            .get_channel_messages(
1052                channel_id,
1053                MESSAGE_COUNT_PER_PAGE,
1054                Some(MessageId::from_proto(request.payload.before_message_id)),
1055            )
1056            .await?
1057            .into_iter()
1058            .map(|msg| proto::ChannelMessage {
1059                id: msg.id.to_proto(),
1060                body: msg.body,
1061                timestamp: msg.sent_at.unix_timestamp() as u64,
1062                sender_id: msg.sender_id.to_proto(),
1063                nonce: Some(msg.nonce.as_u128().into()),
1064            })
1065            .collect::<Vec<_>>();
1066
1067        Ok(proto::GetChannelMessagesResponse {
1068            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1069            messages,
1070        })
1071    }
1072
1073    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1074        self.store.read()
1075    }
1076
1077    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1078        self.store.write()
1079    }
1080}
1081
1082impl Executor for RealExecutor {
1083    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1084        task::spawn(future);
1085    }
1086}
1087
1088fn broadcast<F>(
1089    sender_id: ConnectionId,
1090    receiver_ids: Vec<ConnectionId>,
1091    mut f: F,
1092) -> anyhow::Result<()>
1093where
1094    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1095{
1096    let mut result = Ok(());
1097    for receiver_id in receiver_ids {
1098        if receiver_id != sender_id {
1099            if let Err(error) = f(receiver_id) {
1100                if result.is_ok() {
1101                    result = Err(error);
1102                }
1103            }
1104        }
1105    }
1106    result
1107}
1108
1109pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1110    let server = Server::new(app.state().clone(), rpc.clone(), None);
1111    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1112        let server = server.clone();
1113        async move {
1114            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1115
1116            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1117            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1118            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1119            let client_protocol_version: Option<u32> = request
1120                .header("X-Zed-Protocol-Version")
1121                .and_then(|v| v.as_str().parse().ok());
1122
1123            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1124                return Ok(Response::new(StatusCode::UpgradeRequired));
1125            }
1126
1127            let header = match request.header("Sec-Websocket-Key") {
1128                Some(h) => h.as_str(),
1129                None => return Err(anyhow!("expected sec-websocket-key"))?,
1130            };
1131
1132            let user_id = process_auth_header(&request).await?;
1133
1134            let mut response = Response::new(StatusCode::SwitchingProtocols);
1135            response.insert_header(UPGRADE, "websocket");
1136            response.insert_header(CONNECTION, "Upgrade");
1137            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1138            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1139            response.insert_header("Sec-Websocket-Version", "13");
1140
1141            let http_res: &mut tide::http::Response = response.as_mut();
1142            let upgrade_receiver = http_res.recv_upgrade().await;
1143            let addr = request.remote().unwrap_or("unknown").to_string();
1144            task::spawn(async move {
1145                if let Some(stream) = upgrade_receiver.await {
1146                    server
1147                        .handle_connection(
1148                            Connection::new(
1149                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1150                            ),
1151                            addr,
1152                            user_id,
1153                            None,
1154                            RealExecutor,
1155                        )
1156                        .await;
1157                }
1158            });
1159
1160            Ok(response)
1161        }
1162    });
1163}
1164
1165fn header_contains_ignore_case<T>(
1166    request: &tide::Request<T>,
1167    header_name: HeaderName,
1168    value: &str,
1169) -> bool {
1170    request
1171        .header(header_name)
1172        .map(|h| {
1173            h.as_str()
1174                .split(',')
1175                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1176        })
1177        .unwrap_or(false)
1178}
1179
1180#[cfg(test)]
1181mod tests {
1182    use super::*;
1183    use crate::{
1184        auth,
1185        db::{tests::TestDb, UserId},
1186        github, AppState, Config,
1187    };
1188    use ::rpc::Peer;
1189    use collections::BTreeMap;
1190    use gpui::{executor, ModelHandle, TestAppContext};
1191    use parking_lot::Mutex;
1192    use postage::{sink::Sink, watch};
1193    use rand::prelude::*;
1194    use rpc::PeerId;
1195    use serde_json::json;
1196    use sqlx::types::time::OffsetDateTime;
1197    use std::{
1198        cell::Cell,
1199        env,
1200        ops::Deref,
1201        path::Path,
1202        rc::Rc,
1203        sync::{
1204            atomic::{AtomicBool, Ordering::SeqCst},
1205            Arc,
1206        },
1207        time::Duration,
1208    };
1209    use zed::{
1210        client::{
1211            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1212            EstablishConnectionError, UserStore,
1213        },
1214        editor::{
1215            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1216            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1217        },
1218        fs::{FakeFs, Fs as _},
1219        language::{
1220            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1221            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1222        },
1223        lsp,
1224        project::{DiagnosticSummary, Project, ProjectPath},
1225        workspace::{Workspace, WorkspaceParams},
1226    };
1227
1228    #[cfg(test)]
1229    #[ctor::ctor]
1230    fn init_logger() {
1231        if std::env::var("RUST_LOG").is_ok() {
1232            env_logger::init();
1233        }
1234    }
1235
1236    #[gpui::test(iterations = 10)]
1237    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
            // End-to-end check of sharing: client A hosts a project, client B joins it, and
            // the test asserts on collaborator lists, remote buffer contents, edit
            // propagation from B to A, and cleanup when handles are dropped.
1238        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1239        let lang_registry = Arc::new(LanguageRegistry::new());
1240        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably makes the test executor fail instead of blocking when
            // no task can make progress — confirm against gpui.
1241        cx_a.foreground().forbid_parking();
1242
1243        // Connect to a server as 2 clients.
1244        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1245        let client_a = server.create_client(&mut cx_a, "user_a").await;
1246        let client_b = server.create_client(&mut cx_b, "user_b").await;
1247
1248        // Share a project as client A
            // The fake tree's `.zed.toml` lists user_b as a collaborator.
1249        fs.insert_tree(
1250            "/a",
1251            json!({
1252                ".zed.toml": r#"collaborators = ["user_b"]"#,
1253                "a.txt": "a-contents",
1254                "b.txt": "b-contents",
1255            }),
1256        )
1257        .await;
1258        let project_a = cx_a.update(|cx| {
1259            Project::local(
1260                client_a.clone(),
1261                client_a.user_store.clone(),
1262                lang_registry.clone(),
1263                fs.clone(),
1264                cx,
1265            )
1266        });
1267        let (worktree_a, _) = project_a
1268            .update(&mut cx_a, |p, cx| {
1269                p.find_or_create_local_worktree("/a", false, cx)
1270            })
1271            .await
1272            .unwrap();
1273        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
            // Wait for the initial scan of the worktree to finish before sharing.
1274        worktree_a
1275            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1276            .await;
            // This id is what client B passes to `Project::remote` below.
1277        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1278        project_a
1279            .update(&mut cx_a, |p, cx| p.share(cx))
1280            .await
1281            .unwrap();
1282
1283        // Join that project as client B
1284        let project_b = Project::remote(
1285            project_id,
1286            client_b.clone(),
1287            client_b.user_store.clone(),
1288            lang_registry.clone(),
1289            fs.clone(),
1290            &mut cx_b.to_async(),
1291        )
1292        .await
1293        .unwrap();
1294
            // B must already see A among its collaborators; A is then awaited until it
            // sees B with B's replica id and login.
1295        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1296            assert_eq!(
1297                project
1298                    .collaborators()
1299                    .get(&client_a.peer_id)
1300                    .unwrap()
1301                    .user
1302                    .github_login,
1303                "user_a"
1304            );
1305            project.replica_id()
1306        });
1307        project_a
1308            .condition(&cx_a, |tree, _| {
1309                tree.collaborators()
1310                    .get(&client_b.peer_id)
1311                    .map_or(false, |collaborator| {
1312                        collaborator.replica_id == replica_id_b
1313                            && collaborator.user.github_login == "user_b"
1314                    })
1315            })
1316            .await;
1317
1318        // Open the same file as client B and client A.
1319        let buffer_b = project_b
1320            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1321            .await
1322            .unwrap();
1323        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1324        buffer_b.read_with(&cx_b, |buf, cx| {
1325            assert_eq!(buf.read(cx).text(), "b-contents")
1326        });
            // The guest's open must have left the buffer open in the host's project too.
1327        project_a.read_with(&cx_a, |project, cx| {
1328            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1329        });
1330        let buffer_a = project_a
1331            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1332            .await
1333            .unwrap();
1334
1335        let editor_b = cx_b.add_view(window_b, |cx| {
1336            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1337        });
1338
1339        // TODO
1340        // // Create a selection set as client B and see that selection set as client A.
1341        // buffer_a
1342        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1343        //     .await;
1344
1345        // Edit the buffer as client B and see that edit as client A.
1346        editor_b.update(&mut cx_b, |editor, cx| {
1347            editor.handle_input(&Input("ok, ".into()), cx)
1348        });
1349        buffer_a
1350            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1351            .await;
1352
1353        // TODO
1354        // // Remove the selection set as client B, see those selections disappear as client A.
1355        cx_b.update(move |_| drop(editor_b));
1356        // buffer_a
1357        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1358        //     .await;
1359
1360        // Close the buffer as client A, see that the buffer is closed.
1361        cx_a.update(move |_| drop(buffer_a));
1362        project_a
1363            .condition(&cx_a, |project, cx| {
1364                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1365            })
1366            .await;
1367
1368        // Dropping the client B's project removes client B from client A's collaborators.
1369        cx_b.update(move |_| drop(project_b));
1370        project_a
1371            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1372            .await;
1373    }
1374
1375    #[gpui::test(iterations = 10)]
1376    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
            // Checks that unsharing a project turns the guest's copy read-only and clears
            // the host worktree's shared flag, and that the same project can afterwards be
            // shared again and joined by a guest.
1377        let lang_registry = Arc::new(LanguageRegistry::new());
1378        let fs = FakeFs::new(cx_a.background());
1379        cx_a.foreground().forbid_parking();
1380
1381        // Connect to a server as 2 clients.
1382        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1383        let client_a = server.create_client(&mut cx_a, "user_a").await;
1384        let client_b = server.create_client(&mut cx_b, "user_b").await;
1385
1386        // Share a project as client A
1387        fs.insert_tree(
1388            "/a",
1389            json!({
1390                ".zed.toml": r#"collaborators = ["user_b"]"#,
1391                "a.txt": "a-contents",
1392                "b.txt": "b-contents",
1393            }),
1394        )
1395        .await;
1396        let project_a = cx_a.update(|cx| {
1397            Project::local(
1398                client_a.clone(),
1399                client_a.user_store.clone(),
1400                lang_registry.clone(),
1401                fs.clone(),
1402                cx,
1403            )
1404        });
1405        let (worktree_a, _) = project_a
1406            .update(&mut cx_a, |p, cx| {
1407                p.find_or_create_local_worktree("/a", false, cx)
1408            })
1409            .await
1410            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing.
1411        worktree_a
1412            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1413            .await;
1414        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1415        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1416        project_a
1417            .update(&mut cx_a, |p, cx| p.share(cx))
1418            .await
1419            .unwrap();
1420        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1421
1422        // Join that project as client B
1423        let project_b = Project::remote(
1424            project_id,
1425            client_b.clone(),
1426            client_b.user_store.clone(),
1427            lang_registry.clone(),
1428            fs.clone(),
1429            &mut cx_b.to_async(),
1430        )
1431        .await
1432        .unwrap();
            // Opening a buffer must succeed while the project is shared.
1433        project_b
1434            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1435            .await
1436            .unwrap();
1437
1438        // Unshare the project as client A
1439        project_a
1440            .update(&mut cx_a, |project, cx| project.unshare(cx))
1441            .await
1442            .unwrap();
            // The guest's project is awaited until it reports itself read-only.
1443        project_b
1444            .condition(&mut cx_b, |project, _| project.is_read_only())
1445            .await;
1446        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1447        drop(project_b);
1448
1449        // Share the project again and ensure guests can still join.
1450        project_a
1451            .update(&mut cx_a, |project, cx| project.share(cx))
1452            .await
1453            .unwrap();
1454        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1455
            // The second join reuses the same `project_id` and the same client B.
1456        let project_c = Project::remote(
1457            project_id,
1458            client_b.clone(),
1459            client_b.user_store.clone(),
1460            lang_registry.clone(),
1461            fs.clone(),
1462            &mut cx_b.to_async(),
1463        )
1464        .await
1465        .unwrap();
1466        project_c
1467            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1468            .await
1469            .unwrap();
1470    }
1471
1472    #[gpui::test(iterations = 10)]
1473    async fn test_propagate_saves_and_fs_changes(
1474        mut cx_a: TestAppContext,
1475        mut cx_b: TestAppContext,
1476        mut cx_c: TestAppContext,
1477    ) {
            // With host A and guests B and C on one shared project, checks that concurrent
            // edits converge to the same text on all three, that a save issued by a guest
            // writes the host's file and clears the dirty flag everywhere, and that renames
            // and new files on the host's file system reach the guests' worktrees and
            // open buffers.
1478        let lang_registry = Arc::new(LanguageRegistry::new());
1479        let fs = FakeFs::new(cx_a.background());
1480        cx_a.foreground().forbid_parking();
1481
1482        // Connect to a server as 3 clients.
1483        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1484        let client_a = server.create_client(&mut cx_a, "user_a").await;
1485        let client_b = server.create_client(&mut cx_b, "user_b").await;
1486        let client_c = server.create_client(&mut cx_c, "user_c").await;
1487
1488        // Share a worktree as client A.
1489        fs.insert_tree(
1490            "/a",
1491            json!({
1492                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1493                "file1": "",
1494                "file2": ""
1495            }),
1496        )
1497        .await;
1498        let project_a = cx_a.update(|cx| {
1499            Project::local(
1500                client_a.clone(),
1501                client_a.user_store.clone(),
1502                lang_registry.clone(),
1503                fs.clone(),
1504                cx,
1505            )
1506        });
1507        let (worktree_a, _) = project_a
1508            .update(&mut cx_a, |p, cx| {
1509                p.find_or_create_local_worktree("/a", false, cx)
1510            })
1511            .await
1512            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing.
1513        worktree_a
1514            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1515            .await;
1516        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1517        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1518        project_a
1519            .update(&mut cx_a, |p, cx| p.share(cx))
1520            .await
1521            .unwrap();
1522
1523        // Join that worktree as clients B and C.
1524        let project_b = Project::remote(
1525            project_id,
1526            client_b.clone(),
1527            client_b.user_store.clone(),
1528            lang_registry.clone(),
1529            fs.clone(),
1530            &mut cx_b.to_async(),
1531        )
1532        .await
1533        .unwrap();
1534        let project_c = Project::remote(
1535            project_id,
1536            client_c.clone(),
1537            client_c.user_store.clone(),
1538            lang_registry.clone(),
1539            fs.clone(),
1540            &mut cx_c.to_async(),
1541        )
1542        .await
1543        .unwrap();
1544        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1545        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1546
1547        // Open and edit a buffer as both guests B and C.
1548        let buffer_b = project_b
1549            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1550            .await
1551            .unwrap();
1552        let buffer_c = project_c
1553            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1554            .await
1555            .unwrap();
            // Both guests insert at offset 0 without waiting for each other.
1556        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1557        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1558
1559        // Open and edit that buffer as the host.
1560        let buffer_a = project_a
1561            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1562            .await
1563            .unwrap();
1564
            // The host appends only after it has received both guests' insertions.
1565        buffer_a
1566            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1567            .await;
1568        buffer_a.update(&mut cx_a, |buf, cx| {
1569            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1570        });
1571
1572        // Wait for edits to propagate
1573        buffer_a
1574            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1575            .await;
1576        buffer_b
1577            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1578            .await;
1579        buffer_c
1580            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1581            .await;
1582
1583        // Edit the buffer as the host and concurrently save as guest B.
            // The save is started before the host's edit and awaited after it; the file
            // on disk is expected to include that edit.
1584        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1585        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1586        save_b.await.unwrap();
1587        assert_eq!(
1588            fs.load("/a/file1".as_ref()).await.unwrap(),
1589            "hi-a, i-am-c, i-am-b, i-am-a"
1590        );
            // A and B are asserted clean immediately; C is awaited until it becomes clean.
1591        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1592        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1593        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1594
1595        // Make changes on host's file system, see those changes on guest worktrees.
1596        fs.rename(
1597            "/a/file1".as_ref(),
1598            "/a/file1-renamed".as_ref(),
1599            Default::default(),
1600        )
1601        .await
1602        .unwrap();
1603
1604        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1605            .await
1606            .unwrap();
1607        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1608
            // Host and both guests must end up listing exactly the same four paths.
1609        worktree_a
1610            .condition(&cx_a, |tree, _| {
1611                tree.paths()
1612                    .map(|p| p.to_string_lossy())
1613                    .collect::<Vec<_>>()
1614                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1615            })
1616            .await;
1617        worktree_b
1618            .condition(&cx_b, |tree, _| {
1619                tree.paths()
1620                    .map(|p| p.to_string_lossy())
1621                    .collect::<Vec<_>>()
1622                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1623            })
1624            .await;
1625        worktree_c
1626            .condition(&cx_c, |tree, _| {
1627                tree.paths()
1628                    .map(|p| p.to_string_lossy())
1629                    .collect::<Vec<_>>()
1630                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1631            })
1632            .await;
1633
1634        // Ensure buffer files are updated as well.
1635        buffer_a
1636            .condition(&cx_a, |buf, _| {
1637                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1638            })
1639            .await;
1640        buffer_b
1641            .condition(&cx_b, |buf, _| {
1642                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1643            })
1644            .await;
1645        buffer_c
1646            .condition(&cx_c, |buf, _| {
1647                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1648            })
1649            .await;
1650    }
1651
1652    #[gpui::test(iterations = 10)]
1653    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1654        cx_a.foreground().forbid_parking();
1655        let lang_registry = Arc::new(LanguageRegistry::new());
1656        let fs = FakeFs::new(cx_a.background());
1657
1658        // Connect to a server as 2 clients.
1659        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1660        let client_a = server.create_client(&mut cx_a, "user_a").await;
1661        let client_b = server.create_client(&mut cx_b, "user_b").await;
1662
1663        // Share a project as client A
1664        fs.insert_tree(
1665            "/dir",
1666            json!({
1667                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1668                "a.txt": "a-contents",
1669            }),
1670        )
1671        .await;
1672
1673        let project_a = cx_a.update(|cx| {
1674            Project::local(
1675                client_a.clone(),
1676                client_a.user_store.clone(),
1677                lang_registry.clone(),
1678                fs.clone(),
1679                cx,
1680            )
1681        });
1682        let (worktree_a, _) = project_a
1683            .update(&mut cx_a, |p, cx| {
1684                p.find_or_create_local_worktree("/dir", false, cx)
1685            })
1686            .await
1687            .unwrap();
1688        worktree_a
1689            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1690            .await;
1691        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1692        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1693        project_a
1694            .update(&mut cx_a, |p, cx| p.share(cx))
1695            .await
1696            .unwrap();
1697
1698        // Join that project as client B
1699        let project_b = Project::remote(
1700            project_id,
1701            client_b.clone(),
1702            client_b.user_store.clone(),
1703            lang_registry.clone(),
1704            fs.clone(),
1705            &mut cx_b.to_async(),
1706        )
1707        .await
1708        .unwrap();
1709
1710        // Open a buffer as client B
1711        let buffer_b = project_b
1712            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1713            .await
1714            .unwrap();
1715
1716        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1717        buffer_b.read_with(&cx_b, |buf, _| {
1718            assert!(buf.is_dirty());
1719            assert!(!buf.has_conflict());
1720        });
1721
1722        buffer_b
1723            .update(&mut cx_b, |buf, cx| buf.save(cx))
1724            .await
1725            .unwrap();
1726        buffer_b
1727            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1728            .await;
1729        buffer_b.read_with(&cx_b, |buf, _| {
1730            assert!(!buf.has_conflict());
1731        });
1732
1733        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1734        buffer_b.read_with(&cx_b, |buf, _| {
1735            assert!(buf.is_dirty());
1736            assert!(!buf.has_conflict());
1737        });
1738    }
1739
1740    #[gpui::test(iterations = 10)]
1741    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1742        cx_a.foreground().forbid_parking();
1743        let lang_registry = Arc::new(LanguageRegistry::new());
1744        let fs = FakeFs::new(cx_a.background());
1745
1746        // Connect to a server as 2 clients.
1747        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1748        let client_a = server.create_client(&mut cx_a, "user_a").await;
1749        let client_b = server.create_client(&mut cx_b, "user_b").await;
1750
1751        // Share a project as client A
1752        fs.insert_tree(
1753            "/dir",
1754            json!({
1755                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1756                "a.txt": "a-contents",
1757            }),
1758        )
1759        .await;
1760
1761        let project_a = cx_a.update(|cx| {
1762            Project::local(
1763                client_a.clone(),
1764                client_a.user_store.clone(),
1765                lang_registry.clone(),
1766                fs.clone(),
1767                cx,
1768            )
1769        });
1770        let (worktree_a, _) = project_a
1771            .update(&mut cx_a, |p, cx| {
1772                p.find_or_create_local_worktree("/dir", false, cx)
1773            })
1774            .await
1775            .unwrap();
1776        worktree_a
1777            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1778            .await;
1779        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1780        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1781        project_a
1782            .update(&mut cx_a, |p, cx| p.share(cx))
1783            .await
1784            .unwrap();
1785
1786        // Join that project as client B
1787        let project_b = Project::remote(
1788            project_id,
1789            client_b.clone(),
1790            client_b.user_store.clone(),
1791            lang_registry.clone(),
1792            fs.clone(),
1793            &mut cx_b.to_async(),
1794        )
1795        .await
1796        .unwrap();
1797        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1798
1799        // Open a buffer as client B
1800        let buffer_b = project_b
1801            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1802            .await
1803            .unwrap();
1804        buffer_b.read_with(&cx_b, |buf, _| {
1805            assert!(!buf.is_dirty());
1806            assert!(!buf.has_conflict());
1807        });
1808
1809        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1810            .await
1811            .unwrap();
1812        buffer_b
1813            .condition(&cx_b, |buf, _| {
1814                buf.text() == "new contents" && !buf.is_dirty()
1815            })
1816            .await;
1817        buffer_b.read_with(&cx_b, |buf, _| {
1818            assert!(!buf.has_conflict());
1819        });
1820    }
1821
    // Verifies that a guest who opens a buffer while the host is concurrently
    // editing it ends up with the same text as the host.
    //
    // The interleaving of the guest's open request and the host's edits is
    // varied via `simulate_random_delay`, so the order of the statements below
    // is significant.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        // NOTE(review): presumably makes the test fail rather than block if the
        // foreground executor would have to park — confirm against gpui.
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait on the local worktree's `scan_complete` future before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B. The open future is handed
        // to client B's background executor and is not awaited until after the
        // host's edits below.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        // A random delay precedes each edit; the edits insert "X" at offset 0
        // and then "Y" at offset 1.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Capture the host's text, finish the guest's open, and wait until the
        // guest's buffer text matches what the host had.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1903
    // Verifies that the host sees a guest leave (its collaborator count drops
    // back to zero) when the guest drops its project while an `open_buffer`
    // request it started has not been awaited.
    #[gpui::test(iterations = 10)]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
    ) {
        // NOTE(review): presumably makes the test fail rather than block if the
        // foreground executor would have to park — confirm against gpui.
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait on the local worktree's `scan_complete` future before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        // The project handle is dropped inside a `cx_b.update` call.
        // NOTE(review): presumably so the release is processed as part of that
        // update — confirm against gpui.
        cx_b.update(|_| drop(project_b));
        // The spawned open task is dropped without ever being awaited.
        // NOTE(review): presumably this cancels the pending open — confirm the
        // executor's task-drop semantics.
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1981
1982    #[gpui::test(iterations = 10)]
1983    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1984        cx_a.foreground().forbid_parking();
1985        let lang_registry = Arc::new(LanguageRegistry::new());
1986        let fs = FakeFs::new(cx_a.background());
1987
1988        // Connect to a server as 2 clients.
1989        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1990        let client_a = server.create_client(&mut cx_a, "user_a").await;
1991        let client_b = server.create_client(&mut cx_b, "user_b").await;
1992
1993        // Share a project as client A
1994        fs.insert_tree(
1995            "/a",
1996            json!({
1997                ".zed.toml": r#"collaborators = ["user_b"]"#,
1998                "a.txt": "a-contents",
1999                "b.txt": "b-contents",
2000            }),
2001        )
2002        .await;
2003        let project_a = cx_a.update(|cx| {
2004            Project::local(
2005                client_a.clone(),
2006                client_a.user_store.clone(),
2007                lang_registry.clone(),
2008                fs.clone(),
2009                cx,
2010            )
2011        });
2012        let (worktree_a, _) = project_a
2013            .update(&mut cx_a, |p, cx| {
2014                p.find_or_create_local_worktree("/a", false, cx)
2015            })
2016            .await
2017            .unwrap();
2018        worktree_a
2019            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2020            .await;
2021        let project_id = project_a
2022            .update(&mut cx_a, |project, _| project.next_remote_id())
2023            .await;
2024        project_a
2025            .update(&mut cx_a, |project, cx| project.share(cx))
2026            .await
2027            .unwrap();
2028
2029        // Join that project as client B
2030        let _project_b = Project::remote(
2031            project_id,
2032            client_b.clone(),
2033            client_b.user_store.clone(),
2034            lang_registry.clone(),
2035            fs.clone(),
2036            &mut cx_b.to_async(),
2037        )
2038        .await
2039        .unwrap();
2040
2041        // See that a guest has joined as client A.
2042        project_a
2043            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2044            .await;
2045
2046        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2047        client_b.disconnect(&cx_b.to_async()).unwrap();
2048        project_a
2049            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2050            .await;
2051    }
2052
2053    #[gpui::test(iterations = 10)]
2054    async fn test_collaborating_with_diagnostics(
2055        mut cx_a: TestAppContext,
2056        mut cx_b: TestAppContext,
2057    ) {
2058        cx_a.foreground().forbid_parking();
2059        let mut lang_registry = Arc::new(LanguageRegistry::new());
2060        let fs = FakeFs::new(cx_a.background());
2061
2062        // Set up a fake language server.
2063        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2064        Arc::get_mut(&mut lang_registry)
2065            .unwrap()
2066            .add(Arc::new(Language::new(
2067                LanguageConfig {
2068                    name: "Rust".into(),
2069                    path_suffixes: vec!["rs".to_string()],
2070                    language_server: Some(language_server_config),
2071                    ..Default::default()
2072                },
2073                Some(tree_sitter_rust::language()),
2074            )));
2075
2076        // Connect to a server as 2 clients.
2077        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2078        let client_a = server.create_client(&mut cx_a, "user_a").await;
2079        let client_b = server.create_client(&mut cx_b, "user_b").await;
2080
2081        // Share a project as client A
2082        fs.insert_tree(
2083            "/a",
2084            json!({
2085                ".zed.toml": r#"collaborators = ["user_b"]"#,
2086                "a.rs": "let one = two",
2087                "other.rs": "",
2088            }),
2089        )
2090        .await;
2091        let project_a = cx_a.update(|cx| {
2092            Project::local(
2093                client_a.clone(),
2094                client_a.user_store.clone(),
2095                lang_registry.clone(),
2096                fs.clone(),
2097                cx,
2098            )
2099        });
2100        let (worktree_a, _) = project_a
2101            .update(&mut cx_a, |p, cx| {
2102                p.find_or_create_local_worktree("/a", false, cx)
2103            })
2104            .await
2105            .unwrap();
2106        worktree_a
2107            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2108            .await;
2109        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2110        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2111        project_a
2112            .update(&mut cx_a, |p, cx| p.share(cx))
2113            .await
2114            .unwrap();
2115
2116        // Cause the language server to start.
2117        let _ = cx_a
2118            .background()
2119            .spawn(project_a.update(&mut cx_a, |project, cx| {
2120                project.open_buffer(
2121                    ProjectPath {
2122                        worktree_id,
2123                        path: Path::new("other.rs").into(),
2124                    },
2125                    cx,
2126                )
2127            }))
2128            .await
2129            .unwrap();
2130
2131        // Simulate a language server reporting errors for a file.
2132        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2133        fake_language_server
2134            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2135                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2136                version: None,
2137                diagnostics: vec![lsp::Diagnostic {
2138                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2139                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2140                    message: "message 1".to_string(),
2141                    ..Default::default()
2142                }],
2143            })
2144            .await;
2145
2146        // Wait for server to see the diagnostics update.
2147        server
2148            .condition(|store| {
2149                let worktree = store
2150                    .project(project_id)
2151                    .unwrap()
2152                    .worktrees
2153                    .get(&worktree_id.to_proto())
2154                    .unwrap();
2155
2156                !worktree
2157                    .share
2158                    .as_ref()
2159                    .unwrap()
2160                    .diagnostic_summaries
2161                    .is_empty()
2162            })
2163            .await;
2164
2165        // Join the worktree as client B.
2166        let project_b = Project::remote(
2167            project_id,
2168            client_b.clone(),
2169            client_b.user_store.clone(),
2170            lang_registry.clone(),
2171            fs.clone(),
2172            &mut cx_b.to_async(),
2173        )
2174        .await
2175        .unwrap();
2176
2177        project_b.read_with(&cx_b, |project, cx| {
2178            assert_eq!(
2179                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2180                &[(
2181                    ProjectPath {
2182                        worktree_id,
2183                        path: Arc::from(Path::new("a.rs")),
2184                    },
2185                    DiagnosticSummary {
2186                        error_count: 1,
2187                        warning_count: 0,
2188                        ..Default::default()
2189                    },
2190                )]
2191            )
2192        });
2193
2194        // Simulate a language server reporting more errors for a file.
2195        fake_language_server
2196            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2197                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2198                version: None,
2199                diagnostics: vec![
2200                    lsp::Diagnostic {
2201                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2202                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2203                        message: "message 1".to_string(),
2204                        ..Default::default()
2205                    },
2206                    lsp::Diagnostic {
2207                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2208                        range: lsp::Range::new(
2209                            lsp::Position::new(0, 10),
2210                            lsp::Position::new(0, 13),
2211                        ),
2212                        message: "message 2".to_string(),
2213                        ..Default::default()
2214                    },
2215                ],
2216            })
2217            .await;
2218
2219        // Client b gets the updated summaries
2220        project_b
2221            .condition(&cx_b, |project, cx| {
2222                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2223                    == &[(
2224                        ProjectPath {
2225                            worktree_id,
2226                            path: Arc::from(Path::new("a.rs")),
2227                        },
2228                        DiagnosticSummary {
2229                            error_count: 1,
2230                            warning_count: 1,
2231                            ..Default::default()
2232                        },
2233                    )]
2234            })
2235            .await;
2236
2237        // Open the file with the errors on client B. They should be present.
2238        let buffer_b = cx_b
2239            .background()
2240            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2241            .await
2242            .unwrap();
2243
2244        buffer_b.read_with(&cx_b, |buffer, _| {
2245            assert_eq!(
2246                buffer
2247                    .snapshot()
2248                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2249                    .map(|entry| entry)
2250                    .collect::<Vec<_>>(),
2251                &[
2252                    DiagnosticEntry {
2253                        range: Point::new(0, 4)..Point::new(0, 7),
2254                        diagnostic: Diagnostic {
2255                            group_id: 0,
2256                            message: "message 1".to_string(),
2257                            severity: lsp::DiagnosticSeverity::ERROR,
2258                            is_primary: true,
2259                            ..Default::default()
2260                        }
2261                    },
2262                    DiagnosticEntry {
2263                        range: Point::new(0, 10)..Point::new(0, 13),
2264                        diagnostic: Diagnostic {
2265                            group_id: 1,
2266                            severity: lsp::DiagnosticSeverity::WARNING,
2267                            message: "message 2".to_string(),
2268                            is_primary: true,
2269                            ..Default::default()
2270                        }
2271                    }
2272                ]
2273            );
2274        });
2275    }
2276
2277    #[gpui::test(iterations = 10)]
2278    async fn test_collaborating_with_completion(
2279        mut cx_a: TestAppContext,
2280        mut cx_b: TestAppContext,
2281    ) {
2282        cx_a.foreground().forbid_parking();
2283        let mut lang_registry = Arc::new(LanguageRegistry::new());
2284        let fs = FakeFs::new(cx_a.background());
2285
2286        // Set up a fake language server.
2287        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2288        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2289            completion_provider: Some(lsp::CompletionOptions {
2290                trigger_characters: Some(vec![".".to_string()]),
2291                ..Default::default()
2292            }),
2293            ..Default::default()
2294        });
2295        Arc::get_mut(&mut lang_registry)
2296            .unwrap()
2297            .add(Arc::new(Language::new(
2298                LanguageConfig {
2299                    name: "Rust".into(),
2300                    path_suffixes: vec!["rs".to_string()],
2301                    language_server: Some(language_server_config),
2302                    ..Default::default()
2303                },
2304                Some(tree_sitter_rust::language()),
2305            )));
2306
2307        // Connect to a server as 2 clients.
2308        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2309        let client_a = server.create_client(&mut cx_a, "user_a").await;
2310        let client_b = server.create_client(&mut cx_b, "user_b").await;
2311
2312        // Share a project as client A
2313        fs.insert_tree(
2314            "/a",
2315            json!({
2316                ".zed.toml": r#"collaborators = ["user_b"]"#,
2317                "main.rs": "fn main() { a }",
2318                "other.rs": "",
2319            }),
2320        )
2321        .await;
2322        let project_a = cx_a.update(|cx| {
2323            Project::local(
2324                client_a.clone(),
2325                client_a.user_store.clone(),
2326                lang_registry.clone(),
2327                fs.clone(),
2328                cx,
2329            )
2330        });
2331        let (worktree_a, _) = project_a
2332            .update(&mut cx_a, |p, cx| {
2333                p.find_or_create_local_worktree("/a", false, cx)
2334            })
2335            .await
2336            .unwrap();
2337        worktree_a
2338            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2339            .await;
2340        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2341        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2342        project_a
2343            .update(&mut cx_a, |p, cx| p.share(cx))
2344            .await
2345            .unwrap();
2346
2347        // Join the worktree as client B.
2348        let project_b = Project::remote(
2349            project_id,
2350            client_b.clone(),
2351            client_b.user_store.clone(),
2352            lang_registry.clone(),
2353            fs.clone(),
2354            &mut cx_b.to_async(),
2355        )
2356        .await
2357        .unwrap();
2358
2359        // Open a file in an editor as the guest.
2360        let buffer_b = project_b
2361            .update(&mut cx_b, |p, cx| {
2362                p.open_buffer((worktree_id, "main.rs"), cx)
2363            })
2364            .await
2365            .unwrap();
2366        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2367        let editor_b = cx_b.add_view(window_b, |cx| {
2368            Editor::for_buffer(
2369                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2370                Arc::new(|cx| EditorSettings::test(cx)),
2371                Some(project_b.clone()),
2372                cx,
2373            )
2374        });
2375
2376        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2377        buffer_b
2378            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2379            .await;
2380
2381        // Type a completion trigger character as the guest.
2382        editor_b.update(&mut cx_b, |editor, cx| {
2383            editor.select_ranges([13..13], None, cx);
2384            editor.handle_input(&Input(".".into()), cx);
2385            cx.focus(&editor_b);
2386        });
2387
2388        // Receive a completion request as the host's language server.
2389        // Return some completions from the host's language server.
2390        cx_a.foreground().start_waiting();
2391        fake_language_server
2392            .handle_request::<lsp::request::Completion, _>(|params, _| {
2393                assert_eq!(
2394                    params.text_document_position.text_document.uri,
2395                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2396                );
2397                assert_eq!(
2398                    params.text_document_position.position,
2399                    lsp::Position::new(0, 14),
2400                );
2401
2402                Some(lsp::CompletionResponse::Array(vec![
2403                    lsp::CompletionItem {
2404                        label: "first_method(…)".into(),
2405                        detail: Some("fn(&mut self, B) -> C".into()),
2406                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2407                            new_text: "first_method($1)".to_string(),
2408                            range: lsp::Range::new(
2409                                lsp::Position::new(0, 14),
2410                                lsp::Position::new(0, 14),
2411                            ),
2412                        })),
2413                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2414                        ..Default::default()
2415                    },
2416                    lsp::CompletionItem {
2417                        label: "second_method(…)".into(),
2418                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2419                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2420                            new_text: "second_method()".to_string(),
2421                            range: lsp::Range::new(
2422                                lsp::Position::new(0, 14),
2423                                lsp::Position::new(0, 14),
2424                            ),
2425                        })),
2426                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2427                        ..Default::default()
2428                    },
2429                ]))
2430            })
2431            .next()
2432            .await
2433            .unwrap();
2434        cx_a.foreground().finish_waiting();
2435
2436        // Open the buffer on the host.
2437        let buffer_a = project_a
2438            .update(&mut cx_a, |p, cx| {
2439                p.open_buffer((worktree_id, "main.rs"), cx)
2440            })
2441            .await
2442            .unwrap();
2443        buffer_a
2444            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2445            .await;
2446
2447        // Confirm a completion on the guest.
2448        editor_b
2449            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2450            .await;
2451        editor_b.update(&mut cx_b, |editor, cx| {
2452            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2453            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2454        });
2455
2456        // Return a resolved completion from the host's language server.
2457        // The resolved completion has an additional text edit.
2458        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2459            |params, _| {
2460                assert_eq!(params.label, "first_method(…)");
2461                lsp::CompletionItem {
2462                    label: "first_method(…)".into(),
2463                    detail: Some("fn(&mut self, B) -> C".into()),
2464                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2465                        new_text: "first_method($1)".to_string(),
2466                        range: lsp::Range::new(
2467                            lsp::Position::new(0, 14),
2468                            lsp::Position::new(0, 14),
2469                        ),
2470                    })),
2471                    additional_text_edits: Some(vec![lsp::TextEdit {
2472                        new_text: "use d::SomeTrait;\n".to_string(),
2473                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2474                    }]),
2475                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2476                    ..Default::default()
2477                }
2478            },
2479        );
2480
2481        // The additional edit is applied.
2482        buffer_a
2483            .condition(&cx_a, |buffer, _| {
2484                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2485            })
2486            .await;
2487        buffer_b
2488            .condition(&cx_b, |buffer, _| {
2489                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2490            })
2491            .await;
2492    }
2493
2494    #[gpui::test(iterations = 10)]
        // Exercises buffer formatting requested by a guest: client A shares a project containing
        // `a.rs`, client B joins it, opens `a.rs` and asks the project to format that buffer.
        // A fake language server answers the formatting request with two insertions, and the
        // test asserts that the guest's copy of the buffer ends up as "let honey = two".
2495    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2496        cx_a.foreground().forbid_parking();
2497        let mut lang_registry = Arc::new(LanguageRegistry::new());
2498        let fs = FakeFs::new(cx_a.background());
2499
2500        // Set up a fake language server.
            // The language is registered for the "rs" suffix, so it applies to `a.rs` below.
2501        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2502        Arc::get_mut(&mut lang_registry)
2503            .unwrap()
2504            .add(Arc::new(Language::new(
2505                LanguageConfig {
2506                    name: "Rust".into(),
2507                    path_suffixes: vec!["rs".to_string()],
2508                    language_server: Some(language_server_config),
2509                    ..Default::default()
2510                },
2511                Some(tree_sitter_rust::language()),
2512            )));
2513
2514        // Connect to a server as 2 clients.
2515        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2516        let client_a = server.create_client(&mut cx_a, "user_a").await;
2517        let client_b = server.create_client(&mut cx_b, "user_b").await;
2518
2519        // Share a project as client A
            // NOTE(review): `.zed.toml` lists "user_b" as a collaborator; presumably this is what
            // allows client B to join the shared project -- confirm against the server's checks.
2520        fs.insert_tree(
2521            "/a",
2522            json!({
2523                ".zed.toml": r#"collaborators = ["user_b"]"#,
2524                "a.rs": "let one = two",
2525            }),
2526        )
2527        .await;
2528        let project_a = cx_a.update(|cx| {
2529            Project::local(
2530                client_a.clone(),
2531                client_a.user_store.clone(),
2532                lang_registry.clone(),
2533                fs.clone(),
2534                cx,
2535            )
2536        });
2537        let (worktree_a, _) = project_a
2538            .update(&mut cx_a, |p, cx| {
2539                p.find_or_create_local_worktree("/a", false, cx)
2540            })
2541            .await
2542            .unwrap();
            // Wait for the worktree scan to complete before sharing.
2543        worktree_a
2544            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2545            .await;
2546        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2547        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2548        project_a
2549            .update(&mut cx_a, |p, cx| p.share(cx))
2550            .await
2551            .unwrap();
2552
2553        // Join the worktree as client B.
2554        let project_b = Project::remote(
2555            project_id,
2556            client_b.clone(),
2557            client_b.user_store.clone(),
2558            lang_registry.clone(),
2559            fs.clone(),
2560            &mut cx_b.to_async(),
2561        )
2562        .await
2563        .unwrap();
2564
            // Open the file on client B.
2565        let buffer_b = cx_b
2566            .background()
2567            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2568            .await
2569            .unwrap();
2570
            // Start formatting the buffer as the guest. The returned future is only awaited
            // after the fake language server's handler has been installed below.
            // NOTE(review): the meaning of the `true` argument is not visible in this file --
            // confirm against `Project::format`.
2571        let format = project_b.update(&mut cx_b, |project, cx| {
2572            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2573        });
2574
            // Answer the formatting request with two insertions into "let one = two":
            // "h" at column 4 and "y" at column 7, which together turn "one" into "honey".
2575        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2576        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2577            Some(vec![
2578                lsp::TextEdit {
2579                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2580                    new_text: "h".to_string(),
2581                },
2582                lsp::TextEdit {
2583                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2584                    new_text: "y".to_string(),
2585                },
2586            ])
2587        });
2588
            // Once formatting completes, the guest's buffer must reflect both edits.
2589        format.await.unwrap();
2590        assert_eq!(
2591            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2592            "let honey = two"
2593        );
2594    }
2595
2596    #[gpui::test(iterations = 10)]
        // Exercises go-to-definition requested by a guest. Client A shares a project rooted at
        // `/root-1`; the fake language server answers both requests with locations in
        // `/root-2/b.rs`, which lies outside that worktree. The test asserts that the guest's
        // project then reports two worktrees, that both results resolve to the same buffer
        // with the expected text and ranges, and that the worktree count drops back to one
        // after the definitions are dropped.
2597    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2598        cx_a.foreground().forbid_parking();
2599        let mut lang_registry = Arc::new(LanguageRegistry::new());
2600        let fs = FakeFs::new(cx_a.background());
            // `/root-1` is the directory that gets shared; `/root-2` is only reached through
            // the definition locations returned by the fake language server.
2601        fs.insert_tree(
2602            "/root-1",
2603            json!({
2604                ".zed.toml": r#"collaborators = ["user_b"]"#,
2605                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2606            }),
2607        )
2608        .await;
2609        fs.insert_tree(
2610            "/root-2",
2611            json!({
2612                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2613            }),
2614        )
2615        .await;
2616
2617        // Set up a fake language server.
2618        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2619        Arc::get_mut(&mut lang_registry)
2620            .unwrap()
2621            .add(Arc::new(Language::new(
2622                LanguageConfig {
2623                    name: "Rust".into(),
2624                    path_suffixes: vec!["rs".to_string()],
2625                    language_server: Some(language_server_config),
2626                    ..Default::default()
2627                },
2628                Some(tree_sitter_rust::language()),
2629            )));
2630
2631        // Connect to a server as 2 clients.
2632        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2633        let client_a = server.create_client(&mut cx_a, "user_a").await;
2634        let client_b = server.create_client(&mut cx_b, "user_b").await;
2635
2636        // Share a project as client A
2637        let project_a = cx_a.update(|cx| {
2638            Project::local(
2639                client_a.clone(),
2640                client_a.user_store.clone(),
2641                lang_registry.clone(),
2642                fs.clone(),
2643                cx,
2644            )
2645        });
2646        let (worktree_a, _) = project_a
2647            .update(&mut cx_a, |p, cx| {
2648                p.find_or_create_local_worktree("/root-1", false, cx)
2649            })
2650            .await
2651            .unwrap();
2652        worktree_a
2653            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2654            .await;
2655        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2656        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2657        project_a
2658            .update(&mut cx_a, |p, cx| p.share(cx))
2659            .await
2660            .unwrap();
2661
2662        // Join the worktree as client B.
2663        let project_b = Project::remote(
2664            project_id,
2665            client_b.clone(),
2666            client_b.user_store.clone(),
2667            lang_registry.clone(),
2668            fs.clone(),
2669            &mut cx_b.to_async(),
2670        )
2671        .await
2672        .unwrap();
2673
2674        // Open the file on client B.
2675        let buffer_b = cx_b
2676            .background()
2677            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2678            .await
2679            .unwrap();
2680
2681        // Request the definition of a symbol as the guest.
            // Offset 23 falls inside `TWO` in "const ONE: usize = b::TWO + b::THREE;".
2682        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2683
            // Respond with the location of `TWO` in `/root-2/b.rs` (row 0, columns 6..9).
2684        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2685        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2686            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2687                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2688                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2689            )))
2690        });
2691
            // The guest's project now reports a second worktree in addition to the shared one,
            // and the definition points at the expected range of `b.rs`.
2692        let definitions_1 = definitions_1.await.unwrap();
2693        cx_b.read(|cx| {
2694            assert_eq!(definitions_1.len(), 1);
2695            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2696            let target_buffer = definitions_1[0].buffer.read(cx);
2697            assert_eq!(
2698                target_buffer.text(),
2699                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2700            );
2701            assert_eq!(
2702                definitions_1[0].range.to_point(target_buffer),
2703                Point::new(0, 6)..Point::new(0, 9)
2704            );
2705        });
2706
2707        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2708        // the previous call to `definition`.
            // Offset 33 falls inside `THREE`; the response points at row 1, columns 6..11.
2709        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2710        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2711            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2712                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2713                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2714            )))
2715        });
2716
            // The worktree count stays at two: no additional worktree is created for the
            // second request.
2717        let definitions_2 = definitions_2.await.unwrap();
2718        cx_b.read(|cx| {
2719            assert_eq!(definitions_2.len(), 1);
2720            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2721            let target_buffer = definitions_2[0].buffer.read(cx);
2722            assert_eq!(
2723                target_buffer.text(),
2724                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2725            );
2726            assert_eq!(
2727                definitions_2[0].range.to_point(target_buffer),
2728                Point::new(1, 6)..Point::new(1, 11)
2729            );
2730        });
            // Both requests must have resolved to the very same buffer.
2731        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2732
            // After the definitions are dropped, wait until the project reports a single
            // worktree again.
2733        cx_b.update(|_| {
2734            drop(definitions_1);
2735            drop(definitions_2);
2736        });
2737        project_b
2738            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2739            .await;
2740    }
2741
2742    #[gpui::test(iterations = 10)]
        // Exercises find-references requested by a guest. Client A shares a project rooted at
        // `/root-1`; the fake language server reports two references in `/root-1/two.rs` and
        // one in `/root-2/three.rs`, which lies outside the shared worktree. The test asserts
        // the number of results, the worktree count on the guest, which buffers the results
        // belong to, and the offset ranges of each reference.
2743    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2744        cx_a.foreground().forbid_parking();
2745        let mut lang_registry = Arc::new(LanguageRegistry::new());
2746        let fs = FakeFs::new(cx_a.background());
2747        fs.insert_tree(
2748            "/root-1",
2749            json!({
2750                ".zed.toml": r#"collaborators = ["user_b"]"#,
2751                "one.rs": "const ONE: usize = 1;",
2752                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2753            }),
2754        )
2755        .await;
2756        fs.insert_tree(
2757            "/root-2",
2758            json!({
2759                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2760            }),
2761        )
2762        .await;
2763
2764        // Set up a fake language server.
2765        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2766        Arc::get_mut(&mut lang_registry)
2767            .unwrap()
2768            .add(Arc::new(Language::new(
2769                LanguageConfig {
2770                    name: "Rust".into(),
2771                    path_suffixes: vec!["rs".to_string()],
2772                    language_server: Some(language_server_config),
2773                    ..Default::default()
2774                },
2775                Some(tree_sitter_rust::language()),
2776            )));
2777
2778        // Connect to a server as 2 clients.
2779        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2780        let client_a = server.create_client(&mut cx_a, "user_a").await;
2781        let client_b = server.create_client(&mut cx_b, "user_b").await;
2782
2783        // Share a project as client A
2784        let project_a = cx_a.update(|cx| {
2785            Project::local(
2786                client_a.clone(),
2787                client_a.user_store.clone(),
2788                lang_registry.clone(),
2789                fs.clone(),
2790                cx,
2791            )
2792        });
2793        let (worktree_a, _) = project_a
2794            .update(&mut cx_a, |p, cx| {
2795                p.find_or_create_local_worktree("/root-1", false, cx)
2796            })
2797            .await
2798            .unwrap();
2799        worktree_a
2800            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2801            .await;
2802        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2803        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2804        project_a
2805            .update(&mut cx_a, |p, cx| p.share(cx))
2806            .await
2807            .unwrap();
2808
2809        // Join the worktree as client B.
2810        let project_b = Project::remote(
2811            project_id,
2812            client_b.clone(),
2813            client_b.user_store.clone(),
2814            lang_registry.clone(),
2815            fs.clone(),
2816            &mut cx_b.to_async(),
2817        )
2818        .await
2819        .unwrap();
2820
2821        // Open the file on client B.
2822        let buffer_b = cx_b
2823            .background()
2824            .spawn(project_b.update(&mut cx_b, |p, cx| {
2825                p.open_buffer((worktree_id, "one.rs"), cx)
2826            }))
2827            .await
2828            .unwrap();
2829
2830        // Request references to a symbol as the guest.
            // Offset 7 falls inside `ONE` in "const ONE: usize = 1;".
2831        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2832
            // Check that the request targets `one.rs`, then report three references: both
            // `ONE` usages in `two.rs` and the `ONE` usage in `/root-2/three.rs`.
2833        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2834        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2835            assert_eq!(
2836                params.text_document_position.text_document.uri.as_str(),
2837                "file:///root-1/one.rs"
2838            );
2839            Some(vec![
2840                lsp::Location {
2841                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2842                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2843                },
2844                lsp::Location {
2845                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2846                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2847                },
2848                lsp::Location {
2849                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2850                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2851                },
2852            ])
2853        });
2854
2855        let references = references.await.unwrap();
2856        cx_b.read(|cx| {
2857            assert_eq!(references.len(), 3);
                // The guest's project reports a second worktree alongside the shared one.
2858            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2859
2860            let two_buffer = references[0].buffer.read(cx);
2861            let three_buffer = references[2].buffer.read(cx);
2862            assert_eq!(
2863                two_buffer.file().unwrap().path().as_ref(),
2864                Path::new("two.rs")
2865            );
                // Both references into `two.rs` must share a single buffer.
2866            assert_eq!(references[1].buffer, references[0].buffer);
2867            assert_eq!(
2868                three_buffer.file().unwrap().full_path(cx),
2869                Path::new("three.rs")
2870            );
2871
                // The returned ranges match the positions reported by the language server
                // (each file is a single line, so columns equal offsets here).
2872            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2873            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2874            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2875        });
2876    }
2877
2878    #[gpui::test(iterations = 10)]
        // Exercises document highlights requested by a guest. Client A shares a project
        // containing `main.rs`; client B opens it and requests highlights at offset 34. The
        // fake language server verifies the request's URI and position and returns one WRITE
        // and two READ highlights; the test asserts the guest receives the same kinds and
        // offset ranges.
2879    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2880        cx_a.foreground().forbid_parking();
2881        let lang_registry = Arc::new(LanguageRegistry::new());
2882        let fs = FakeFs::new(cx_a.background());
2883        fs.insert_tree(
2884            "/root-1",
2885            json!({
2886                ".zed.toml": r#"collaborators = ["user_b"]"#,
2887                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2888            }),
2889        )
2890        .await;
2891
2892        // Set up a fake language server.
            // NOTE(review): unlike the neighbouring tests, the language is added through the
            // `Arc` directly rather than via `Arc::get_mut` -- confirm which form is intended.
2893        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2894        lang_registry.add(Arc::new(Language::new(
2895            LanguageConfig {
2896                name: "Rust".into(),
2897                path_suffixes: vec!["rs".to_string()],
2898                language_server: Some(language_server_config),
2899                ..Default::default()
2900            },
2901            Some(tree_sitter_rust::language()),
2902        )));
2903
2904        // Connect to a server as 2 clients.
2905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2906        let client_a = server.create_client(&mut cx_a, "user_a").await;
2907        let client_b = server.create_client(&mut cx_b, "user_b").await;
2908
2909        // Share a project as client A
2910        let project_a = cx_a.update(|cx| {
2911            Project::local(
2912                client_a.clone(),
2913                client_a.user_store.clone(),
2914                lang_registry.clone(),
2915                fs.clone(),
2916                cx,
2917            )
2918        });
2919        let (worktree_a, _) = project_a
2920            .update(&mut cx_a, |p, cx| {
2921                p.find_or_create_local_worktree("/root-1", false, cx)
2922            })
2923            .await
2924            .unwrap();
2925        worktree_a
2926            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2927            .await;
2928        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2929        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2930        project_a
2931            .update(&mut cx_a, |p, cx| p.share(cx))
2932            .await
2933            .unwrap();
2934
2935        // Join the worktree as client B.
2936        let project_b = Project::remote(
2937            project_id,
2938            client_b.clone(),
2939            client_b.user_store.clone(),
2940            lang_registry.clone(),
2941            fs.clone(),
2942            &mut cx_b.to_async(),
2943        )
2944        .await
2945        .unwrap();
2946
2947        // Open the file on client B.
2948        let buffer_b = cx_b
2949            .background()
2950            .spawn(project_b.update(&mut cx_b, |p, cx| {
2951                p.open_buffer((worktree_id, "main.rs"), cx)
2952            }))
2953            .await
2954            .unwrap();
2955
2956        // Request document highlights as the guest.
            // Offset 34 falls inside the first `number` in the function body (columns 32..38).
2957        let highlights =
2958            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2959
            // Verify the request names `main.rs` at (0, 34), then report the parameter
            // declaration as a WRITE and the two uses in the body as READs.
2960        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2961        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2962            |params, _| {
2963                assert_eq!(
2964                    params
2965                        .text_document_position_params
2966                        .text_document
2967                        .uri
2968                        .as_str(),
2969                    "file:///root-1/main.rs"
2970                );
2971                assert_eq!(
2972                    params.text_document_position_params.position,
2973                    lsp::Position::new(0, 34)
2974                );
2975                Some(vec![
2976                    lsp::DocumentHighlight {
2977                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2978                        range: lsp::Range::new(
2979                            lsp::Position::new(0, 10),
2980                            lsp::Position::new(0, 16),
2981                        ),
2982                    },
2983                    lsp::DocumentHighlight {
2984                        kind: Some(lsp::DocumentHighlightKind::READ),
2985                        range: lsp::Range::new(
2986                            lsp::Position::new(0, 32),
2987                            lsp::Position::new(0, 38),
2988                        ),
2989                    },
2990                    lsp::DocumentHighlight {
2991                        kind: Some(lsp::DocumentHighlightKind::READ),
2992                        range: lsp::Range::new(
2993                            lsp::Position::new(0, 41),
2994                            lsp::Position::new(0, 47),
2995                        ),
2996                    },
2997                ])
2998            },
2999        );
3000
            // Convert each returned highlight to (kind, offset range) against a snapshot of
            // the guest's buffer and compare with what the language server reported.
3001        let highlights = highlights.await.unwrap();
3002        buffer_b.read_with(&cx_b, |buffer, _| {
3003            let snapshot = buffer.snapshot();
3004
3005            let highlights = highlights
3006                .into_iter()
3007                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3008                .collect::<Vec<_>>();
3009            assert_eq!(
3010                highlights,
3011                &[
3012                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3013                    (lsp::DocumentHighlightKind::READ, 32..38),
3014                    (lsp::DocumentHighlightKind::READ, 41..47)
3015                ]
3016            )
3017        });
3018    }
3019
3020    #[gpui::test(iterations = 10)]
        // Exercises project-wide symbol search requested by a guest. Client A shares a project
        // rooted at `/code/crate-1`; the fake language server reports a symbol located in
        // `/code/crate-2/two.rs`, outside that worktree. The test asserts the guest can open a
        // buffer for the returned symbol, and that opening a symbol whose path was altered by
        // the guest fails with an "invalid symbol signature" error.
3021    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3022        cx_a.foreground().forbid_parking();
3023        let mut lang_registry = Arc::new(LanguageRegistry::new());
3024        let fs = FakeFs::new(cx_a.background());
            // Only `crate-1` is shared below; `crate-2` and `private` are siblings of it on
            // the host's file system.
3025        fs.insert_tree(
3026            "/code",
3027            json!({
3028                "crate-1": {
3029                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3030                    "one.rs": "const ONE: usize = 1;",
3031                },
3032                "crate-2": {
3033                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3034                },
3035                "private": {
3036                    "passwords.txt": "the-password",
3037                }
3038            }),
3039        )
3040        .await;
3041
3042        // Set up a fake language server.
3043        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3044        Arc::get_mut(&mut lang_registry)
3045            .unwrap()
3046            .add(Arc::new(Language::new(
3047                LanguageConfig {
3048                    name: "Rust".into(),
3049                    path_suffixes: vec!["rs".to_string()],
3050                    language_server: Some(language_server_config),
3051                    ..Default::default()
3052                },
3053                Some(tree_sitter_rust::language()),
3054            )));
3055
3056        // Connect to a server as 2 clients.
3057        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3058        let client_a = server.create_client(&mut cx_a, "user_a").await;
3059        let client_b = server.create_client(&mut cx_b, "user_b").await;
3060
3061        // Share a project as client A
3062        let project_a = cx_a.update(|cx| {
3063            Project::local(
3064                client_a.clone(),
3065                client_a.user_store.clone(),
3066                lang_registry.clone(),
3067                fs.clone(),
3068                cx,
3069            )
3070        });
3071        let (worktree_a, _) = project_a
3072            .update(&mut cx_a, |p, cx| {
3073                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3074            })
3075            .await
3076            .unwrap();
3077        worktree_a
3078            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3079            .await;
3080        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3081        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3082        project_a
3083            .update(&mut cx_a, |p, cx| p.share(cx))
3084            .await
3085            .unwrap();
3086
3087        // Join the worktree as client B.
3088        let project_b = Project::remote(
3089            project_id,
3090            client_b.clone(),
3091            client_b.user_store.clone(),
3092            lang_registry.clone(),
3093            fs.clone(),
3094            &mut cx_b.to_async(),
3095        )
3096        .await
3097        .unwrap();
3098
3099        // Cause the language server to start.
3100        let _buffer = cx_b
3101            .background()
3102            .spawn(project_b.update(&mut cx_b, |p, cx| {
3103                p.open_buffer((worktree_id, "one.rs"), cx)
3104            }))
3105            .await
3106            .unwrap();
3107
3108        // Request project symbols matching the query "two" as the guest.
3109        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3110        let mut fake_language_server = fake_language_servers.next().await.unwrap();
            // Respond with a single constant `TWO` located in `/code/crate-2/two.rs`.
3111        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3112            #[allow(deprecated)]
3113            Some(vec![lsp::SymbolInformation {
3114                name: "TWO".into(),
3115                location: lsp::Location {
3116                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3117                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3118                },
3119                kind: lsp::SymbolKind::CONSTANT,
3120                tags: None,
3121                container_name: None,
3122                deprecated: None,
3123            }])
3124        });
3125
3126        let symbols = symbols.await.unwrap();
3127        assert_eq!(symbols.len(), 1);
3128        assert_eq!(symbols[0].name, "TWO");
3129
3130        // Open one of the returned symbols.
            // The resulting buffer's path is expressed relative to the shared `crate-1`
            // directory, hence the leading `..`.
3131        let buffer_b_2 = project_b
3132            .update(&mut cx_b, |project, cx| {
3133                project.open_buffer_for_symbol(&symbols[0], cx)
3134            })
3135            .await
3136            .unwrap();
3137        buffer_b_2.read_with(&cx_b, |buffer, _| {
3138            assert_eq!(
3139                buffer.file().unwrap().path().as_ref(),
3140                Path::new("../crate-2/two.rs")
3141            );
3142        });
3143
3144        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // Only the path of a legitimately returned symbol is changed; the request must be
            // rejected with an error mentioning an invalid symbol signature.
3145        let mut fake_symbol = symbols[0].clone();
3146        fake_symbol.path = Path::new("/code/secrets").into();
3147        let error = project_b
3148            .update(&mut cx_b, |project, cx| {
3149                project.open_buffer_for_symbol(&fake_symbol, cx)
3150            })
3151            .await
3152            .unwrap_err();
3153        assert!(error.to_string().contains("invalid symbol signature"));
3154    }
3155
3156    #[gpui::test(iterations = 10)]
        // Races two guest-side operations against each other: a go-to-definition
        // request issued from `a.rs` whose answer points into `b.rs`, and a direct
        // open of `b.rs`. Whichever is started first (chosen by `rng`), the buffer
        // attached to the returned definition must equal the directly opened buffer.
3157    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3158        mut cx_a: TestAppContext,
3159        mut cx_b: TestAppContext,
3160        mut rng: StdRng,
3161    ) {
3162        cx_a.foreground().forbid_parking();
3163        let mut lang_registry = Arc::new(LanguageRegistry::new());
3164        let fs = FakeFs::new(cx_a.background());
            // The host's tree: `a.rs` references `b::TWO`, which `b.rs` declares.
            // `.zed.toml` names user_b as a collaborator.
3165        fs.insert_tree(
3166            "/root",
3167            json!({
3168                ".zed.toml": r#"collaborators = ["user_b"]"#,
3169                "a.rs": "const ONE: usize = b::TWO;",
3170                "b.rs": "const TWO: usize = 2",
3171            }),
3172        )
3173        .await;
3174
3175        // Set up a fake language server.
3176        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3177
            // Register a "Rust" language for `.rs` files that is backed by the fake server.
3178        Arc::get_mut(&mut lang_registry)
3179            .unwrap()
3180            .add(Arc::new(Language::new(
3181                LanguageConfig {
3182                    name: "Rust".into(),
3183                    path_suffixes: vec!["rs".to_string()],
3184                    language_server: Some(language_server_config),
3185                    ..Default::default()
3186                },
3187                Some(tree_sitter_rust::language()),
3188            )));
3189
3190        // Connect to a server as 2 clients.
3191        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3192        let client_a = server.create_client(&mut cx_a, "user_a").await;
3193        let client_b = server.create_client(&mut cx_b, "user_b").await;
3194
3195        // Share a project as client A
3196        let project_a = cx_a.update(|cx| {
3197            Project::local(
3198                client_a.clone(),
3199                client_a.user_store.clone(),
3200                lang_registry.clone(),
3201                fs.clone(),
3202                cx,
3203            )
3204        });
3205
3206        let (worktree_a, _) = project_a
3207            .update(&mut cx_a, |p, cx| {
3208                p.find_or_create_local_worktree("/root", false, cx)
3209            })
3210            .await
3211            .unwrap();
            // Wait for the local worktree's scan to finish before sharing.
3212        worktree_a
3213            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3214            .await;
3215        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3216        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3217        project_a
3218            .update(&mut cx_a, |p, cx| p.share(cx))
3219            .await
3220            .unwrap();
3221
3222        // Join the shared project as client B.
3223        let project_b = Project::remote(
3224            project_id,
3225            client_b.clone(),
3226            client_b.user_store.clone(),
3227            lang_registry.clone(),
3228            fs.clone(),
3229            &mut cx_b.to_async(),
3230        )
3231        .await
3232        .unwrap();
3233
            // Open `a.rs` on the guest; the open is driven from B's background executor.
3234        let buffer_b1 = cx_b
3235            .background()
3236            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3237            .await
3238            .unwrap();
3239
            // Start both operations without awaiting either; only the order in which
            // they are started differs between the two branches.
            // NOTE(review): 23 is presumably a byte offset into `a.rs`, which would
            // land inside `TWO` - confirm against `Project::definition`.
3240        let definitions;
3241        let buffer_b2;
3242        if rng.gen() {
3243            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3244            buffer_b2 =
3245                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3246        } else {
3247            buffer_b2 =
3248                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3249            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3250        }
3251
            // Answer every GotoDefinition request with a location in `/root/b.rs`
            // spanning columns 6..9 of line 0, i.e. the `TWO` identifier.
3252        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3253        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3254            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3255                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3256                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3257            )))
3258        });
3259
            // Both pending operations must succeed, and the single definition must
            // refer to the very buffer that was opened directly.
3260        let buffer_b2 = buffer_b2.await.unwrap();
3261        let definitions = definitions.await.unwrap();
3262        assert_eq!(definitions.len(), 1);
3263        assert_eq!(definitions[0].buffer, buffer_b2);
3264    }
3265
3266    #[gpui::test(iterations = 10)]
        // Exercises code actions from a guest's editor: client B opens `main.rs` of
        // a project shared by client A, moves the cursor, toggles the code-action
        // menu, and confirms an action supplied by a fake language server. The
        // resulting workspace edit touches two files; the test checks the combined
        // editor text and that undo/redo apply to both files at once.
3267    async fn test_collaborating_with_code_actions(
3268        mut cx_a: TestAppContext,
3269        mut cx_b: TestAppContext,
3270    ) {
3271        cx_a.foreground().forbid_parking();
3272        let mut lang_registry = Arc::new(LanguageRegistry::new());
3273        let fs = FakeFs::new(cx_a.background());
            // Editor initialization on B fills in the path openers later handed to
            // the workspace parameters.
3274        let mut path_openers_b = Vec::new();
3275        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3276
3277        // Set up a fake language server.
3278        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3279        Arc::get_mut(&mut lang_registry)
3280            .unwrap()
3281            .add(Arc::new(Language::new(
3282                LanguageConfig {
3283                    name: "Rust".into(),
3284                    path_suffixes: vec!["rs".to_string()],
3285                    language_server: Some(language_server_config),
3286                    ..Default::default()
3287                },
3288                Some(tree_sitter_rust::language()),
3289            )));
3290
3291        // Connect to a server as 2 clients.
3292        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3293        let client_a = server.create_client(&mut cx_a, "user_a").await;
3294        let client_b = server.create_client(&mut cx_b, "user_b").await;
3295
3296        // Share a project as client A
3297        fs.insert_tree(
3298            "/a",
3299            json!({
3300                ".zed.toml": r#"collaborators = ["user_b"]"#,
3301                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3302                "other.rs": "pub fn foo() -> usize { 4 }",
3303            }),
3304        )
3305        .await;
3306        let project_a = cx_a.update(|cx| {
3307            Project::local(
3308                client_a.clone(),
3309                client_a.user_store.clone(),
3310                lang_registry.clone(),
3311                fs.clone(),
3312                cx,
3313            )
3314        });
3315        let (worktree_a, _) = project_a
3316            .update(&mut cx_a, |p, cx| {
3317                p.find_or_create_local_worktree("/a", false, cx)
3318            })
3319            .await
3320            .unwrap();
3321        worktree_a
3322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3323            .await;
3324        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3325        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3326        project_a
3327            .update(&mut cx_a, |p, cx| p.share(cx))
3328            .await
3329            .unwrap();
3330
3331        // Join the shared project as client B.
3332        let project_b = Project::remote(
3333            project_id,
3334            client_b.clone(),
3335            client_b.user_store.clone(),
3336            lang_registry.clone(),
3337            fs.clone(),
3338            &mut cx_b.to_async(),
3339        )
3340        .await
3341        .unwrap();
            // Build a workspace for B around the remote project, starting from the
            // test defaults and overriding the pieces created above.
3342        let mut params = cx_b.update(WorkspaceParams::test);
3343        params.languages = lang_registry.clone();
3344        params.client = client_b.client.clone();
3345        params.user_store = client_b.user_store.clone();
3346        params.project = project_b;
3347        params.path_openers = path_openers_b.into();
3348
3349        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3350        let editor_b = workspace_b
3351            .update(&mut cx_b, |workspace, cx| {
3352                workspace.open_path((worktree_id, "main.rs").into(), cx)
3353            })
3354            .await
3355            .unwrap()
3356            .downcast::<Editor>()
3357            .unwrap();
3358
            // First code-action request: expected for the empty range at (0, 0) of
            // `/a/main.rs`; the fake server offers no actions there.
            // NOTE(review): `.next().await` presumably resolves once one such request
            // has been served - confirm against the fake server's API.
3359        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3360        fake_language_server
3361            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3362                assert_eq!(
3363                    params.text_document.uri,
3364                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3365                );
3366                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3367                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3368                None
3369            })
3370            .next()
3371            .await;
3372
3373        // Move cursor to a location that contains code actions.
            // (1, 31) lies within the `other::foo()` call on the second line of `main.rs`.
3374        editor_b.update(&mut cx_b, |editor, cx| {
3375            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3376            cx.focus(&editor_b);
3377        });
3378
            // Second code-action request: expected for the new cursor position. The
            // reply is one action whose edit replaces columns 22..34 of line 1 in
            // `main.rs` (the text `other::foo()`) with "4" and deletes columns 0..27
            // of line 0 in `other.rs` (its entire content).
3379        fake_language_server
3380            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3381                assert_eq!(
3382                    params.text_document.uri,
3383                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3384                );
3385                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3386                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3387
3388                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3389                    lsp::CodeAction {
3390                        title: "Inline into all callers".to_string(),
3391                        edit: Some(lsp::WorkspaceEdit {
3392                            changes: Some(
3393                                [
3394                                    (
3395                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3396                                        vec![lsp::TextEdit::new(
3397                                            lsp::Range::new(
3398                                                lsp::Position::new(1, 22),
3399                                                lsp::Position::new(1, 34),
3400                                            ),
3401                                            "4".to_string(),
3402                                        )],
3403                                    ),
3404                                    (
3405                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3406                                        vec![lsp::TextEdit::new(
3407                                            lsp::Range::new(
3408                                                lsp::Position::new(0, 0),
3409                                                lsp::Position::new(0, 27),
3410                                            ),
3411                                            "".to_string(),
3412                                        )],
3413                                    ),
3414                                ]
3415                                .into_iter()
3416                                .collect(),
3417                            ),
3418                            ..Default::default()
3419                        }),
                            // Arbitrary JSON payload attached to the action; nothing
                            // in this test inspects it.
3420                        data: Some(json!({
3421                            "codeActionParams": {
3422                                "range": {
3423                                    "start": {"line": 1, "column": 31},
3424                                    "end": {"line": 1, "column": 31},
3425                                }
3426                            }
3427                        })),
3428                        ..Default::default()
3429                    },
3430                )])
3431            })
3432            .next()
3433            .await;
3434
3435        // Toggle code actions and wait for them to display.
3436        editor_b.update(&mut cx_b, |editor, cx| {
3437            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3438        });
3439        editor_b
3440            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3441            .await;
3442
            // Drop the CodeActionRequest handler installed above.
3443        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3444
3445        // Confirming the code action will trigger a resolve request.
            // The confirmation is started first and awaited only after the resolve
            // handler below is in place.
3446        let confirm_action = workspace_b
3447            .update(&mut cx_b, |workspace, cx| {
3448                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3449            })
3450            .unwrap();
            // The resolved action carries the same two-file edit as the action
            // returned from the code-action request.
3451        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3452            lsp::CodeAction {
3453                title: "Inline into all callers".to_string(),
3454                edit: Some(lsp::WorkspaceEdit {
3455                    changes: Some(
3456                        [
3457                            (
3458                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3459                                vec![lsp::TextEdit::new(
3460                                    lsp::Range::new(
3461                                        lsp::Position::new(1, 22),
3462                                        lsp::Position::new(1, 34),
3463                                    ),
3464                                    "4".to_string(),
3465                                )],
3466                            ),
3467                            (
3468                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3469                                vec![lsp::TextEdit::new(
3470                                    lsp::Range::new(
3471                                        lsp::Position::new(0, 0),
3472                                        lsp::Position::new(0, 27),
3473                                    ),
3474                                    "".to_string(),
3475                                )],
3476                            ),
3477                        ]
3478                        .into_iter()
3479                        .collect(),
3480                    ),
3481                    ..Default::default()
3482                }),
3483                ..Default::default()
3484            }
3485        });
3486
3487        // After the action is confirmed, an editor containing both modified files is opened.
3488        confirm_action.await.unwrap();
3489        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3490            workspace
3491                .active_item(cx)
3492                .unwrap()
3493                .downcast::<Editor>()
3494                .unwrap()
3495        });
            // The editor's text is `other.rs` (now empty) followed by `main.rs`.
            // A single undo restores both files and a single redo re-applies both edits.
3496        code_action_editor.update(&mut cx_b, |editor, cx| {
3497            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3498            editor.undo(&Undo, cx);
3499            assert_eq!(
3500                editor.text(cx),
3501                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3502            );
3503            editor.redo(&Redo, cx);
3504            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3505        });
3506    }
3507
3508    #[gpui::test(iterations = 10)]
        // Exercises a symbol rename from a guest's editor: client B opens `one.rs`
        // of a project shared by client A, starts a rename on `ONE`, types `THREE`,
        // and confirms. A fake language server answers the prepare-rename and rename
        // requests; the rename edits both `one.rs` and `two.rs`. The test checks the
        // combined editor text, undo/redo across both files, and the undo history
        // of the original editor.
3509    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3510        cx_a.foreground().forbid_parking();
3511        let mut lang_registry = Arc::new(LanguageRegistry::new());
3512        let fs = FakeFs::new(cx_a.background());
3513        let mut path_openers_b = Vec::new();
3514        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3515
3516        // Set up a fake language server.
3517        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3518        Arc::get_mut(&mut lang_registry)
3519            .unwrap()
3520            .add(Arc::new(Language::new(
3521                LanguageConfig {
3522                    name: "Rust".into(),
3523                    path_suffixes: vec!["rs".to_string()],
3524                    language_server: Some(language_server_config),
3525                    ..Default::default()
3526                },
3527                Some(tree_sitter_rust::language()),
3528            )));
3529
3530        // Connect to a server as 2 clients.
3531        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3532        let client_a = server.create_client(&mut cx_a, "user_a").await;
3533        let client_b = server.create_client(&mut cx_b, "user_b").await;
3534
3535        // Share a project as client A
3536        fs.insert_tree(
3537            "/dir",
3538            json!({
3539                ".zed.toml": r#"collaborators = ["user_b"]"#,
3540                "one.rs": "const ONE: usize = 1;",
3541                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3542            }),
3543        )
3544        .await;
3545        let project_a = cx_a.update(|cx| {
3546            Project::local(
3547                client_a.clone(),
3548                client_a.user_store.clone(),
3549                lang_registry.clone(),
3550                fs.clone(),
3551                cx,
3552            )
3553        });
3554        let (worktree_a, _) = project_a
3555            .update(&mut cx_a, |p, cx| {
3556                p.find_or_create_local_worktree("/dir", false, cx)
3557            })
3558            .await
3559            .unwrap();
3560        worktree_a
3561            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3562            .await;
3563        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3564        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3565        project_a
3566            .update(&mut cx_a, |p, cx| p.share(cx))
3567            .await
3568            .unwrap();
3569
3570        // Join the shared project as client B.
3571        let project_b = Project::remote(
3572            project_id,
3573            client_b.clone(),
3574            client_b.user_store.clone(),
3575            lang_registry.clone(),
3576            fs.clone(),
3577            &mut cx_b.to_async(),
3578        )
3579        .await
3580        .unwrap();
            // Build a workspace for B around the remote project.
3581        let mut params = cx_b.update(WorkspaceParams::test);
3582        params.languages = lang_registry.clone();
3583        params.client = client_b.client.clone();
3584        params.user_store = client_b.user_store.clone();
3585        params.project = project_b;
3586        params.path_openers = path_openers_b.into();
3587
3588        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3589        let editor_b = workspace_b
3590            .update(&mut cx_b, |workspace, cx| {
3591                workspace.open_path((worktree_id, "one.rs").into(), cx)
3592            })
3593            .await
3594            .unwrap()
3595            .downcast::<Editor>()
3596            .unwrap();
3597        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3598
3599        // Move cursor to a location that can be renamed.
            // Offset 7 falls inside `ONE`, which occupies 6..9 of "const ONE: usize = 1;".
            // The rename is started here and awaited after the server has replied.
3600        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3601            editor.select_ranges([7..7], None, cx);
3602            editor.rename(&Rename, cx).unwrap()
3603        });
3604
            // The prepare-rename request must target (0, 7) in `one.rs`; the server
            // replies with the range of the whole identifier, columns 6..9.
3605        fake_language_server
3606            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3607                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3608                assert_eq!(params.position, lsp::Position::new(0, 7));
3609                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3610                    lsp::Position::new(0, 6),
3611                    lsp::Position::new(0, 9),
3612                )))
3613            })
3614            .next()
3615            .await
3616            .unwrap();
3617        prepare_rename.await.unwrap();
            // The pending rename must cover the range the server returned. Then
            // replace the first three characters of the rename editor's buffer with
            // "THREE".
            // NOTE(review): that buffer presumably starts out holding "ONE" - confirm.
3618        editor_b.update(&mut cx_b, |editor, cx| {
3619            let rename = editor.pending_rename().unwrap();
3620            let buffer = editor.buffer().read(cx).snapshot(cx);
3621            assert_eq!(
3622                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3623                6..9
3624            );
3625            rename.editor.update(cx, |rename_editor, cx| {
3626                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3627                    rename_buffer.edit([0..3], "THREE", cx);
3628                });
3629            });
3630        });
3631
            // Start the confirmation, then serve the resulting rename request.
3632        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3633            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3634        });
            // The rename request must be positioned at (0, 6), the start of the
            // identifier, with the new name "THREE". The reply rewrites the
            // declaration in `one.rs` and both `ONE` references in `two.rs`
            // (columns 24..27 and 35..38).
3635        fake_language_server
3636            .handle_request::<lsp::request::Rename, _>(|params, _| {
3637                assert_eq!(
3638                    params.text_document_position.text_document.uri.as_str(),
3639                    "file:///dir/one.rs"
3640                );
3641                assert_eq!(
3642                    params.text_document_position.position,
3643                    lsp::Position::new(0, 6)
3644                );
3645                assert_eq!(params.new_name, "THREE");
3646                Some(lsp::WorkspaceEdit {
3647                    changes: Some(
3648                        [
3649                            (
3650                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3651                                vec![lsp::TextEdit::new(
3652                                    lsp::Range::new(
3653                                        lsp::Position::new(0, 6),
3654                                        lsp::Position::new(0, 9),
3655                                    ),
3656                                    "THREE".to_string(),
3657                                )],
3658                            ),
3659                            (
3660                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3661                                vec![
3662                                    lsp::TextEdit::new(
3663                                        lsp::Range::new(
3664                                            lsp::Position::new(0, 24),
3665                                            lsp::Position::new(0, 27),
3666                                        ),
3667                                        "THREE".to_string(),
3668                                    ),
3669                                    lsp::TextEdit::new(
3670                                        lsp::Range::new(
3671                                            lsp::Position::new(0, 35),
3672                                            lsp::Position::new(0, 38),
3673                                        ),
3674                                        "THREE".to_string(),
3675                                    ),
3676                                ],
3677                            ),
3678                        ]
3679                        .into_iter()
3680                        .collect(),
3681                    ),
3682                    ..Default::default()
3683                })
3684            })
3685            .next()
3686            .await
3687            .unwrap();
3688        confirm_rename.await.unwrap();
3689
            // The workspace's active item is now an editor whose text is `two.rs`
            // followed by `one.rs`; one undo/redo step covers the edits in both files.
3690        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3691            workspace
3692                .active_item(cx)
3693                .unwrap()
3694                .downcast::<Editor>()
3695                .unwrap()
3696        });
3697        rename_editor.update(&mut cx_b, |editor, cx| {
3698            assert_eq!(
3699                editor.text(cx),
3700                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3701            );
3702            editor.undo(&Undo, cx);
3703            assert_eq!(
3704                editor.text(cx),
3705                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3706            );
3707            editor.redo(&Redo, cx);
3708            assert_eq!(
3709                editor.text(cx),
3710                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3711            );
3712        });
3713
3714        // Ensure temporary rename edits cannot be undone/redone.
            // In the original editor one undo reverts the rename, a second undo has
            // nothing further to revert, and a redo brings the rename back.
3715        editor_b.update(&mut cx_b, |editor, cx| {
3716            editor.undo(&Undo, cx);
3717            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3718            editor.undo(&Undo, cx);
3719            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3720            editor.redo(&Redo, cx);
3721            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3722        })
3723    }
3724
3725    #[gpui::test(iterations = 10)]
        // End-to-end chat test with two clients sharing one channel. Checks that
        // each client sees the channel and a message already stored in the
        // database, that messages sent by A show up locally right away and later on
        // B, and that the server's per-channel connection set shrinks as the
        // clients drop their channel handles.
3726    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3727        cx_a.foreground().forbid_parking();
3728
3729        // Connect to a server as 2 clients.
3730        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3731        let client_a = server.create_client(&mut cx_a, "user_a").await;
3732        let client_b = server.create_client(&mut cx_b, "user_b").await;
3733
3734        // Create an org that includes these 2 users.
3735        let db = &server.app_state.db;
3736        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3737        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3738            .await
3739            .unwrap();
3740        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3741            .await
3742            .unwrap();
3743
3744        // Create a channel that includes all the users.
3745        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3746        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3747            .await
3748            .unwrap();
3749        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3750            .await
3751            .unwrap();
            // Seed the channel with one message from B, written directly to the
            // database before either client opens the channel.
3752        db.create_channel_message(
3753            channel_id,
3754            client_b.current_user_id(&cx_b),
3755            "hello A, it's B.",
3756            OffsetDateTime::now_utc(),
3757            1,
3758        )
3759        .await
3760        .unwrap();
3761
            // Client A: wait for the channel list to load, then check it contains
            // exactly the test channel.
3762        let channels_a = cx_a
3763            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3764        channels_a
3765            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3766            .await;
3767        channels_a.read_with(&cx_a, |list, _| {
3768            assert_eq!(
3769                list.available_channels().unwrap(),
3770                &[ChannelDetails {
3771                    id: channel_id.to_proto(),
3772                    name: "test-channel".to_string()
3773                }]
3774            )
3775        });
            // A freshly obtained channel has no messages yet; the seeded message
            // arrives afterwards.
            // NOTE(review): the third tuple element from `channel_messages` is
            // presumably a "pending" flag - confirm against that helper.
3776        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3777            this.get_channel(channel_id.to_proto(), cx).unwrap()
3778        });
3779        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3780        channel_a
3781            .condition(&cx_a, |channel, _| {
3782                channel_messages(channel)
3783                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3784            })
3785            .await;
3786
            // Client B: the same checks as for client A.
3787        let channels_b = cx_b
3788            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3789        channels_b
3790            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3791            .await;
3792        channels_b.read_with(&cx_b, |list, _| {
3793            assert_eq!(
3794                list.available_channels().unwrap(),
3795                &[ChannelDetails {
3796                    id: channel_id.to_proto(),
3797                    name: "test-channel".to_string()
3798                }]
3799            )
3800        });
3801
3802        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3803            this.get_channel(channel_id.to_proto(), cx).unwrap()
3804        });
3805        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3806        channel_b
3807            .condition(&cx_b, |channel, _| {
3808                channel_messages(channel)
3809                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3810            })
3811            .await;
3812
            // A sends two messages back to back. Both appear in A's local list
            // immediately, with the flag set to `true`, before either send has been
            // awaited. The first send is detached; only the second is awaited.
3813        channel_a
3814            .update(&mut cx_a, |channel, cx| {
3815                channel
3816                    .send_message("oh, hi B.".to_string(), cx)
3817                    .unwrap()
3818                    .detach();
3819                let task = channel.send_message("sup".to_string(), cx).unwrap();
3820                assert_eq!(
3821                    channel_messages(channel),
3822                    &[
3823                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3824                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3825                        ("user_a".to_string(), "sup".to_string(), true)
3826                    ]
3827                );
3828                task
3829            })
3830            .await
3831            .unwrap();
3832
            // B eventually sees both of A's messages, in order, with the flag `false`.
3833        channel_b
3834            .condition(&cx_b, |channel, _| {
3835                channel_messages(channel)
3836                    == [
3837                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3838                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3839                        ("user_a".to_string(), "sup".to_string(), false),
3840                    ]
3841            })
3842            .await;
3843
            // While both clients hold the channel, the server records two
            // connections for it.
3844        assert_eq!(
3845            server
3846                .state()
3847                .await
3848                .channel(channel_id)
3849                .unwrap()
3850                .connection_ids
3851                .len(),
3852            2
3853        );
            // Dropping B's handle leaves one connection on the channel.
3854        cx_b.update(|_| drop(channel_b));
3855        server
3856            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3857            .await;
3858
            // Dropping A's handle as well removes the channel from the server state.
3859        cx_a.update(|_| drop(channel_a));
3860        server
3861            .condition(|state| state.channel(channel_id).is_none())
3862            .await;
3863    }
3864
3865    #[gpui::test(iterations = 10)]
        // Checks the validation applied to outgoing chat messages: an over-long
        // body is rejected when the send is awaited, an empty body is rejected
        // synchronously by `send_message`, and surrounding whitespace is stripped
        // from the body that ends up in the database.
3866    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3867        cx_a.foreground().forbid_parking();
3868
3869        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3870        let client_a = server.create_client(&mut cx_a, "user_a").await;
3871
            // Create an org and a channel, and make user_a a member of both.
3872        let db = &server.app_state.db;
3873        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3874        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3875        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3876            .await
3877            .unwrap();
3878        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3879            .await
3880            .unwrap();
3881
            // Wait for the channel list to load before fetching the channel handle.
3882        let channels_a = cx_a
3883            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3884        channels_a
3885            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3886            .await;
3887        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3888            this.get_channel(channel_id.to_proto(), cx).unwrap()
3889        });
3890
3891        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times. `send_message` itself
            // succeeds; the failure surfaces when the returned task is awaited.
            // NOTE(review): presumably rejected against `MAX_MESSAGE_LEN` - confirm.
3892        channel_a
3893            .update(&mut cx_a, |channel, cx| {
3894                let long_body = "this is long.\n".repeat(1024);
3895                channel.send_message(long_body, cx).unwrap()
3896            })
3897            .await
3898            .unwrap_err();
3899
3900        // Messages aren't allowed to be blank.
            // Here `send_message` returns the error directly, with nothing to await.
3901        channel_a.update(&mut cx_a, |channel, cx| {
3902            channel.send_message(String::new(), cx).unwrap_err()
3903        });
3904
3905        // Leading and trailing whitespace are trimmed.
3906        channel_a
3907            .update(&mut cx_a, |channel, cx| {
3908                channel
3909                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3910                    .unwrap()
3911            })
3912            .await
3913            .unwrap();
            // Only the trimmed message is stored; neither rejected message was saved.
3914        assert_eq!(
3915            db.get_channel_messages(channel_id, 10, None)
3916                .await
3917                .unwrap()
3918                .iter()
3919                .map(|m| &m.body)
3920                .collect::<Vec<_>>(),
3921            &["surrounded by whitespace"]
3922        );
3923    }
3924
3925    #[gpui::test(iterations = 10)]
3926    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3927        cx_a.foreground().forbid_parking();
3928
3929        // Connect to a server as 2 clients.
3930        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3931        let client_a = server.create_client(&mut cx_a, "user_a").await;
3932        let client_b = server.create_client(&mut cx_b, "user_b").await;
3933        let mut status_b = client_b.status();
3934
3935        // Create an org that includes these 2 users.
3936        let db = &server.app_state.db;
3937        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3938        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3939            .await
3940            .unwrap();
3941        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3942            .await
3943            .unwrap();
3944
3945        // Create a channel that includes all the users.
3946        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3947        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3948            .await
3949            .unwrap();
3950        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3951            .await
3952            .unwrap();
3953        db.create_channel_message(
3954            channel_id,
3955            client_b.current_user_id(&cx_b),
3956            "hello A, it's B.",
3957            OffsetDateTime::now_utc(),
3958            2,
3959        )
3960        .await
3961        .unwrap();
3962
3963        let channels_a = cx_a
3964            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3965        channels_a
3966            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3967            .await;
3968
3969        channels_a.read_with(&cx_a, |list, _| {
3970            assert_eq!(
3971                list.available_channels().unwrap(),
3972                &[ChannelDetails {
3973                    id: channel_id.to_proto(),
3974                    name: "test-channel".to_string()
3975                }]
3976            )
3977        });
3978        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3979            this.get_channel(channel_id.to_proto(), cx).unwrap()
3980        });
3981        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3982        channel_a
3983            .condition(&cx_a, |channel, _| {
3984                channel_messages(channel)
3985                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3986            })
3987            .await;
3988
3989        let channels_b = cx_b
3990            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3991        channels_b
3992            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3993            .await;
3994        channels_b.read_with(&cx_b, |list, _| {
3995            assert_eq!(
3996                list.available_channels().unwrap(),
3997                &[ChannelDetails {
3998                    id: channel_id.to_proto(),
3999                    name: "test-channel".to_string()
4000                }]
4001            )
4002        });
4003
4004        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
4005            this.get_channel(channel_id.to_proto(), cx).unwrap()
4006        });
4007        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
4008        channel_b
4009            .condition(&cx_b, |channel, _| {
4010                channel_messages(channel)
4011                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4012            })
4013            .await;
4014
4015        // Disconnect client B, ensuring we can still access its cached channel data.
4016        server.forbid_connections();
4017        server.disconnect_client(client_b.current_user_id(&cx_b));
4018        while !matches!(
4019            status_b.next().await,
4020            Some(client::Status::ReconnectionError { .. })
4021        ) {}
4022
4023        channels_b.read_with(&cx_b, |channels, _| {
4024            assert_eq!(
4025                channels.available_channels().unwrap(),
4026                [ChannelDetails {
4027                    id: channel_id.to_proto(),
4028                    name: "test-channel".to_string()
4029                }]
4030            )
4031        });
4032        channel_b.read_with(&cx_b, |channel, _| {
4033            assert_eq!(
4034                channel_messages(channel),
4035                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4036            )
4037        });
4038
4039        // Send a message from client B while it is disconnected.
4040        channel_b
4041            .update(&mut cx_b, |channel, cx| {
4042                let task = channel
4043                    .send_message("can you see this?".to_string(), cx)
4044                    .unwrap();
4045                assert_eq!(
4046                    channel_messages(channel),
4047                    &[
4048                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4049                        ("user_b".to_string(), "can you see this?".to_string(), true)
4050                    ]
4051                );
4052                task
4053            })
4054            .await
4055            .unwrap_err();
4056
4057        // Send a message from client A while B is disconnected.
4058        channel_a
4059            .update(&mut cx_a, |channel, cx| {
4060                channel
4061                    .send_message("oh, hi B.".to_string(), cx)
4062                    .unwrap()
4063                    .detach();
4064                let task = channel.send_message("sup".to_string(), cx).unwrap();
4065                assert_eq!(
4066                    channel_messages(channel),
4067                    &[
4068                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4069                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4070                        ("user_a".to_string(), "sup".to_string(), true)
4071                    ]
4072                );
4073                task
4074            })
4075            .await
4076            .unwrap();
4077
4078        // Give client B a chance to reconnect.
4079        server.allow_connections();
4080        cx_b.foreground().advance_clock(Duration::from_secs(10));
4081
4082        // Verify that B sees the new messages upon reconnection, as well as the message client B
4083        // sent while offline.
4084        channel_b
4085            .condition(&cx_b, |channel, _| {
4086                channel_messages(channel)
4087                    == [
4088                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4089                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4090                        ("user_a".to_string(), "sup".to_string(), false),
4091                        ("user_b".to_string(), "can you see this?".to_string(), false),
4092                    ]
4093            })
4094            .await;
4095
4096        // Ensure client A and B can communicate normally after reconnection.
4097        channel_a
4098            .update(&mut cx_a, |channel, cx| {
4099                channel.send_message("you online?".to_string(), cx).unwrap()
4100            })
4101            .await
4102            .unwrap();
4103        channel_b
4104            .condition(&cx_b, |channel, _| {
4105                channel_messages(channel)
4106                    == [
4107                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4108                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4109                        ("user_a".to_string(), "sup".to_string(), false),
4110                        ("user_b".to_string(), "can you see this?".to_string(), false),
4111                        ("user_a".to_string(), "you online?".to_string(), false),
4112                    ]
4113            })
4114            .await;
4115
4116        channel_b
4117            .update(&mut cx_b, |channel, cx| {
4118                channel.send_message("yep".to_string(), cx).unwrap()
4119            })
4120            .await
4121            .unwrap();
4122        channel_a
4123            .condition(&cx_a, |channel, _| {
4124                channel_messages(channel)
4125                    == [
4126                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4127                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4128                        ("user_a".to_string(), "sup".to_string(), false),
4129                        ("user_b".to_string(), "can you see this?".to_string(), false),
4130                        ("user_a".to_string(), "you online?".to_string(), false),
4131                        ("user_b".to_string(), "yep".to_string(), false),
4132                    ]
4133            })
4134            .await;
4135    }
4136
4137    #[gpui::test(iterations = 10)]
4138    async fn test_contacts(
4139        mut cx_a: TestAppContext,
4140        mut cx_b: TestAppContext,
4141        mut cx_c: TestAppContext,
4142    ) {
4143        cx_a.foreground().forbid_parking();
4144        let lang_registry = Arc::new(LanguageRegistry::new());
4145        let fs = FakeFs::new(cx_a.background());
4146
4147        // Connect to a server as 3 clients.
4148        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4149        let client_a = server.create_client(&mut cx_a, "user_a").await;
4150        let client_b = server.create_client(&mut cx_b, "user_b").await;
4151        let client_c = server.create_client(&mut cx_c, "user_c").await;
4152
4153        // Share a worktree as client A.
4154        fs.insert_tree(
4155            "/a",
4156            json!({
4157                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4158            }),
4159        )
4160        .await;
4161
4162        let project_a = cx_a.update(|cx| {
4163            Project::local(
4164                client_a.clone(),
4165                client_a.user_store.clone(),
4166                lang_registry.clone(),
4167                fs.clone(),
4168                cx,
4169            )
4170        });
4171        let (worktree_a, _) = project_a
4172            .update(&mut cx_a, |p, cx| {
4173                p.find_or_create_local_worktree("/a", false, cx)
4174            })
4175            .await
4176            .unwrap();
4177        worktree_a
4178            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4179            .await;
4180
4181        client_a
4182            .user_store
4183            .condition(&cx_a, |user_store, _| {
4184                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4185            })
4186            .await;
4187        client_b
4188            .user_store
4189            .condition(&cx_b, |user_store, _| {
4190                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4191            })
4192            .await;
4193        client_c
4194            .user_store
4195            .condition(&cx_c, |user_store, _| {
4196                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4197            })
4198            .await;
4199
4200        let project_id = project_a
4201            .update(&mut cx_a, |project, _| project.next_remote_id())
4202            .await;
4203        project_a
4204            .update(&mut cx_a, |project, cx| project.share(cx))
4205            .await
4206            .unwrap();
4207
4208        let _project_b = Project::remote(
4209            project_id,
4210            client_b.clone(),
4211            client_b.user_store.clone(),
4212            lang_registry.clone(),
4213            fs.clone(),
4214            &mut cx_b.to_async(),
4215        )
4216        .await
4217        .unwrap();
4218
4219        client_a
4220            .user_store
4221            .condition(&cx_a, |user_store, _| {
4222                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4223            })
4224            .await;
4225        client_b
4226            .user_store
4227            .condition(&cx_b, |user_store, _| {
4228                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4229            })
4230            .await;
4231        client_c
4232            .user_store
4233            .condition(&cx_c, |user_store, _| {
4234                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4235            })
4236            .await;
4237
4238        project_a
4239            .condition(&cx_a, |project, _| {
4240                project.collaborators().contains_key(&client_b.peer_id)
4241            })
4242            .await;
4243
4244        cx_a.update(move |_| drop(project_a));
4245        client_a
4246            .user_store
4247            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4248            .await;
4249        client_b
4250            .user_store
4251            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4252            .await;
4253        client_c
4254            .user_store
4255            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4256            .await;
4257
4258        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4259            user_store
4260                .contacts()
4261                .iter()
4262                .map(|contact| {
4263                    let worktrees = contact
4264                        .projects
4265                        .iter()
4266                        .map(|p| {
4267                            (
4268                                p.worktree_root_names[0].as_str(),
4269                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4270                            )
4271                        })
4272                        .collect();
4273                    (contact.user.github_login.as_str(), worktrees)
4274                })
4275                .collect()
4276        }
4277    }
4278
4279    #[gpui::test(iterations = 100)]
4280    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4281        cx.foreground().forbid_parking();
4282        let max_peers = env::var("MAX_PEERS")
4283            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4284            .unwrap_or(5);
4285        let max_operations = env::var("OPERATIONS")
4286            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4287            .unwrap_or(10);
4288
4289        let rng = Arc::new(Mutex::new(rng));
4290
4291        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4292        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4293
4294        let fs = FakeFs::new(cx.background());
4295        fs.insert_tree(
4296            "/_collab",
4297            json!({
4298                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4299            }),
4300        )
4301        .await;
4302
4303        let operations = Rc::new(Cell::new(0));
4304        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4305        let mut clients = Vec::new();
4306
4307        let mut next_entity_id = 100000;
4308        let mut host_cx = TestAppContext::new(
4309            cx.foreground_platform(),
4310            cx.platform(),
4311            cx.foreground(),
4312            cx.background(),
4313            cx.font_cache(),
4314            next_entity_id,
4315        );
4316        let host = server.create_client(&mut host_cx, "host").await;
4317        let host_project = host_cx.update(|cx| {
4318            Project::local(
4319                host.client.clone(),
4320                host.user_store.clone(),
4321                Arc::new(LanguageRegistry::new()),
4322                fs.clone(),
4323                cx,
4324            )
4325        });
4326        let host_project_id = host_project
4327            .update(&mut host_cx, |p, _| p.next_remote_id())
4328            .await;
4329
4330        let (collab_worktree, _) = host_project
4331            .update(&mut host_cx, |project, cx| {
4332                project.find_or_create_local_worktree("/_collab", false, cx)
4333            })
4334            .await
4335            .unwrap();
4336        collab_worktree
4337            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4338            .await;
4339        host_project
4340            .update(&mut host_cx, |project, cx| project.share(cx))
4341            .await
4342            .unwrap();
4343
4344        clients.push(cx.foreground().spawn(host.simulate_host(
4345            host_project.clone(),
4346            language_server_config,
4347            operations.clone(),
4348            max_operations,
4349            rng.clone(),
4350            host_cx.clone(),
4351        )));
4352
4353        while operations.get() < max_operations {
4354            cx.background().simulate_random_delay().await;
4355            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4356                operations.set(operations.get() + 1);
4357
4358                let guest_id = clients.len();
4359                log::info!("Adding guest {}", guest_id);
4360                next_entity_id += 100000;
4361                let mut guest_cx = TestAppContext::new(
4362                    cx.foreground_platform(),
4363                    cx.platform(),
4364                    cx.foreground(),
4365                    cx.background(),
4366                    cx.font_cache(),
4367                    next_entity_id,
4368                );
4369                let guest = server
4370                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4371                    .await;
4372                let guest_project = Project::remote(
4373                    host_project_id,
4374                    guest.client.clone(),
4375                    guest.user_store.clone(),
4376                    guest_lang_registry.clone(),
4377                    fs.clone(),
4378                    &mut guest_cx.to_async(),
4379                )
4380                .await
4381                .unwrap();
4382                clients.push(cx.foreground().spawn(guest.simulate_guest(
4383                    guest_id,
4384                    guest_project,
4385                    operations.clone(),
4386                    max_operations,
4387                    rng.clone(),
4388                    guest_cx,
4389                )));
4390
4391                log::info!("Guest {} added", guest_id);
4392            }
4393        }
4394
4395        let clients = futures::future::join_all(clients).await;
4396        cx.foreground().run_until_parked();
4397
4398        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4399            project
4400                .worktrees(cx)
4401                .map(|worktree| {
4402                    let snapshot = worktree.read(cx).snapshot();
4403                    (snapshot.id(), snapshot)
4404                })
4405                .collect::<BTreeMap<_, _>>()
4406        });
4407
4408        for (guest_client, guest_cx) in clients.iter().skip(1) {
4409            let guest_id = guest_client.client.id();
4410            let worktree_snapshots =
4411                guest_client
4412                    .project
4413                    .as_ref()
4414                    .unwrap()
4415                    .read_with(guest_cx, |project, cx| {
4416                        project
4417                            .worktrees(cx)
4418                            .map(|worktree| {
4419                                let worktree = worktree.read(cx);
4420                                assert!(
4421                                    !worktree.as_remote().unwrap().has_pending_updates(),
4422                                    "Guest {} worktree {:?} contains deferred updates",
4423                                    guest_id,
4424                                    worktree.id()
4425                                );
4426                                (worktree.id(), worktree.snapshot())
4427                            })
4428                            .collect::<BTreeMap<_, _>>()
4429                    });
4430
4431            assert_eq!(
4432                worktree_snapshots.keys().collect::<Vec<_>>(),
4433                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4434                "guest {} has different worktrees than the host",
4435                guest_id
4436            );
4437            for (id, host_snapshot) in &host_worktree_snapshots {
4438                let guest_snapshot = &worktree_snapshots[id];
4439                assert_eq!(
4440                    guest_snapshot.root_name(),
4441                    host_snapshot.root_name(),
4442                    "guest {} has different root name than the host for worktree {}",
4443                    guest_id,
4444                    id
4445                );
4446                assert_eq!(
4447                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4448                    host_snapshot.entries(false).collect::<Vec<_>>(),
4449                    "guest {} has different snapshot than the host for worktree {}",
4450                    guest_id,
4451                    id
4452                );
4453            }
4454
4455            guest_client
4456                .project
4457                .as_ref()
4458                .unwrap()
4459                .read_with(guest_cx, |project, _| {
4460                    assert!(
4461                        !project.has_buffered_operations(),
4462                        "guest {} has buffered operations ",
4463                        guest_id,
4464                    );
4465                });
4466
4467            for guest_buffer in &guest_client.buffers {
4468                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4469                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4470                    project
4471                        .shared_buffer(guest_client.peer_id, buffer_id)
4472                        .expect(&format!(
4473                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4474                            guest_id, guest_client.peer_id, buffer_id
4475                        ))
4476                });
4477                assert_eq!(
4478                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4479                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4480                    "guest {} buffer {} differs from the host's buffer",
4481                    guest_id,
4482                    buffer_id,
4483                );
4484            }
4485        }
4486    }
4487
4488    struct TestServer {
4489        peer: Arc<Peer>,
4490        app_state: Arc<AppState>,
4491        server: Arc<Server>,
4492        foreground: Rc<executor::Foreground>,
4493        notifications: mpsc::UnboundedReceiver<()>,
4494        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4495        forbid_connections: Arc<AtomicBool>,
4496        _test_db: TestDb,
4497    }
4498
4499    impl TestServer {
4500        async fn start(
4501            foreground: Rc<executor::Foreground>,
4502            background: Arc<executor::Background>,
4503        ) -> Self {
4504            let test_db = TestDb::fake(background);
4505            let app_state = Self::build_app_state(&test_db).await;
4506            let peer = Peer::new();
4507            let notifications = mpsc::unbounded();
4508            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4509            Self {
4510                peer,
4511                app_state,
4512                server,
4513                foreground,
4514                notifications: notifications.1,
4515                connection_killers: Default::default(),
4516                forbid_connections: Default::default(),
4517                _test_db: test_db,
4518            }
4519        }
4520
4521        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4522            let http = FakeHttpClient::with_404_response();
4523            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4524            let client_name = name.to_string();
4525            let mut client = Client::new(http.clone());
4526            let server = self.server.clone();
4527            let connection_killers = self.connection_killers.clone();
4528            let forbid_connections = self.forbid_connections.clone();
4529            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4530
4531            Arc::get_mut(&mut client)
4532                .unwrap()
4533                .override_authenticate(move |cx| {
4534                    cx.spawn(|_| async move {
4535                        let access_token = "the-token".to_string();
4536                        Ok(Credentials {
4537                            user_id: user_id.0 as u64,
4538                            access_token,
4539                        })
4540                    })
4541                })
4542                .override_establish_connection(move |credentials, cx| {
4543                    assert_eq!(credentials.user_id, user_id.0 as u64);
4544                    assert_eq!(credentials.access_token, "the-token");
4545
4546                    let server = server.clone();
4547                    let connection_killers = connection_killers.clone();
4548                    let forbid_connections = forbid_connections.clone();
4549                    let client_name = client_name.clone();
4550                    let connection_id_tx = connection_id_tx.clone();
4551                    cx.spawn(move |cx| async move {
4552                        if forbid_connections.load(SeqCst) {
4553                            Err(EstablishConnectionError::other(anyhow!(
4554                                "server is forbidding connections"
4555                            )))
4556                        } else {
4557                            let (client_conn, server_conn, kill_conn) =
4558                                Connection::in_memory(cx.background());
4559                            connection_killers.lock().insert(user_id, kill_conn);
4560                            cx.background()
4561                                .spawn(server.handle_connection(
4562                                    server_conn,
4563                                    client_name,
4564                                    user_id,
4565                                    Some(connection_id_tx),
4566                                    cx.background(),
4567                                ))
4568                                .detach();
4569                            Ok(client_conn)
4570                        }
4571                    })
4572                });
4573
4574            client
4575                .authenticate_and_connect(&cx.to_async())
4576                .await
4577                .unwrap();
4578
4579            Channel::init(&client);
4580            Project::init(&client);
4581
4582            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4583            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4584            let mut authed_user =
4585                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4586            while authed_user.next().await.unwrap().is_none() {}
4587
4588            TestClient {
4589                client,
4590                peer_id,
4591                user_store,
4592                project: Default::default(),
4593                buffers: Default::default(),
4594            }
4595        }
4596
4597        fn disconnect_client(&self, user_id: UserId) {
4598            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4599                let _ = kill_conn.try_send(Some(()));
4600            }
4601        }
4602
4603        fn forbid_connections(&self) {
4604            self.forbid_connections.store(true, SeqCst);
4605        }
4606
4607        fn allow_connections(&self) {
4608            self.forbid_connections.store(false, SeqCst);
4609        }
4610
4611        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4612            let mut config = Config::default();
4613            config.session_secret = "a".repeat(32);
4614            config.database_url = test_db.url.clone();
4615            let github_client = github::AppClient::test();
4616            Arc::new(AppState {
4617                db: test_db.db().clone(),
4618                handlebars: Default::default(),
4619                auth_client: auth::build_client("", ""),
4620                repo_client: github::RepoClient::test(&github_client),
4621                github_client,
4622                config,
4623            })
4624        }
4625
4626        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4627            self.server.store.read()
4628        }
4629
4630        async fn condition<F>(&mut self, mut predicate: F)
4631        where
4632            F: FnMut(&Store) -> bool,
4633        {
4634            async_std::future::timeout(Duration::from_millis(500), async {
4635                while !(predicate)(&*self.server.store.read()) {
4636                    self.foreground.start_waiting();
4637                    self.notifications.next().await;
4638                    self.foreground.finish_waiting();
4639                }
4640            })
4641            .await
4642            .expect("condition timed out");
4643        }
4644    }
4645
    /// Resets the peer when the test server is dropped.
    // NOTE(review): presumably this tears down any connections still registered with the
    // peer; confirm against `Peer::reset`.
    impl Drop for TestServer {
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
4651
    /// A client connected to a `TestServer`, together with the state tests inspect.
    struct TestClient {
        // The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        // Built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        // User store created for this client.
        pub user_store: ModelHandle<UserStore>,
        // Starts as `None`. NOTE(review): presumably filled in by the simulation methods.
        project: Option<ModelHandle<Project>>,
        // Starts empty. NOTE(review): presumably the buffers opened during a simulation.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4659
    /// Lets a `TestClient` be used directly wherever an `Arc<Client>` is expected.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        // Hands out the wrapped client.
        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4667
4668    impl TestClient {
4669        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4670            UserId::from_proto(
4671                self.user_store
4672                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4673            )
4674        }
4675
4676        fn simulate_host(
4677            mut self,
4678            project: ModelHandle<Project>,
4679            mut language_server_config: LanguageServerConfig,
4680            operations: Rc<Cell<usize>>,
4681            max_operations: usize,
4682            rng: Arc<Mutex<StdRng>>,
4683            mut cx: TestAppContext,
4684        ) -> impl Future<Output = (Self, TestAppContext)> {
4685            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4686
4687            // Set up a fake language server.
4688            language_server_config.set_fake_initializer({
4689                let rng = rng.clone();
4690                let files = files.clone();
4691                let project = project.clone();
4692                move |fake_server| {
4693                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4694                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4695                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4696                                range: lsp::Range::new(
4697                                    lsp::Position::new(0, 0),
4698                                    lsp::Position::new(0, 0),
4699                                ),
4700                                new_text: "the-new-text".to_string(),
4701                            })),
4702                            ..Default::default()
4703                        }]))
4704                    });
4705
4706                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4707                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4708                            lsp::CodeAction {
4709                                title: "the-code-action".to_string(),
4710                                ..Default::default()
4711                            },
4712                        )])
4713                    });
4714
4715                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4716                        |params, _| {
4717                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4718                                params.position,
4719                                params.position,
4720                            )))
4721                        },
4722                    );
4723
4724                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4725                        let files = files.clone();
4726                        let rng = rng.clone();
4727                        move |_, _| {
4728                            let files = files.lock();
4729                            let mut rng = rng.lock();
4730                            let count = rng.gen_range::<usize, _>(1..3);
4731                            Some(lsp::GotoDefinitionResponse::Array(
4732                                (0..count)
4733                                    .map(|_| {
4734                                        let file = files.choose(&mut *rng).unwrap().as_path();
4735                                        lsp::Location {
4736                                            uri: lsp::Url::from_file_path(file).unwrap(),
4737                                            range: Default::default(),
4738                                        }
4739                                    })
4740                                    .collect(),
4741                            ))
4742                        }
4743                    });
4744
4745                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4746                        let rng = rng.clone();
4747                        let project = project.clone();
4748                        move |params, mut cx| {
4749                            project.update(&mut cx, |project, cx| {
4750                                let path = params
4751                                    .text_document_position_params
4752                                    .text_document
4753                                    .uri
4754                                    .to_file_path()
4755                                    .unwrap();
4756                                let (worktree, relative_path) =
4757                                    project.find_local_worktree(&path, cx)?;
4758                                let project_path =
4759                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4760                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4761
4762                                let mut highlights = Vec::new();
4763                                let highlight_count = rng.lock().gen_range(1..=5);
4764                                let mut prev_end = 0;
4765                                for _ in 0..highlight_count {
4766                                    let range =
4767                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4768                                    let start =
4769                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4770                                    let end =
4771                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4772                                    highlights.push(lsp::DocumentHighlight {
4773                                        range: lsp::Range::new(start, end),
4774                                        kind: Some(lsp::DocumentHighlightKind::READ),
4775                                    });
4776                                    prev_end = range.end;
4777                                }
4778                                Some(highlights)
4779                            })
4780                        }
4781                    });
4782                }
4783            });
4784
4785            project.update(&mut cx, |project, _| {
4786                project.languages().add(Arc::new(Language::new(
4787                    LanguageConfig {
4788                        name: "Rust".into(),
4789                        path_suffixes: vec!["rs".to_string()],
4790                        language_server: Some(language_server_config),
4791                        ..Default::default()
4792                    },
4793                    None,
4794                )));
4795            });
4796
4797            async move {
4798                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4799                while operations.get() < max_operations {
4800                    operations.set(operations.get() + 1);
4801
4802                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4803                    match distribution {
4804                        0..=20 if !files.lock().is_empty() => {
4805                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4806                            let mut path = path.as_path();
4807                            while let Some(parent_path) = path.parent() {
4808                                path = parent_path;
4809                                if rng.lock().gen() {
4810                                    break;
4811                                }
4812                            }
4813
4814                            log::info!("Host: find/create local worktree {:?}", path);
4815                            project
4816                                .update(&mut cx, |project, cx| {
4817                                    project.find_or_create_local_worktree(path, false, cx)
4818                                })
4819                                .await
4820                                .unwrap();
4821                        }
4822                        10..=80 if !files.lock().is_empty() => {
4823                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4824                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4825                                let (worktree, path) = project
4826                                    .update(&mut cx, |project, cx| {
4827                                        project.find_or_create_local_worktree(file, false, cx)
4828                                    })
4829                                    .await
4830                                    .unwrap();
4831                                let project_path =
4832                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4833                                log::info!("Host: opening path {:?}", project_path);
4834                                let buffer = project
4835                                    .update(&mut cx, |project, cx| {
4836                                        project.open_buffer(project_path, cx)
4837                                    })
4838                                    .await
4839                                    .unwrap();
4840                                self.buffers.insert(buffer.clone());
4841                                buffer
4842                            } else {
4843                                self.buffers
4844                                    .iter()
4845                                    .choose(&mut *rng.lock())
4846                                    .unwrap()
4847                                    .clone()
4848                            };
4849
4850                            if rng.lock().gen_bool(0.1) {
4851                                cx.update(|cx| {
4852                                    log::info!(
4853                                        "Host: dropping buffer {:?}",
4854                                        buffer.read(cx).file().unwrap().full_path(cx)
4855                                    );
4856                                    self.buffers.remove(&buffer);
4857                                    drop(buffer);
4858                                });
4859                            } else {
4860                                buffer.update(&mut cx, |buffer, cx| {
4861                                    log::info!(
4862                                        "Host: updating buffer {:?}",
4863                                        buffer.file().unwrap().full_path(cx)
4864                                    );
4865                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4866                                });
4867                            }
4868                        }
4869                        _ => loop {
4870                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4871                            let mut path = PathBuf::new();
4872                            path.push("/");
4873                            for _ in 0..path_component_count {
4874                                let letter = rng.lock().gen_range(b'a'..=b'z');
4875                                path.push(std::str::from_utf8(&[letter]).unwrap());
4876                            }
4877                            path.set_extension("rs");
4878                            let parent_path = path.parent().unwrap();
4879
4880                            log::info!("Host: creating file {:?}", path,);
4881
4882                            if fs.create_dir(&parent_path).await.is_ok()
4883                                && fs.create_file(&path, Default::default()).await.is_ok()
4884                            {
4885                                files.lock().push(path);
4886                                break;
4887                            } else {
4888                                log::info!("Host: cannot create file");
4889                            }
4890                        },
4891                    }
4892
4893                    cx.background().simulate_random_delay().await;
4894                }
4895
4896                self.project = Some(project);
4897                (self, cx)
4898            }
4899        }
4900
4901        pub async fn simulate_guest(
4902            mut self,
4903            guest_id: usize,
4904            project: ModelHandle<Project>,
4905            operations: Rc<Cell<usize>>,
4906            max_operations: usize,
4907            rng: Arc<Mutex<StdRng>>,
4908            mut cx: TestAppContext,
4909        ) -> (Self, TestAppContext) {
4910            while operations.get() < max_operations {
4911                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4912                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4913                        project
4914                            .worktrees(&cx)
4915                            .filter(|worktree| {
4916                                worktree.read(cx).entries(false).any(|e| e.is_file())
4917                            })
4918                            .choose(&mut *rng.lock())
4919                    }) {
4920                        worktree
4921                    } else {
4922                        cx.background().simulate_random_delay().await;
4923                        continue;
4924                    };
4925
4926                    operations.set(operations.get() + 1);
4927                    let project_path = worktree.read_with(&cx, |worktree, _| {
4928                        let entry = worktree
4929                            .entries(false)
4930                            .filter(|e| e.is_file())
4931                            .choose(&mut *rng.lock())
4932                            .unwrap();
4933                        (worktree.id(), entry.path.clone())
4934                    });
4935                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4936                    let buffer = project
4937                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4938                        .await
4939                        .unwrap();
4940                    self.buffers.insert(buffer.clone());
4941                    buffer
4942                } else {
4943                    operations.set(operations.get() + 1);
4944
4945                    self.buffers
4946                        .iter()
4947                        .choose(&mut *rng.lock())
4948                        .unwrap()
4949                        .clone()
4950                };
4951
4952                let choice = rng.lock().gen_range(0..100);
4953                match choice {
4954                    0..=9 => {
4955                        cx.update(|cx| {
4956                            log::info!(
4957                                "Guest {}: dropping buffer {:?}",
4958                                guest_id,
4959                                buffer.read(cx).file().unwrap().full_path(cx)
4960                            );
4961                            self.buffers.remove(&buffer);
4962                            drop(buffer);
4963                        });
4964                    }
4965                    10..=19 => {
4966                        let completions = project.update(&mut cx, |project, cx| {
4967                            log::info!(
4968                                "Guest {}: requesting completions for buffer {:?}",
4969                                guest_id,
4970                                buffer.read(cx).file().unwrap().full_path(cx)
4971                            );
4972                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4973                            project.completions(&buffer, offset, cx)
4974                        });
4975                        let completions = cx.background().spawn(async move {
4976                            completions.await.expect("completions request failed");
4977                        });
4978                        if rng.lock().gen_bool(0.3) {
4979                            log::info!("Guest {}: detaching completions request", guest_id);
4980                            completions.detach();
4981                        } else {
4982                            completions.await;
4983                        }
4984                    }
4985                    20..=29 => {
4986                        let code_actions = project.update(&mut cx, |project, cx| {
4987                            log::info!(
4988                                "Guest {}: requesting code actions for buffer {:?}",
4989                                guest_id,
4990                                buffer.read(cx).file().unwrap().full_path(cx)
4991                            );
4992                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4993                            project.code_actions(&buffer, range, cx)
4994                        });
4995                        let code_actions = cx.background().spawn(async move {
4996                            code_actions.await.expect("code actions request failed");
4997                        });
4998                        if rng.lock().gen_bool(0.3) {
4999                            log::info!("Guest {}: detaching code actions request", guest_id);
5000                            code_actions.detach();
5001                        } else {
5002                            code_actions.await;
5003                        }
5004                    }
5005                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5006                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5007                            log::info!(
5008                                "Guest {}: saving buffer {:?}",
5009                                guest_id,
5010                                buffer.file().unwrap().full_path(cx)
5011                            );
5012                            (buffer.version(), buffer.save(cx))
5013                        });
5014                        let save = cx.spawn(|cx| async move {
5015                            let (saved_version, _) = save.await.expect("save request failed");
5016                            buffer.read_with(&cx, |buffer, _| {
5017                                assert!(buffer.version().observed_all(&saved_version));
5018                                assert!(saved_version.observed_all(&requested_version));
5019                            });
5020                        });
5021                        if rng.lock().gen_bool(0.3) {
5022                            log::info!("Guest {}: detaching save request", guest_id);
5023                            save.detach();
5024                        } else {
5025                            save.await;
5026                        }
5027                    }
5028                    40..=45 => {
5029                        let prepare_rename = project.update(&mut cx, |project, cx| {
5030                            log::info!(
5031                                "Guest {}: preparing rename for buffer {:?}",
5032                                guest_id,
5033                                buffer.read(cx).file().unwrap().full_path(cx)
5034                            );
5035                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5036                            project.prepare_rename(buffer, offset, cx)
5037                        });
5038                        let prepare_rename = cx.background().spawn(async move {
5039                            prepare_rename.await.expect("prepare rename request failed");
5040                        });
5041                        if rng.lock().gen_bool(0.3) {
5042                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5043                            prepare_rename.detach();
5044                        } else {
5045                            prepare_rename.await;
5046                        }
5047                    }
5048                    46..=49 => {
5049                        let definitions = project.update(&mut cx, |project, cx| {
5050                            log::info!(
5051                                "Guest {}: requesting defintions for buffer {:?}",
5052                                guest_id,
5053                                buffer.read(cx).file().unwrap().full_path(cx)
5054                            );
5055                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5056                            project.definition(&buffer, offset, cx)
5057                        });
5058                        let definitions = cx.background().spawn(async move {
5059                            definitions.await.expect("definitions request failed");
5060                        });
5061                        if rng.lock().gen_bool(0.3) {
5062                            log::info!("Guest {}: detaching definitions request", guest_id);
5063                            definitions.detach();
5064                        } else {
5065                            definitions.await;
5066                        }
5067                    }
5068                    50..=55 => {
5069                        let highlights = project.update(&mut cx, |project, cx| {
5070                            log::info!(
5071                                "Guest {}: requesting highlights for buffer {:?}",
5072                                guest_id,
5073                                buffer.read(cx).file().unwrap().full_path(cx)
5074                            );
5075                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5076                            project.document_highlights(&buffer, offset, cx)
5077                        });
5078                        let highlights = cx.background().spawn(async move {
5079                            highlights.await.expect("highlights request failed");
5080                        });
5081                        if rng.lock().gen_bool(0.3) {
5082                            log::info!("Guest {}: detaching highlights request", guest_id);
5083                            highlights.detach();
5084                        } else {
5085                            highlights.await;
5086                        }
5087                    }
5088                    _ => {
5089                        buffer.update(&mut cx, |buffer, cx| {
5090                            log::info!(
5091                                "Guest {}: updating buffer {:?}",
5092                                guest_id,
5093                                buffer.file().unwrap().full_path(cx)
5094                            );
5095                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5096                        });
5097                    }
5098                }
5099                cx.background().simulate_random_delay().await;
5100            }
5101
5102            self.project = Some(project);
5103            (self, cx)
5104        }
5105    }
5106
5107    impl Executor for Arc<gpui::executor::Background> {
5108        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5109            self.spawn(future).detach();
5110        }
5111    }
5112
5113    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5114        channel
5115            .messages()
5116            .cursor::<()>()
5117            .map(|m| {
5118                (
5119                    m.sender.github_login.clone(),
5120                    m.body.clone(),
5121                    m.is_pending(),
5122                )
5123            })
5124            .collect()
5125    }
5126
    /// A stateless gpui view for tests; it renders a single empty element.
    struct EmptyView;
5128
    /// Makes `EmptyView` a gpui entity; its event type is the unit type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
5132
5133    impl gpui::View for EmptyView {
5134        fn ui_name() -> &'static str {
5135            "empty view"
5136        }
5137
5138        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5139            gpui::Element::boxed(gpui::elements::Empty)
5140        }
5141    }
5142}