rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::get_definition)
  81            .add_request_handler(Server::get_references)
  82            .add_request_handler(Server::get_document_highlights)
  83            .add_request_handler(Server::get_project_symbols)
  84            .add_request_handler(Server::open_buffer_for_symbol)
  85            .add_request_handler(Server::open_buffer)
  86            .add_message_handler(Server::close_buffer)
  87            .add_request_handler(Server::update_buffer)
  88            .add_message_handler(Server::update_buffer_file)
  89            .add_message_handler(Server::buffer_reloaded)
  90            .add_message_handler(Server::buffer_saved)
  91            .add_request_handler(Server::save_buffer)
  92            .add_request_handler(Server::format_buffers)
  93            .add_request_handler(Server::get_completions)
  94            .add_request_handler(Server::apply_additional_edits_for_completion)
  95            .add_request_handler(Server::get_code_actions)
  96            .add_request_handler(Server::apply_code_action)
  97            .add_request_handler(Server::prepare_rename)
  98            .add_request_handler(Server::perform_rename)
  99            .add_request_handler(Server::get_channels)
 100            .add_request_handler(Server::get_users)
 101            .add_request_handler(Server::join_channel)
 102            .add_message_handler(Server::leave_channel)
 103            .add_request_handler(Server::send_channel_message)
 104            .add_request_handler(Server::get_channel_messages);
 105
 106        Arc::new(server)
 107    }
 108
 109    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 110    where
 111        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 112        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 113        M: EnvelopedMessage,
 114    {
 115        let prev_handler = self.handlers.insert(
 116            TypeId::of::<M>(),
 117            Box::new(move |server, envelope| {
 118                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 119                (handler)(server, *envelope).boxed()
 120            }),
 121        );
 122        if prev_handler.is_some() {
 123            panic!("registered a handler for the same message twice");
 124        }
 125        self
 126    }
 127
 128    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 129    where
 130        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 131        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 132        M: RequestMessage,
 133    {
 134        self.add_message_handler(move |server, envelope| {
 135            let receipt = envelope.receipt();
 136            let response = (handler)(server.clone(), envelope);
 137            async move {
 138                match response.await {
 139                    Ok(response) => {
 140                        server.peer.respond(receipt, response)?;
 141                        Ok(())
 142                    }
 143                    Err(error) => {
 144                        server.peer.respond_with_error(
 145                            receipt,
 146                            proto::Error {
 147                                message: error.to_string(),
 148                            },
 149                        )?;
 150                        Err(error)
 151                    }
 152                }
 153            }
 154        })
 155    }
 156
 157    pub fn handle_connection<E: Executor>(
 158        self: &Arc<Self>,
 159        connection: Connection,
 160        addr: String,
 161        user_id: UserId,
 162        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 163        executor: E,
 164    ) -> impl Future<Output = ()> {
 165        let mut this = self.clone();
 166        async move {
 167            let (connection_id, handle_io, mut incoming_rx) =
 168                this.peer.add_connection(connection).await;
 169
 170            if let Some(send_connection_id) = send_connection_id.as_mut() {
 171                let _ = send_connection_id.send(connection_id).await;
 172            }
 173
 174            this.state_mut().add_connection(connection_id, user_id);
 175            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 176                log::error!("error updating contacts for {:?}: {}", user_id, err);
 177            }
 178
 179            let handle_io = handle_io.fuse();
 180            futures::pin_mut!(handle_io);
 181            loop {
 182                let next_message = incoming_rx.next().fuse();
 183                futures::pin_mut!(next_message);
 184                futures::select_biased! {
 185                    result = handle_io => {
 186                        if let Err(err) = result {
 187                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 188                        }
 189                        break;
 190                    }
 191                    message = next_message => {
 192                        if let Some(message) = message {
 193                            let start_time = Instant::now();
 194                            let type_name = message.payload_type_name();
 195                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 196                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 197                                let notifications = this.notifications.clone();
 198                                let is_background = message.is_background();
 199                                let handle_message = (handler)(this.clone(), message);
 200                                let handle_message = async move {
 201                                    if let Err(err) = handle_message.await {
 202                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 203                                    } else {
 204                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 205                                    }
 206                                    if let Some(mut notifications) = notifications {
 207                                        let _ = notifications.send(()).await;
 208                                    }
 209                                };
 210                                if is_background {
 211                                    executor.spawn_detached(handle_message);
 212                                } else {
 213                                    handle_message.await;
 214                                }
 215                            } else {
 216                                log::warn!("unhandled message: {}", type_name);
 217                            }
 218                        } else {
 219                            log::info!("rpc connection closed {:?}", addr);
 220                            break;
 221                        }
 222                    }
 223                }
 224            }
 225
 226            if let Err(err) = this.sign_out(connection_id).await {
 227                log::error!("error signing out connection {:?} - {:?}", addr, err);
 228            }
 229        }
 230    }
 231
 232    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 233        self.peer.disconnect(connection_id);
 234        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 235
 236        for (project_id, project) in removed_connection.hosted_projects {
 237            if let Some(share) = project.share {
 238                broadcast(
 239                    connection_id,
 240                    share.guests.keys().copied().collect(),
 241                    |conn_id| {
 242                        self.peer
 243                            .send(conn_id, proto::UnshareProject { project_id })
 244                    },
 245                )?;
 246            }
 247        }
 248
 249        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 250            broadcast(connection_id, peer_ids, |conn_id| {
 251                self.peer.send(
 252                    conn_id,
 253                    proto::RemoveProjectCollaborator {
 254                        project_id,
 255                        peer_id: connection_id.0,
 256                    },
 257                )
 258            })?;
 259        }
 260
 261        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 262        Ok(())
 263    }
 264
 265    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 266        Ok(proto::Ack {})
 267    }
 268
 269    async fn register_project(
 270        mut self: Arc<Server>,
 271        request: TypedEnvelope<proto::RegisterProject>,
 272    ) -> tide::Result<proto::RegisterProjectResponse> {
 273        let project_id = {
 274            let mut state = self.state_mut();
 275            let user_id = state.user_id_for_connection(request.sender_id)?;
 276            state.register_project(request.sender_id, user_id)
 277        };
 278        Ok(proto::RegisterProjectResponse { project_id })
 279    }
 280
 281    async fn unregister_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::UnregisterProject>,
 284    ) -> tide::Result<()> {
 285        let project = self
 286            .state_mut()
 287            .unregister_project(request.payload.project_id, request.sender_id)?;
 288        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 289        Ok(())
 290    }
 291
 292    async fn share_project(
 293        mut self: Arc<Server>,
 294        request: TypedEnvelope<proto::ShareProject>,
 295    ) -> tide::Result<proto::Ack> {
 296        self.state_mut()
 297            .share_project(request.payload.project_id, request.sender_id);
 298        Ok(proto::Ack {})
 299    }
 300
 301    async fn unshare_project(
 302        mut self: Arc<Server>,
 303        request: TypedEnvelope<proto::UnshareProject>,
 304    ) -> tide::Result<()> {
 305        let project_id = request.payload.project_id;
 306        let project = self
 307            .state_mut()
 308            .unshare_project(project_id, request.sender_id)?;
 309
 310        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 311            self.peer
 312                .send(conn_id, proto::UnshareProject { project_id })
 313        })?;
 314        self.update_contacts_for_users(&project.authorized_user_ids)?;
 315        Ok(())
 316    }
 317
 318    async fn join_project(
 319        mut self: Arc<Server>,
 320        request: TypedEnvelope<proto::JoinProject>,
 321    ) -> tide::Result<proto::JoinProjectResponse> {
 322        let project_id = request.payload.project_id;
 323
 324        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 325        let (response, connection_ids, contact_user_ids) = self
 326            .state_mut()
 327            .join_project(request.sender_id, user_id, project_id)
 328            .and_then(|joined| {
 329                let share = joined.project.share()?;
 330                let peer_count = share.guests.len();
 331                let mut collaborators = Vec::with_capacity(peer_count);
 332                collaborators.push(proto::Collaborator {
 333                    peer_id: joined.project.host_connection_id.0,
 334                    replica_id: 0,
 335                    user_id: joined.project.host_user_id.to_proto(),
 336                });
 337                let worktrees = share
 338                    .worktrees
 339                    .iter()
 340                    .filter_map(|(id, shared_worktree)| {
 341                        let worktree = joined.project.worktrees.get(&id)?;
 342                        Some(proto::Worktree {
 343                            id: *id,
 344                            root_name: worktree.root_name.clone(),
 345                            entries: shared_worktree.entries.values().cloned().collect(),
 346                            diagnostic_summaries: shared_worktree
 347                                .diagnostic_summaries
 348                                .values()
 349                                .cloned()
 350                                .collect(),
 351                            weak: worktree.weak,
 352                        })
 353                    })
 354                    .collect();
 355                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 356                    if *peer_conn_id != request.sender_id {
 357                        collaborators.push(proto::Collaborator {
 358                            peer_id: peer_conn_id.0,
 359                            replica_id: *peer_replica_id as u32,
 360                            user_id: peer_user_id.to_proto(),
 361                        });
 362                    }
 363                }
 364                let response = proto::JoinProjectResponse {
 365                    worktrees,
 366                    replica_id: joined.replica_id as u32,
 367                    collaborators,
 368                };
 369                let connection_ids = joined.project.connection_ids();
 370                let contact_user_ids = joined.project.authorized_user_ids();
 371                Ok((response, connection_ids, contact_user_ids))
 372            })?;
 373
 374        broadcast(request.sender_id, connection_ids, |conn_id| {
 375            self.peer.send(
 376                conn_id,
 377                proto::AddProjectCollaborator {
 378                    project_id,
 379                    collaborator: Some(proto::Collaborator {
 380                        peer_id: request.sender_id.0,
 381                        replica_id: response.replica_id,
 382                        user_id: user_id.to_proto(),
 383                    }),
 384                },
 385            )
 386        })?;
 387        self.update_contacts_for_users(&contact_user_ids)?;
 388        Ok(response)
 389    }
 390
 391    async fn leave_project(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::LeaveProject>,
 394    ) -> tide::Result<()> {
 395        let sender_id = request.sender_id;
 396        let project_id = request.payload.project_id;
 397        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 398
 399        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 400            self.peer.send(
 401                conn_id,
 402                proto::RemoveProjectCollaborator {
 403                    project_id,
 404                    peer_id: sender_id.0,
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 409
 410        Ok(())
 411    }
 412
 413    async fn register_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::RegisterWorktree>,
 416    ) -> tide::Result<proto::Ack> {
 417        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 418
 419        let mut contact_user_ids = HashSet::default();
 420        contact_user_ids.insert(host_user_id);
 421        for github_login in &request.payload.authorized_logins {
 422            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 423            contact_user_ids.insert(contact_user_id);
 424        }
 425
 426        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 427        let guest_connection_ids;
 428        {
 429            let mut state = self.state_mut();
 430            guest_connection_ids = state
 431                .read_project(request.payload.project_id, request.sender_id)?
 432                .guest_connection_ids();
 433            state.register_worktree(
 434                request.payload.project_id,
 435                request.payload.worktree_id,
 436                request.sender_id,
 437                Worktree {
 438                    authorized_user_ids: contact_user_ids.clone(),
 439                    root_name: request.payload.root_name.clone(),
 440                    weak: request.payload.weak,
 441                },
 442            )?;
 443        }
 444        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 445            self.peer
 446                .forward_send(request.sender_id, connection_id, request.payload.clone())
 447        })?;
 448        self.update_contacts_for_users(&contact_user_ids)?;
 449        Ok(proto::Ack {})
 450    }
 451
 452    async fn unregister_worktree(
 453        mut self: Arc<Server>,
 454        request: TypedEnvelope<proto::UnregisterWorktree>,
 455    ) -> tide::Result<()> {
 456        let project_id = request.payload.project_id;
 457        let worktree_id = request.payload.worktree_id;
 458        let (worktree, guest_connection_ids) =
 459            self.state_mut()
 460                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 461        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 462            self.peer.send(
 463                conn_id,
 464                proto::UnregisterWorktree {
 465                    project_id,
 466                    worktree_id,
 467                },
 468            )
 469        })?;
 470        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 471        Ok(())
 472    }
 473
 474    async fn update_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UpdateWorktree>,
 477    ) -> tide::Result<proto::Ack> {
 478        let connection_ids = self.state_mut().update_worktree(
 479            request.sender_id,
 480            request.payload.project_id,
 481            request.payload.worktree_id,
 482            &request.payload.removed_entries,
 483            &request.payload.updated_entries,
 484        )?;
 485
 486        broadcast(request.sender_id, connection_ids, |connection_id| {
 487            self.peer
 488                .forward_send(request.sender_id, connection_id, request.payload.clone())
 489        })?;
 490
 491        Ok(proto::Ack {})
 492    }
 493
 494    async fn update_diagnostic_summary(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 497    ) -> tide::Result<()> {
 498        let summary = request
 499            .payload
 500            .summary
 501            .clone()
 502            .ok_or_else(|| anyhow!("invalid summary"))?;
 503        let receiver_ids = self.state_mut().update_diagnostic_summary(
 504            request.payload.project_id,
 505            request.payload.worktree_id,
 506            request.sender_id,
 507            summary,
 508        )?;
 509
 510        broadcast(request.sender_id, receiver_ids, |connection_id| {
 511            self.peer
 512                .forward_send(request.sender_id, connection_id, request.payload.clone())
 513        })?;
 514        Ok(())
 515    }
 516
 517    async fn disk_based_diagnostics_updating(
 518        self: Arc<Server>,
 519        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 520    ) -> tide::Result<()> {
 521        let receiver_ids = self
 522            .state()
 523            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 524        broadcast(request.sender_id, receiver_ids, |connection_id| {
 525            self.peer
 526                .forward_send(request.sender_id, connection_id, request.payload.clone())
 527        })?;
 528        Ok(())
 529    }
 530
 531    async fn disk_based_diagnostics_updated(
 532        self: Arc<Server>,
 533        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 534    ) -> tide::Result<()> {
 535        let receiver_ids = self
 536            .state()
 537            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 538        broadcast(request.sender_id, receiver_ids, |connection_id| {
 539            self.peer
 540                .forward_send(request.sender_id, connection_id, request.payload.clone())
 541        })?;
 542        Ok(())
 543    }
 544
 545    async fn get_definition(
 546        self: Arc<Server>,
 547        request: TypedEnvelope<proto::GetDefinition>,
 548    ) -> tide::Result<proto::GetDefinitionResponse> {
 549        let host_connection_id = self
 550            .state()
 551            .read_project(request.payload.project_id, request.sender_id)?
 552            .host_connection_id;
 553        Ok(self
 554            .peer
 555            .forward_request(request.sender_id, host_connection_id, request.payload)
 556            .await?)
 557    }
 558
 559    async fn get_references(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::GetReferences>,
 562    ) -> tide::Result<proto::GetReferencesResponse> {
 563        let host_connection_id = self
 564            .state()
 565            .read_project(request.payload.project_id, request.sender_id)?
 566            .host_connection_id;
 567        Ok(self
 568            .peer
 569            .forward_request(request.sender_id, host_connection_id, request.payload)
 570            .await?)
 571    }
 572
 573    async fn get_document_highlights(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::GetDocumentHighlights>,
 576    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 577        let host_connection_id = self
 578            .state()
 579            .read_project(request.payload.project_id, request.sender_id)?
 580            .host_connection_id;
 581        Ok(self
 582            .peer
 583            .forward_request(request.sender_id, host_connection_id, request.payload)
 584            .await?)
 585    }
 586
 587    async fn get_project_symbols(
 588        self: Arc<Server>,
 589        request: TypedEnvelope<proto::GetProjectSymbols>,
 590    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 591        let host_connection_id = self
 592            .state()
 593            .read_project(request.payload.project_id, request.sender_id)?
 594            .host_connection_id;
 595        Ok(self
 596            .peer
 597            .forward_request(request.sender_id, host_connection_id, request.payload)
 598            .await?)
 599    }
 600
 601    async fn open_buffer_for_symbol(
 602        self: Arc<Server>,
 603        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 604    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 605        let host_connection_id = self
 606            .state()
 607            .read_project(request.payload.project_id, request.sender_id)?
 608            .host_connection_id;
 609        Ok(self
 610            .peer
 611            .forward_request(request.sender_id, host_connection_id, request.payload)
 612            .await?)
 613    }
 614
 615    async fn open_buffer(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::OpenBuffer>,
 618    ) -> tide::Result<proto::OpenBufferResponse> {
 619        let host_connection_id = self
 620            .state()
 621            .read_project(request.payload.project_id, request.sender_id)?
 622            .host_connection_id;
 623        Ok(self
 624            .peer
 625            .forward_request(request.sender_id, host_connection_id, request.payload)
 626            .await?)
 627    }
 628
 629    async fn close_buffer(
 630        self: Arc<Server>,
 631        request: TypedEnvelope<proto::CloseBuffer>,
 632    ) -> tide::Result<()> {
 633        let host_connection_id = self
 634            .state()
 635            .read_project(request.payload.project_id, request.sender_id)?
 636            .host_connection_id;
 637        self.peer
 638            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 639        Ok(())
 640    }
 641
 642    async fn save_buffer(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::SaveBuffer>,
 645    ) -> tide::Result<proto::BufferSaved> {
 646        let host;
 647        let mut guests;
 648        {
 649            let state = self.state();
 650            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 651            host = project.host_connection_id;
 652            guests = project.guest_connection_ids()
 653        }
 654
 655        let response = self
 656            .peer
 657            .forward_request(request.sender_id, host, request.payload.clone())
 658            .await?;
 659
 660        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 661        broadcast(host, guests, |conn_id| {
 662            self.peer.forward_send(host, conn_id, response.clone())
 663        })?;
 664
 665        Ok(response)
 666    }
 667
 668    async fn format_buffers(
 669        self: Arc<Server>,
 670        request: TypedEnvelope<proto::FormatBuffers>,
 671    ) -> tide::Result<proto::FormatBuffersResponse> {
 672        let host = self
 673            .state()
 674            .read_project(request.payload.project_id, request.sender_id)?
 675            .host_connection_id;
 676        Ok(self
 677            .peer
 678            .forward_request(request.sender_id, host, request.payload.clone())
 679            .await?)
 680    }
 681
 682    async fn get_completions(
 683        self: Arc<Server>,
 684        request: TypedEnvelope<proto::GetCompletions>,
 685    ) -> tide::Result<proto::GetCompletionsResponse> {
 686        let host = self
 687            .state()
 688            .read_project(request.payload.project_id, request.sender_id)?
 689            .host_connection_id;
 690        Ok(self
 691            .peer
 692            .forward_request(request.sender_id, host, request.payload.clone())
 693            .await?)
 694    }
 695
 696    async fn apply_additional_edits_for_completion(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 699    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 700        let host = self
 701            .state()
 702            .read_project(request.payload.project_id, request.sender_id)?
 703            .host_connection_id;
 704        Ok(self
 705            .peer
 706            .forward_request(request.sender_id, host, request.payload.clone())
 707            .await?)
 708    }
 709
 710    async fn get_code_actions(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::GetCodeActions>,
 713    ) -> tide::Result<proto::GetCodeActionsResponse> {
 714        let host = self
 715            .state()
 716            .read_project(request.payload.project_id, request.sender_id)?
 717            .host_connection_id;
 718        Ok(self
 719            .peer
 720            .forward_request(request.sender_id, host, request.payload.clone())
 721            .await?)
 722    }
 723
 724    async fn apply_code_action(
 725        self: Arc<Server>,
 726        request: TypedEnvelope<proto::ApplyCodeAction>,
 727    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 728        let host = self
 729            .state()
 730            .read_project(request.payload.project_id, request.sender_id)?
 731            .host_connection_id;
 732        Ok(self
 733            .peer
 734            .forward_request(request.sender_id, host, request.payload.clone())
 735            .await?)
 736    }
 737
 738    async fn prepare_rename(
 739        self: Arc<Server>,
 740        request: TypedEnvelope<proto::PrepareRename>,
 741    ) -> tide::Result<proto::PrepareRenameResponse> {
 742        let host = self
 743            .state()
 744            .read_project(request.payload.project_id, request.sender_id)?
 745            .host_connection_id;
 746        Ok(self
 747            .peer
 748            .forward_request(request.sender_id, host, request.payload.clone())
 749            .await?)
 750    }
 751
 752    async fn perform_rename(
 753        self: Arc<Server>,
 754        request: TypedEnvelope<proto::PerformRename>,
 755    ) -> tide::Result<proto::PerformRenameResponse> {
 756        let host = self
 757            .state()
 758            .read_project(request.payload.project_id, request.sender_id)?
 759            .host_connection_id;
 760        Ok(self
 761            .peer
 762            .forward_request(request.sender_id, host, request.payload.clone())
 763            .await?)
 764    }
 765
 766    async fn update_buffer(
 767        self: Arc<Server>,
 768        request: TypedEnvelope<proto::UpdateBuffer>,
 769    ) -> tide::Result<proto::Ack> {
 770        let receiver_ids = self
 771            .state()
 772            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 773        broadcast(request.sender_id, receiver_ids, |connection_id| {
 774            self.peer
 775                .forward_send(request.sender_id, connection_id, request.payload.clone())
 776        })?;
 777        Ok(proto::Ack {})
 778    }
 779
 780    async fn update_buffer_file(
 781        self: Arc<Server>,
 782        request: TypedEnvelope<proto::UpdateBufferFile>,
 783    ) -> tide::Result<()> {
 784        let receiver_ids = self
 785            .state()
 786            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 787        broadcast(request.sender_id, receiver_ids, |connection_id| {
 788            self.peer
 789                .forward_send(request.sender_id, connection_id, request.payload.clone())
 790        })?;
 791        Ok(())
 792    }
 793
 794    async fn buffer_reloaded(
 795        self: Arc<Server>,
 796        request: TypedEnvelope<proto::BufferReloaded>,
 797    ) -> tide::Result<()> {
 798        let receiver_ids = self
 799            .state()
 800            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 801        broadcast(request.sender_id, receiver_ids, |connection_id| {
 802            self.peer
 803                .forward_send(request.sender_id, connection_id, request.payload.clone())
 804        })?;
 805        Ok(())
 806    }
 807
 808    async fn buffer_saved(
 809        self: Arc<Server>,
 810        request: TypedEnvelope<proto::BufferSaved>,
 811    ) -> tide::Result<()> {
 812        let receiver_ids = self
 813            .state()
 814            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 815        broadcast(request.sender_id, receiver_ids, |connection_id| {
 816            self.peer
 817                .forward_send(request.sender_id, connection_id, request.payload.clone())
 818        })?;
 819        Ok(())
 820    }
 821
 822    async fn get_channels(
 823        self: Arc<Server>,
 824        request: TypedEnvelope<proto::GetChannels>,
 825    ) -> tide::Result<proto::GetChannelsResponse> {
 826        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 827        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 828        Ok(proto::GetChannelsResponse {
 829            channels: channels
 830                .into_iter()
 831                .map(|chan| proto::Channel {
 832                    id: chan.id.to_proto(),
 833                    name: chan.name,
 834                })
 835                .collect(),
 836        })
 837    }
 838
 839    async fn get_users(
 840        self: Arc<Server>,
 841        request: TypedEnvelope<proto::GetUsers>,
 842    ) -> tide::Result<proto::GetUsersResponse> {
 843        let user_ids = request
 844            .payload
 845            .user_ids
 846            .into_iter()
 847            .map(UserId::from_proto)
 848            .collect();
 849        let users = self
 850            .app_state
 851            .db
 852            .get_users_by_ids(user_ids)
 853            .await?
 854            .into_iter()
 855            .map(|user| proto::User {
 856                id: user.id.to_proto(),
 857                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 858                github_login: user.github_login,
 859            })
 860            .collect();
 861        Ok(proto::GetUsersResponse { users })
 862    }
 863
 864    fn update_contacts_for_users<'a>(
 865        self: &Arc<Server>,
 866        user_ids: impl IntoIterator<Item = &'a UserId>,
 867    ) -> anyhow::Result<()> {
 868        let mut result = Ok(());
 869        let state = self.state();
 870        for user_id in user_ids {
 871            let contacts = state.contacts_for_user(*user_id);
 872            for connection_id in state.connection_ids_for_user(*user_id) {
 873                if let Err(error) = self.peer.send(
 874                    connection_id,
 875                    proto::UpdateContacts {
 876                        contacts: contacts.clone(),
 877                    },
 878                ) {
 879                    result = Err(error);
 880                }
 881            }
 882        }
 883        result
 884    }
 885
 886    async fn join_channel(
 887        mut self: Arc<Self>,
 888        request: TypedEnvelope<proto::JoinChannel>,
 889    ) -> tide::Result<proto::JoinChannelResponse> {
 890        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 891        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 892        if !self
 893            .app_state
 894            .db
 895            .can_user_access_channel(user_id, channel_id)
 896            .await?
 897        {
 898            Err(anyhow!("access denied"))?;
 899        }
 900
 901        self.state_mut().join_channel(request.sender_id, channel_id);
 902        let messages = self
 903            .app_state
 904            .db
 905            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 906            .await?
 907            .into_iter()
 908            .map(|msg| proto::ChannelMessage {
 909                id: msg.id.to_proto(),
 910                body: msg.body,
 911                timestamp: msg.sent_at.unix_timestamp() as u64,
 912                sender_id: msg.sender_id.to_proto(),
 913                nonce: Some(msg.nonce.as_u128().into()),
 914            })
 915            .collect::<Vec<_>>();
 916        Ok(proto::JoinChannelResponse {
 917            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 918            messages,
 919        })
 920    }
 921
 922    async fn leave_channel(
 923        mut self: Arc<Self>,
 924        request: TypedEnvelope<proto::LeaveChannel>,
 925    ) -> tide::Result<()> {
 926        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 927        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 928        if !self
 929            .app_state
 930            .db
 931            .can_user_access_channel(user_id, channel_id)
 932            .await?
 933        {
 934            Err(anyhow!("access denied"))?;
 935        }
 936
 937        self.state_mut()
 938            .leave_channel(request.sender_id, channel_id);
 939
 940        Ok(())
 941    }
 942
 943    async fn send_channel_message(
 944        self: Arc<Self>,
 945        request: TypedEnvelope<proto::SendChannelMessage>,
 946    ) -> tide::Result<proto::SendChannelMessageResponse> {
 947        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 948        let user_id;
 949        let connection_ids;
 950        {
 951            let state = self.state();
 952            user_id = state.user_id_for_connection(request.sender_id)?;
 953            connection_ids = state.channel_connection_ids(channel_id)?;
 954        }
 955
 956        // Validate the message body.
 957        let body = request.payload.body.trim().to_string();
 958        if body.len() > MAX_MESSAGE_LEN {
 959            return Err(anyhow!("message is too long"))?;
 960        }
 961        if body.is_empty() {
 962            return Err(anyhow!("message can't be blank"))?;
 963        }
 964
 965        let timestamp = OffsetDateTime::now_utc();
 966        let nonce = request
 967            .payload
 968            .nonce
 969            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 970
 971        let message_id = self
 972            .app_state
 973            .db
 974            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 975            .await?
 976            .to_proto();
 977        let message = proto::ChannelMessage {
 978            sender_id: user_id.to_proto(),
 979            id: message_id,
 980            body,
 981            timestamp: timestamp.unix_timestamp() as u64,
 982            nonce: Some(nonce),
 983        };
 984        broadcast(request.sender_id, connection_ids, |conn_id| {
 985            self.peer.send(
 986                conn_id,
 987                proto::ChannelMessageSent {
 988                    channel_id: channel_id.to_proto(),
 989                    message: Some(message.clone()),
 990                },
 991            )
 992        })?;
 993        Ok(proto::SendChannelMessageResponse {
 994            message: Some(message),
 995        })
 996    }
 997
 998    async fn get_channel_messages(
 999        self: Arc<Self>,
1000        request: TypedEnvelope<proto::GetChannelMessages>,
1001    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1002        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1003        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1004        if !self
1005            .app_state
1006            .db
1007            .can_user_access_channel(user_id, channel_id)
1008            .await?
1009        {
1010            Err(anyhow!("access denied"))?;
1011        }
1012
1013        let messages = self
1014            .app_state
1015            .db
1016            .get_channel_messages(
1017                channel_id,
1018                MESSAGE_COUNT_PER_PAGE,
1019                Some(MessageId::from_proto(request.payload.before_message_id)),
1020            )
1021            .await?
1022            .into_iter()
1023            .map(|msg| proto::ChannelMessage {
1024                id: msg.id.to_proto(),
1025                body: msg.body,
1026                timestamp: msg.sent_at.unix_timestamp() as u64,
1027                sender_id: msg.sender_id.to_proto(),
1028                nonce: Some(msg.nonce.as_u128().into()),
1029            })
1030            .collect::<Vec<_>>();
1031
1032        Ok(proto::GetChannelMessagesResponse {
1033            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1034            messages,
1035        })
1036    }
1037
1038    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1039        self.store.read()
1040    }
1041
1042    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1043        self.store.write()
1044    }
1045}
1046
1047impl Executor for RealExecutor {
1048    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1049        task::spawn(future);
1050    }
1051}
1052
1053fn broadcast<F>(
1054    sender_id: ConnectionId,
1055    receiver_ids: Vec<ConnectionId>,
1056    mut f: F,
1057) -> anyhow::Result<()>
1058where
1059    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1060{
1061    let mut result = Ok(());
1062    for receiver_id in receiver_ids {
1063        if receiver_id != sender_id {
1064            if let Err(error) = f(receiver_id) {
1065                if result.is_ok() {
1066                    result = Err(error);
1067                }
1068            }
1069        }
1070    }
1071    result
1072}
1073
1074pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1075    let server = Server::new(app.state().clone(), rpc.clone(), None);
1076    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1077        let server = server.clone();
1078        async move {
1079            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1080
1081            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1082            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1083            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1084            let client_protocol_version: Option<u32> = request
1085                .header("X-Zed-Protocol-Version")
1086                .and_then(|v| v.as_str().parse().ok());
1087
1088            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1089                return Ok(Response::new(StatusCode::UpgradeRequired));
1090            }
1091
1092            let header = match request.header("Sec-Websocket-Key") {
1093                Some(h) => h.as_str(),
1094                None => return Err(anyhow!("expected sec-websocket-key"))?,
1095            };
1096
1097            let user_id = process_auth_header(&request).await?;
1098
1099            let mut response = Response::new(StatusCode::SwitchingProtocols);
1100            response.insert_header(UPGRADE, "websocket");
1101            response.insert_header(CONNECTION, "Upgrade");
1102            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1103            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1104            response.insert_header("Sec-Websocket-Version", "13");
1105
1106            let http_res: &mut tide::http::Response = response.as_mut();
1107            let upgrade_receiver = http_res.recv_upgrade().await;
1108            let addr = request.remote().unwrap_or("unknown").to_string();
1109            task::spawn(async move {
1110                if let Some(stream) = upgrade_receiver.await {
1111                    server
1112                        .handle_connection(
1113                            Connection::new(
1114                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1115                            ),
1116                            addr,
1117                            user_id,
1118                            None,
1119                            RealExecutor,
1120                        )
1121                        .await;
1122                }
1123            });
1124
1125            Ok(response)
1126        }
1127    });
1128}
1129
1130fn header_contains_ignore_case<T>(
1131    request: &tide::Request<T>,
1132    header_name: HeaderName,
1133    value: &str,
1134) -> bool {
1135    request
1136        .header(header_name)
1137        .map(|h| {
1138            h.as_str()
1139                .split(',')
1140                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1141        })
1142        .unwrap_or(false)
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147    use super::*;
1148    use crate::{
1149        auth,
1150        db::{tests::TestDb, UserId},
1151        github, AppState, Config,
1152    };
1153    use ::rpc::Peer;
1154    use collections::BTreeMap;
1155    use gpui::{executor, ModelHandle, TestAppContext};
1156    use parking_lot::Mutex;
1157    use postage::{sink::Sink, watch};
1158    use rand::prelude::*;
1159    use rpc::PeerId;
1160    use serde_json::json;
1161    use sqlx::types::time::OffsetDateTime;
1162    use std::{
1163        cell::Cell,
1164        env,
1165        ops::Deref,
1166        path::{Path, PathBuf},
1167        rc::Rc,
1168        sync::{
1169            atomic::{AtomicBool, Ordering::SeqCst},
1170            Arc,
1171        },
1172        time::Duration,
1173    };
1174    use zed::{
1175        client::{
1176            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1177            EstablishConnectionError, UserStore,
1178        },
1179        editor::{
1180            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1181            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1182        },
1183        fs::{FakeFs, Fs as _},
1184        language::{
1185            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1186            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1187        },
1188        lsp,
1189        project::{DiagnosticSummary, Project, ProjectPath},
1190        workspace::{Settings, Workspace, WorkspaceParams},
1191    };
1192
1193    #[cfg(test)]
1194    #[ctor::ctor]
1195    fn init_logger() {
1196        if std::env::var("RUST_LOG").is_ok() {
1197            env_logger::init();
1198        }
1199    }
1200
1201    #[gpui::test(iterations = 10)]
1202    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1203        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1204        let lang_registry = Arc::new(LanguageRegistry::new());
1205        let fs = FakeFs::new(cx_a.background());
1206        cx_a.foreground().forbid_parking();
1207
1208        // Connect to a server as 2 clients.
1209        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1210        let client_a = server.create_client(&mut cx_a, "user_a").await;
1211        let client_b = server.create_client(&mut cx_b, "user_b").await;
1212
1213        // Share a project as client A
1214        fs.insert_tree(
1215            "/a",
1216            json!({
1217                ".zed.toml": r#"collaborators = ["user_b"]"#,
1218                "a.txt": "a-contents",
1219                "b.txt": "b-contents",
1220            }),
1221        )
1222        .await;
1223        let project_a = cx_a.update(|cx| {
1224            Project::local(
1225                client_a.clone(),
1226                client_a.user_store.clone(),
1227                lang_registry.clone(),
1228                fs.clone(),
1229                cx,
1230            )
1231        });
1232        let (worktree_a, _) = project_a
1233            .update(&mut cx_a, |p, cx| {
1234                p.find_or_create_local_worktree("/a", false, cx)
1235            })
1236            .await
1237            .unwrap();
1238        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1239        worktree_a
1240            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1241            .await;
1242        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1243        project_a
1244            .update(&mut cx_a, |p, cx| p.share(cx))
1245            .await
1246            .unwrap();
1247
1248        // Join that project as client B
1249        let project_b = Project::remote(
1250            project_id,
1251            client_b.clone(),
1252            client_b.user_store.clone(),
1253            lang_registry.clone(),
1254            fs.clone(),
1255            &mut cx_b.to_async(),
1256        )
1257        .await
1258        .unwrap();
1259
1260        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1261            assert_eq!(
1262                project
1263                    .collaborators()
1264                    .get(&client_a.peer_id)
1265                    .unwrap()
1266                    .user
1267                    .github_login,
1268                "user_a"
1269            );
1270            project.replica_id()
1271        });
1272        project_a
1273            .condition(&cx_a, |tree, _| {
1274                tree.collaborators()
1275                    .get(&client_b.peer_id)
1276                    .map_or(false, |collaborator| {
1277                        collaborator.replica_id == replica_id_b
1278                            && collaborator.user.github_login == "user_b"
1279                    })
1280            })
1281            .await;
1282
1283        // Open the same file as client B and client A.
1284        let buffer_b = project_b
1285            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1286            .await
1287            .unwrap();
1288        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1289        buffer_b.read_with(&cx_b, |buf, cx| {
1290            assert_eq!(buf.read(cx).text(), "b-contents")
1291        });
1292        project_a.read_with(&cx_a, |project, cx| {
1293            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1294        });
1295        let buffer_a = project_a
1296            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1297            .await
1298            .unwrap();
1299
1300        let editor_b = cx_b.add_view(window_b, |cx| {
1301            Editor::for_buffer(
1302                buffer_b,
1303                None,
1304                watch::channel_with(Settings::test(cx)).1,
1305                cx,
1306            )
1307        });
1308
1309        // TODO
1310        // // Create a selection set as client B and see that selection set as client A.
1311        // buffer_a
1312        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1313        //     .await;
1314
1315        // Edit the buffer as client B and see that edit as client A.
1316        editor_b.update(&mut cx_b, |editor, cx| {
1317            editor.handle_input(&Input("ok, ".into()), cx)
1318        });
1319        buffer_a
1320            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1321            .await;
1322
1323        // TODO
1324        // // Remove the selection set as client B, see those selections disappear as client A.
1325        cx_b.update(move |_| drop(editor_b));
1326        // buffer_a
1327        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1328        //     .await;
1329
1330        // Close the buffer as client A, see that the buffer is closed.
1331        cx_a.update(move |_| drop(buffer_a));
1332        project_a
1333            .condition(&cx_a, |project, cx| {
1334                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1335            })
1336            .await;
1337
1338        // Dropping the client B's project removes client B from client A's collaborators.
1339        cx_b.update(move |_| drop(project_b));
1340        project_a
1341            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1342            .await;
1343    }
1344
1345    #[gpui::test(iterations = 10)]
1346    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1347        let lang_registry = Arc::new(LanguageRegistry::new());
1348        let fs = FakeFs::new(cx_a.background());
1349        cx_a.foreground().forbid_parking();
1350
1351        // Connect to a server as 2 clients.
1352        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1353        let client_a = server.create_client(&mut cx_a, "user_a").await;
1354        let client_b = server.create_client(&mut cx_b, "user_b").await;
1355
1356        // Share a project as client A
1357        fs.insert_tree(
1358            "/a",
1359            json!({
1360                ".zed.toml": r#"collaborators = ["user_b"]"#,
1361                "a.txt": "a-contents",
1362                "b.txt": "b-contents",
1363            }),
1364        )
1365        .await;
1366        let project_a = cx_a.update(|cx| {
1367            Project::local(
1368                client_a.clone(),
1369                client_a.user_store.clone(),
1370                lang_registry.clone(),
1371                fs.clone(),
1372                cx,
1373            )
1374        });
1375        let (worktree_a, _) = project_a
1376            .update(&mut cx_a, |p, cx| {
1377                p.find_or_create_local_worktree("/a", false, cx)
1378            })
1379            .await
1380            .unwrap();
1381        worktree_a
1382            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1383            .await;
1384        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1385        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1386        project_a
1387            .update(&mut cx_a, |p, cx| p.share(cx))
1388            .await
1389            .unwrap();
1390        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1391
1392        // Join that project as client B
1393        let project_b = Project::remote(
1394            project_id,
1395            client_b.clone(),
1396            client_b.user_store.clone(),
1397            lang_registry.clone(),
1398            fs.clone(),
1399            &mut cx_b.to_async(),
1400        )
1401        .await
1402        .unwrap();
1403        project_b
1404            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1405            .await
1406            .unwrap();
1407
1408        // Unshare the project as client A
1409        project_a
1410            .update(&mut cx_a, |project, cx| project.unshare(cx))
1411            .await
1412            .unwrap();
1413        project_b
1414            .condition(&mut cx_b, |project, _| project.is_read_only())
1415            .await;
1416        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1417        drop(project_b);
1418
1419        // Share the project again and ensure guests can still join.
1420        project_a
1421            .update(&mut cx_a, |project, cx| project.share(cx))
1422            .await
1423            .unwrap();
1424        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1425
1426        let project_c = Project::remote(
1427            project_id,
1428            client_b.clone(),
1429            client_b.user_store.clone(),
1430            lang_registry.clone(),
1431            fs.clone(),
1432            &mut cx_b.to_async(),
1433        )
1434        .await
1435        .unwrap();
1436        project_c
1437            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1438            .await
1439            .unwrap();
1440    }
1441
1442    #[gpui::test(iterations = 10)]
1443    async fn test_propagate_saves_and_fs_changes(
1444        mut cx_a: TestAppContext,
1445        mut cx_b: TestAppContext,
1446        mut cx_c: TestAppContext,
1447    ) {
1448        let lang_registry = Arc::new(LanguageRegistry::new());
1449        let fs = FakeFs::new(cx_a.background());
1450        cx_a.foreground().forbid_parking();
1451
1452        // Connect to a server as 3 clients.
1453        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1454        let client_a = server.create_client(&mut cx_a, "user_a").await;
1455        let client_b = server.create_client(&mut cx_b, "user_b").await;
1456        let client_c = server.create_client(&mut cx_c, "user_c").await;
1457
1458        // Share a worktree as client A.
1459        fs.insert_tree(
1460            "/a",
1461            json!({
1462                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1463                "file1": "",
1464                "file2": ""
1465            }),
1466        )
1467        .await;
1468        let project_a = cx_a.update(|cx| {
1469            Project::local(
1470                client_a.clone(),
1471                client_a.user_store.clone(),
1472                lang_registry.clone(),
1473                fs.clone(),
1474                cx,
1475            )
1476        });
1477        let (worktree_a, _) = project_a
1478            .update(&mut cx_a, |p, cx| {
1479                p.find_or_create_local_worktree("/a", false, cx)
1480            })
1481            .await
1482            .unwrap();
1483        worktree_a
1484            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1485            .await;
1486        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1487        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1488        project_a
1489            .update(&mut cx_a, |p, cx| p.share(cx))
1490            .await
1491            .unwrap();
1492
1493        // Join that worktree as clients B and C.
1494        let project_b = Project::remote(
1495            project_id,
1496            client_b.clone(),
1497            client_b.user_store.clone(),
1498            lang_registry.clone(),
1499            fs.clone(),
1500            &mut cx_b.to_async(),
1501        )
1502        .await
1503        .unwrap();
1504        let project_c = Project::remote(
1505            project_id,
1506            client_c.clone(),
1507            client_c.user_store.clone(),
1508            lang_registry.clone(),
1509            fs.clone(),
1510            &mut cx_c.to_async(),
1511        )
1512        .await
1513        .unwrap();
1514        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1515        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1516
1517        // Open and edit a buffer as both guests B and C.
1518        let buffer_b = project_b
1519            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1520            .await
1521            .unwrap();
1522        let buffer_c = project_c
1523            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1524            .await
1525            .unwrap();
1526        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1527        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1528
1529        // Open and edit that buffer as the host.
1530        let buffer_a = project_a
1531            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1532            .await
1533            .unwrap();
1534
1535        buffer_a
1536            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1537            .await;
1538        buffer_a.update(&mut cx_a, |buf, cx| {
1539            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1540        });
1541
1542        // Wait for edits to propagate
1543        buffer_a
1544            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1545            .await;
1546        buffer_b
1547            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1548            .await;
1549        buffer_c
1550            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1551            .await;
1552
1553        // Edit the buffer as the host and concurrently save as guest B.
1554        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1555        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1556        save_b.await.unwrap();
1557        assert_eq!(
1558            fs.load("/a/file1".as_ref()).await.unwrap(),
1559            "hi-a, i-am-c, i-am-b, i-am-a"
1560        );
1561        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1562        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1563        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1564
1565        // Make changes on host's file system, see those changes on guest worktrees.
1566        fs.rename(
1567            "/a/file1".as_ref(),
1568            "/a/file1-renamed".as_ref(),
1569            Default::default(),
1570        )
1571        .await
1572        .unwrap();
1573
1574        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1575            .await
1576            .unwrap();
1577        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1578
1579        worktree_a
1580            .condition(&cx_a, |tree, _| {
1581                tree.paths()
1582                    .map(|p| p.to_string_lossy())
1583                    .collect::<Vec<_>>()
1584                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1585            })
1586            .await;
1587        worktree_b
1588            .condition(&cx_b, |tree, _| {
1589                tree.paths()
1590                    .map(|p| p.to_string_lossy())
1591                    .collect::<Vec<_>>()
1592                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1593            })
1594            .await;
1595        worktree_c
1596            .condition(&cx_c, |tree, _| {
1597                tree.paths()
1598                    .map(|p| p.to_string_lossy())
1599                    .collect::<Vec<_>>()
1600                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1601            })
1602            .await;
1603
1604        // Ensure buffer files are updated as well.
1605        buffer_a
1606            .condition(&cx_a, |buf, _| {
1607                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1608            })
1609            .await;
1610        buffer_b
1611            .condition(&cx_b, |buf, _| {
1612                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1613            })
1614            .await;
1615        buffer_c
1616            .condition(&cx_c, |buf, _| {
1617                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1618            })
1619            .await;
1620    }
1621
1622    #[gpui::test(iterations = 10)]
1623    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1624        cx_a.foreground().forbid_parking();
1625        let lang_registry = Arc::new(LanguageRegistry::new());
1626        let fs = FakeFs::new(cx_a.background());
1627
1628        // Connect to a server as 2 clients.
1629        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1630        let client_a = server.create_client(&mut cx_a, "user_a").await;
1631        let client_b = server.create_client(&mut cx_b, "user_b").await;
1632
1633        // Share a project as client A
1634        fs.insert_tree(
1635            "/dir",
1636            json!({
1637                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1638                "a.txt": "a-contents",
1639            }),
1640        )
1641        .await;
1642
1643        let project_a = cx_a.update(|cx| {
1644            Project::local(
1645                client_a.clone(),
1646                client_a.user_store.clone(),
1647                lang_registry.clone(),
1648                fs.clone(),
1649                cx,
1650            )
1651        });
1652        let (worktree_a, _) = project_a
1653            .update(&mut cx_a, |p, cx| {
1654                p.find_or_create_local_worktree("/dir", false, cx)
1655            })
1656            .await
1657            .unwrap();
1658        worktree_a
1659            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1660            .await;
1661        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1662        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1663        project_a
1664            .update(&mut cx_a, |p, cx| p.share(cx))
1665            .await
1666            .unwrap();
1667
1668        // Join that project as client B
1669        let project_b = Project::remote(
1670            project_id,
1671            client_b.clone(),
1672            client_b.user_store.clone(),
1673            lang_registry.clone(),
1674            fs.clone(),
1675            &mut cx_b.to_async(),
1676        )
1677        .await
1678        .unwrap();
1679
1680        // Open a buffer as client B
1681        let buffer_b = project_b
1682            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1683            .await
1684            .unwrap();
1685
1686        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1687        buffer_b.read_with(&cx_b, |buf, _| {
1688            assert!(buf.is_dirty());
1689            assert!(!buf.has_conflict());
1690        });
1691
1692        buffer_b
1693            .update(&mut cx_b, |buf, cx| buf.save(cx))
1694            .await
1695            .unwrap();
1696        buffer_b
1697            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1698            .await;
1699        buffer_b.read_with(&cx_b, |buf, _| {
1700            assert!(!buf.has_conflict());
1701        });
1702
1703        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1704        buffer_b.read_with(&cx_b, |buf, _| {
1705            assert!(buf.is_dirty());
1706            assert!(!buf.has_conflict());
1707        });
1708    }
1709
1710    #[gpui::test(iterations = 10)]
1711    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::new());
1714        let fs = FakeFs::new(cx_a.background());
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(&mut cx_a, "user_a").await;
1719        let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730
1731        let project_a = cx_a.update(|cx| {
1732            Project::local(
1733                client_a.clone(),
1734                client_a.user_store.clone(),
1735                lang_registry.clone(),
1736                fs.clone(),
1737                cx,
1738            )
1739        });
1740        let (worktree_a, _) = project_a
1741            .update(&mut cx_a, |p, cx| {
1742                p.find_or_create_local_worktree("/dir", false, cx)
1743            })
1744            .await
1745            .unwrap();
1746        worktree_a
1747            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1748            .await;
1749        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1750        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1751        project_a
1752            .update(&mut cx_a, |p, cx| p.share(cx))
1753            .await
1754            .unwrap();
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1768
1769        // Open a buffer as client B
1770        let buffer_b = project_b
1771            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1772            .await
1773            .unwrap();
1774        buffer_b.read_with(&cx_b, |buf, _| {
1775            assert!(!buf.is_dirty());
1776            assert!(!buf.has_conflict());
1777        });
1778
1779        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1780            .await
1781            .unwrap();
1782        buffer_b
1783            .condition(&cx_b, |buf, _| {
1784                buf.text() == "new contents" && !buf.is_dirty()
1785            })
1786            .await;
1787        buffer_b.read_with(&cx_b, |buf, _| {
1788            assert!(!buf.has_conflict());
1789        });
1790    }
1791
1792    #[gpui::test(iterations = 10)]
1793    async fn test_editing_while_guest_opens_buffer(
1794        mut cx_a: TestAppContext,
1795        mut cx_b: TestAppContext,
1796    ) {
1797        cx_a.foreground().forbid_parking();
1798        let lang_registry = Arc::new(LanguageRegistry::new());
1799        let fs = FakeFs::new(cx_a.background());
1800
1801        // Connect to a server as 2 clients.
1802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1803        let client_a = server.create_client(&mut cx_a, "user_a").await;
1804        let client_b = server.create_client(&mut cx_b, "user_b").await;
1805
1806        // Share a project as client A
1807        fs.insert_tree(
1808            "/dir",
1809            json!({
1810                ".zed.toml": r#"collaborators = ["user_b"]"#,
1811                "a.txt": "a-contents",
1812            }),
1813        )
1814        .await;
1815        let project_a = cx_a.update(|cx| {
1816            Project::local(
1817                client_a.clone(),
1818                client_a.user_store.clone(),
1819                lang_registry.clone(),
1820                fs.clone(),
1821                cx,
1822            )
1823        });
1824        let (worktree_a, _) = project_a
1825            .update(&mut cx_a, |p, cx| {
1826                p.find_or_create_local_worktree("/dir", false, cx)
1827            })
1828            .await
1829            .unwrap();
1830        worktree_a
1831            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1832            .await;
1833        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1834        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1835        project_a
1836            .update(&mut cx_a, |p, cx| p.share(cx))
1837            .await
1838            .unwrap();
1839
1840        // Join that project as client B
1841        let project_b = Project::remote(
1842            project_id,
1843            client_b.clone(),
1844            client_b.user_store.clone(),
1845            lang_registry.clone(),
1846            fs.clone(),
1847            &mut cx_b.to_async(),
1848        )
1849        .await
1850        .unwrap();
1851
1852        // Open a buffer as client A
1853        let buffer_a = project_a
1854            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1855            .await
1856            .unwrap();
1857
1858        // Start opening the same buffer as client B
1859        let buffer_b = cx_b
1860            .background()
1861            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1862
1863        // Edit the buffer as client A while client B is still opening it.
1864        cx_b.background().simulate_random_delay().await;
1865        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1866        cx_b.background().simulate_random_delay().await;
1867        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1868
1869        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1870        let buffer_b = buffer_b.await.unwrap();
1871        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1872    }
1873
1874    #[gpui::test(iterations = 10)]
1875    async fn test_leaving_worktree_while_opening_buffer(
1876        mut cx_a: TestAppContext,
1877        mut cx_b: TestAppContext,
1878    ) {
1879        cx_a.foreground().forbid_parking();
1880        let lang_registry = Arc::new(LanguageRegistry::new());
1881        let fs = FakeFs::new(cx_a.background());
1882
1883        // Connect to a server as 2 clients.
1884        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1885        let client_a = server.create_client(&mut cx_a, "user_a").await;
1886        let client_b = server.create_client(&mut cx_b, "user_b").await;
1887
1888        // Share a project as client A
1889        fs.insert_tree(
1890            "/dir",
1891            json!({
1892                ".zed.toml": r#"collaborators = ["user_b"]"#,
1893                "a.txt": "a-contents",
1894            }),
1895        )
1896        .await;
1897        let project_a = cx_a.update(|cx| {
1898            Project::local(
1899                client_a.clone(),
1900                client_a.user_store.clone(),
1901                lang_registry.clone(),
1902                fs.clone(),
1903                cx,
1904            )
1905        });
1906        let (worktree_a, _) = project_a
1907            .update(&mut cx_a, |p, cx| {
1908                p.find_or_create_local_worktree("/dir", false, cx)
1909            })
1910            .await
1911            .unwrap();
1912        worktree_a
1913            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1914            .await;
1915        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1916        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1917        project_a
1918            .update(&mut cx_a, |p, cx| p.share(cx))
1919            .await
1920            .unwrap();
1921
1922        // Join that project as client B
1923        let project_b = Project::remote(
1924            project_id,
1925            client_b.clone(),
1926            client_b.user_store.clone(),
1927            lang_registry.clone(),
1928            fs.clone(),
1929            &mut cx_b.to_async(),
1930        )
1931        .await
1932        .unwrap();
1933
1934        // See that a guest has joined as client A.
1935        project_a
1936            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1937            .await;
1938
1939        // Begin opening a buffer as client B, but leave the project before the open completes.
1940        let buffer_b = cx_b
1941            .background()
1942            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1943        cx_b.update(|_| drop(project_b));
1944        drop(buffer_b);
1945
1946        // See that the guest has left.
1947        project_a
1948            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1949            .await;
1950    }
1951
1952    #[gpui::test(iterations = 10)]
1953    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1954        cx_a.foreground().forbid_parking();
1955        let lang_registry = Arc::new(LanguageRegistry::new());
1956        let fs = FakeFs::new(cx_a.background());
1957
1958        // Connect to a server as 2 clients.
1959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1960        let client_a = server.create_client(&mut cx_a, "user_a").await;
1961        let client_b = server.create_client(&mut cx_b, "user_b").await;
1962
1963        // Share a project as client A
1964        fs.insert_tree(
1965            "/a",
1966            json!({
1967                ".zed.toml": r#"collaborators = ["user_b"]"#,
1968                "a.txt": "a-contents",
1969                "b.txt": "b-contents",
1970            }),
1971        )
1972        .await;
1973        let project_a = cx_a.update(|cx| {
1974            Project::local(
1975                client_a.clone(),
1976                client_a.user_store.clone(),
1977                lang_registry.clone(),
1978                fs.clone(),
1979                cx,
1980            )
1981        });
1982        let (worktree_a, _) = project_a
1983            .update(&mut cx_a, |p, cx| {
1984                p.find_or_create_local_worktree("/a", false, cx)
1985            })
1986            .await
1987            .unwrap();
1988        worktree_a
1989            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1990            .await;
1991        let project_id = project_a
1992            .update(&mut cx_a, |project, _| project.next_remote_id())
1993            .await;
1994        project_a
1995            .update(&mut cx_a, |project, cx| project.share(cx))
1996            .await
1997            .unwrap();
1998
1999        // Join that project as client B
2000        let _project_b = Project::remote(
2001            project_id,
2002            client_b.clone(),
2003            client_b.user_store.clone(),
2004            lang_registry.clone(),
2005            fs.clone(),
2006            &mut cx_b.to_async(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        // See that a guest has joined as client A.
2012        project_a
2013            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2014            .await;
2015
2016        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2017        client_b.disconnect(&cx_b.to_async()).unwrap();
2018        project_a
2019            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2020            .await;
2021    }
2022
2023    #[gpui::test(iterations = 10)]
2024    async fn test_collaborating_with_diagnostics(
2025        mut cx_a: TestAppContext,
2026        mut cx_b: TestAppContext,
2027    ) {
2028        cx_a.foreground().forbid_parking();
2029        let mut lang_registry = Arc::new(LanguageRegistry::new());
2030        let fs = FakeFs::new(cx_a.background());
2031
2032        // Set up a fake language server.
2033        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2034        Arc::get_mut(&mut lang_registry)
2035            .unwrap()
2036            .add(Arc::new(Language::new(
2037                LanguageConfig {
2038                    name: "Rust".into(),
2039                    path_suffixes: vec!["rs".to_string()],
2040                    language_server: Some(language_server_config),
2041                    ..Default::default()
2042                },
2043                Some(tree_sitter_rust::language()),
2044            )));
2045
2046        // Connect to a server as 2 clients.
2047        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2048        let client_a = server.create_client(&mut cx_a, "user_a").await;
2049        let client_b = server.create_client(&mut cx_b, "user_b").await;
2050
2051        // Share a project as client A
2052        fs.insert_tree(
2053            "/a",
2054            json!({
2055                ".zed.toml": r#"collaborators = ["user_b"]"#,
2056                "a.rs": "let one = two",
2057                "other.rs": "",
2058            }),
2059        )
2060        .await;
2061        let project_a = cx_a.update(|cx| {
2062            Project::local(
2063                client_a.clone(),
2064                client_a.user_store.clone(),
2065                lang_registry.clone(),
2066                fs.clone(),
2067                cx,
2068            )
2069        });
2070        let (worktree_a, _) = project_a
2071            .update(&mut cx_a, |p, cx| {
2072                p.find_or_create_local_worktree("/a", false, cx)
2073            })
2074            .await
2075            .unwrap();
2076        worktree_a
2077            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2078            .await;
2079        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2080        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2081        project_a
2082            .update(&mut cx_a, |p, cx| p.share(cx))
2083            .await
2084            .unwrap();
2085
2086        // Cause the language server to start.
2087        let _ = cx_a
2088            .background()
2089            .spawn(project_a.update(&mut cx_a, |project, cx| {
2090                project.open_buffer(
2091                    ProjectPath {
2092                        worktree_id,
2093                        path: Path::new("other.rs").into(),
2094                    },
2095                    cx,
2096                )
2097            }))
2098            .await
2099            .unwrap();
2100
2101        // Simulate a language server reporting errors for a file.
2102        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2103        fake_language_server
2104            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2105                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2106                version: None,
2107                diagnostics: vec![lsp::Diagnostic {
2108                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2109                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2110                    message: "message 1".to_string(),
2111                    ..Default::default()
2112                }],
2113            })
2114            .await;
2115
2116        // Wait for server to see the diagnostics update.
2117        server
2118            .condition(|store| {
2119                let worktree = store
2120                    .project(project_id)
2121                    .unwrap()
2122                    .share
2123                    .as_ref()
2124                    .unwrap()
2125                    .worktrees
2126                    .get(&worktree_id.to_proto())
2127                    .unwrap();
2128
2129                !worktree.diagnostic_summaries.is_empty()
2130            })
2131            .await;
2132
2133        // Join the worktree as client B.
2134        let project_b = Project::remote(
2135            project_id,
2136            client_b.clone(),
2137            client_b.user_store.clone(),
2138            lang_registry.clone(),
2139            fs.clone(),
2140            &mut cx_b.to_async(),
2141        )
2142        .await
2143        .unwrap();
2144
2145        project_b.read_with(&cx_b, |project, cx| {
2146            assert_eq!(
2147                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2148                &[(
2149                    ProjectPath {
2150                        worktree_id,
2151                        path: Arc::from(Path::new("a.rs")),
2152                    },
2153                    DiagnosticSummary {
2154                        error_count: 1,
2155                        warning_count: 0,
2156                        ..Default::default()
2157                    },
2158                )]
2159            )
2160        });
2161
2162        // Simulate a language server reporting more errors for a file.
2163        fake_language_server
2164            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2165                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2166                version: None,
2167                diagnostics: vec![
2168                    lsp::Diagnostic {
2169                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2170                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2171                        message: "message 1".to_string(),
2172                        ..Default::default()
2173                    },
2174                    lsp::Diagnostic {
2175                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2176                        range: lsp::Range::new(
2177                            lsp::Position::new(0, 10),
2178                            lsp::Position::new(0, 13),
2179                        ),
2180                        message: "message 2".to_string(),
2181                        ..Default::default()
2182                    },
2183                ],
2184            })
2185            .await;
2186
2187        // Client b gets the updated summaries
2188        project_b
2189            .condition(&cx_b, |project, cx| {
2190                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2191                    == &[(
2192                        ProjectPath {
2193                            worktree_id,
2194                            path: Arc::from(Path::new("a.rs")),
2195                        },
2196                        DiagnosticSummary {
2197                            error_count: 1,
2198                            warning_count: 1,
2199                            ..Default::default()
2200                        },
2201                    )]
2202            })
2203            .await;
2204
2205        // Open the file with the errors on client B. They should be present.
2206        let buffer_b = cx_b
2207            .background()
2208            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2209            .await
2210            .unwrap();
2211
2212        buffer_b.read_with(&cx_b, |buffer, _| {
2213            assert_eq!(
2214                buffer
2215                    .snapshot()
2216                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2217                    .map(|entry| entry)
2218                    .collect::<Vec<_>>(),
2219                &[
2220                    DiagnosticEntry {
2221                        range: Point::new(0, 4)..Point::new(0, 7),
2222                        diagnostic: Diagnostic {
2223                            group_id: 0,
2224                            message: "message 1".to_string(),
2225                            severity: lsp::DiagnosticSeverity::ERROR,
2226                            is_primary: true,
2227                            ..Default::default()
2228                        }
2229                    },
2230                    DiagnosticEntry {
2231                        range: Point::new(0, 10)..Point::new(0, 13),
2232                        diagnostic: Diagnostic {
2233                            group_id: 1,
2234                            severity: lsp::DiagnosticSeverity::WARNING,
2235                            message: "message 2".to_string(),
2236                            is_primary: true,
2237                            ..Default::default()
2238                        }
2239                    }
2240                ]
2241            );
2242        });
2243    }
2244
2245    #[gpui::test(iterations = 10)]
2246    async fn test_collaborating_with_completion(
2247        mut cx_a: TestAppContext,
2248        mut cx_b: TestAppContext,
2249    ) {
2250        cx_a.foreground().forbid_parking();
2251        let mut lang_registry = Arc::new(LanguageRegistry::new());
2252        let fs = FakeFs::new(cx_a.background());
2253
2254        // Set up a fake language server.
2255        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2256        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2257            completion_provider: Some(lsp::CompletionOptions {
2258                trigger_characters: Some(vec![".".to_string()]),
2259                ..Default::default()
2260            }),
2261            ..Default::default()
2262        });
2263        Arc::get_mut(&mut lang_registry)
2264            .unwrap()
2265            .add(Arc::new(Language::new(
2266                LanguageConfig {
2267                    name: "Rust".into(),
2268                    path_suffixes: vec!["rs".to_string()],
2269                    language_server: Some(language_server_config),
2270                    ..Default::default()
2271                },
2272                Some(tree_sitter_rust::language()),
2273            )));
2274
2275        // Connect to a server as 2 clients.
2276        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2277        let client_a = server.create_client(&mut cx_a, "user_a").await;
2278        let client_b = server.create_client(&mut cx_b, "user_b").await;
2279
2280        // Share a project as client A
2281        fs.insert_tree(
2282            "/a",
2283            json!({
2284                ".zed.toml": r#"collaborators = ["user_b"]"#,
2285                "main.rs": "fn main() { a }",
2286                "other.rs": "",
2287            }),
2288        )
2289        .await;
2290        let project_a = cx_a.update(|cx| {
2291            Project::local(
2292                client_a.clone(),
2293                client_a.user_store.clone(),
2294                lang_registry.clone(),
2295                fs.clone(),
2296                cx,
2297            )
2298        });
2299        let (worktree_a, _) = project_a
2300            .update(&mut cx_a, |p, cx| {
2301                p.find_or_create_local_worktree("/a", false, cx)
2302            })
2303            .await
2304            .unwrap();
2305        worktree_a
2306            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2307            .await;
2308        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2309        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2310        project_a
2311            .update(&mut cx_a, |p, cx| p.share(cx))
2312            .await
2313            .unwrap();
2314
2315        // Join the worktree as client B.
2316        let project_b = Project::remote(
2317            project_id,
2318            client_b.clone(),
2319            client_b.user_store.clone(),
2320            lang_registry.clone(),
2321            fs.clone(),
2322            &mut cx_b.to_async(),
2323        )
2324        .await
2325        .unwrap();
2326
2327        // Open a file in an editor as the guest.
2328        let buffer_b = project_b
2329            .update(&mut cx_b, |p, cx| {
2330                p.open_buffer((worktree_id, "main.rs"), cx)
2331            })
2332            .await
2333            .unwrap();
2334        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2335        let editor_b = cx_b.add_view(window_b, |cx| {
2336            Editor::for_buffer(
2337                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2338                Some(project_b.clone()),
2339                watch::channel_with(Settings::test(cx)).1,
2340                cx,
2341            )
2342        });
2343
2344        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2345        buffer_b
2346            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2347            .await;
2348
2349        // Type a completion trigger character as the guest.
2350        editor_b.update(&mut cx_b, |editor, cx| {
2351            editor.select_ranges([13..13], None, cx);
2352            editor.handle_input(&Input(".".into()), cx);
2353            cx.focus(&editor_b);
2354        });
2355
2356        // Receive a completion request as the host's language server.
2357        // Return some completions from the host's language server.
2358        cx_a.foreground().start_waiting();
2359        fake_language_server
2360            .handle_request::<lsp::request::Completion, _>(|params, _| {
2361                assert_eq!(
2362                    params.text_document_position.text_document.uri,
2363                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2364                );
2365                assert_eq!(
2366                    params.text_document_position.position,
2367                    lsp::Position::new(0, 14),
2368                );
2369
2370                Some(lsp::CompletionResponse::Array(vec![
2371                    lsp::CompletionItem {
2372                        label: "first_method(…)".into(),
2373                        detail: Some("fn(&mut self, B) -> C".into()),
2374                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2375                            new_text: "first_method($1)".to_string(),
2376                            range: lsp::Range::new(
2377                                lsp::Position::new(0, 14),
2378                                lsp::Position::new(0, 14),
2379                            ),
2380                        })),
2381                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2382                        ..Default::default()
2383                    },
2384                    lsp::CompletionItem {
2385                        label: "second_method(…)".into(),
2386                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2387                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2388                            new_text: "second_method()".to_string(),
2389                            range: lsp::Range::new(
2390                                lsp::Position::new(0, 14),
2391                                lsp::Position::new(0, 14),
2392                            ),
2393                        })),
2394                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2395                        ..Default::default()
2396                    },
2397                ]))
2398            })
2399            .next()
2400            .await
2401            .unwrap();
2402        cx_a.foreground().finish_waiting();
2403
2404        // Open the buffer on the host.
2405        let buffer_a = project_a
2406            .update(&mut cx_a, |p, cx| {
2407                p.open_buffer((worktree_id, "main.rs"), cx)
2408            })
2409            .await
2410            .unwrap();
2411        buffer_a
2412            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2413            .await;
2414
2415        // Confirm a completion on the guest.
2416        editor_b
2417            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2418            .await;
2419        editor_b.update(&mut cx_b, |editor, cx| {
2420            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2421            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2422        });
2423
2424        // Return a resolved completion from the host's language server.
2425        // The resolved completion has an additional text edit.
2426        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2427            |params, _| {
2428                assert_eq!(params.label, "first_method(…)");
2429                lsp::CompletionItem {
2430                    label: "first_method(…)".into(),
2431                    detail: Some("fn(&mut self, B) -> C".into()),
2432                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2433                        new_text: "first_method($1)".to_string(),
2434                        range: lsp::Range::new(
2435                            lsp::Position::new(0, 14),
2436                            lsp::Position::new(0, 14),
2437                        ),
2438                    })),
2439                    additional_text_edits: Some(vec![lsp::TextEdit {
2440                        new_text: "use d::SomeTrait;\n".to_string(),
2441                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2442                    }]),
2443                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2444                    ..Default::default()
2445                }
2446            },
2447        );
2448
2449        // The additional edit is applied.
2450        buffer_a
2451            .condition(&cx_a, |buffer, _| {
2452                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2453            })
2454            .await;
2455        buffer_b
2456            .condition(&cx_b, |buffer, _| {
2457                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2458            })
2459            .await;
2460    }
2461
2462    #[gpui::test(iterations = 10)]
2463    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2464        cx_a.foreground().forbid_parking();
2465        let mut lang_registry = Arc::new(LanguageRegistry::new());
2466        let fs = FakeFs::new(cx_a.background());
2467
2468        // Set up a fake language server.
2469        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2470        Arc::get_mut(&mut lang_registry)
2471            .unwrap()
2472            .add(Arc::new(Language::new(
2473                LanguageConfig {
2474                    name: "Rust".into(),
2475                    path_suffixes: vec!["rs".to_string()],
2476                    language_server: Some(language_server_config),
2477                    ..Default::default()
2478                },
2479                Some(tree_sitter_rust::language()),
2480            )));
2481
2482        // Connect to a server as 2 clients.
2483        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2484        let client_a = server.create_client(&mut cx_a, "user_a").await;
2485        let client_b = server.create_client(&mut cx_b, "user_b").await;
2486
2487        // Share a project as client A
2488        fs.insert_tree(
2489            "/a",
2490            json!({
2491                ".zed.toml": r#"collaborators = ["user_b"]"#,
2492                "a.rs": "let one = two",
2493            }),
2494        )
2495        .await;
2496        let project_a = cx_a.update(|cx| {
2497            Project::local(
2498                client_a.clone(),
2499                client_a.user_store.clone(),
2500                lang_registry.clone(),
2501                fs.clone(),
2502                cx,
2503            )
2504        });
2505        let (worktree_a, _) = project_a
2506            .update(&mut cx_a, |p, cx| {
2507                p.find_or_create_local_worktree("/a", false, cx)
2508            })
2509            .await
2510            .unwrap();
2511        worktree_a
2512            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2513            .await;
2514        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2515        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2516        project_a
2517            .update(&mut cx_a, |p, cx| p.share(cx))
2518            .await
2519            .unwrap();
2520
2521        // Join the worktree as client B.
2522        let project_b = Project::remote(
2523            project_id,
2524            client_b.clone(),
2525            client_b.user_store.clone(),
2526            lang_registry.clone(),
2527            fs.clone(),
2528            &mut cx_b.to_async(),
2529        )
2530        .await
2531        .unwrap();
2532
2533        let buffer_b = cx_b
2534            .background()
2535            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2536            .await
2537            .unwrap();
2538
2539        let format = project_b.update(&mut cx_b, |project, cx| {
2540            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2541        });
2542
2543        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2544        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2545            Some(vec![
2546                lsp::TextEdit {
2547                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2548                    new_text: "h".to_string(),
2549                },
2550                lsp::TextEdit {
2551                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2552                    new_text: "y".to_string(),
2553                },
2554            ])
2555        });
2556
2557        format.await.unwrap();
2558        assert_eq!(
2559            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2560            "let honey = two"
2561        );
2562    }
2563
2564    #[gpui::test(iterations = 10)]
2565    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2566        cx_a.foreground().forbid_parking();
2567        let mut lang_registry = Arc::new(LanguageRegistry::new());
2568        let fs = FakeFs::new(cx_a.background());
2569        fs.insert_tree(
2570            "/root-1",
2571            json!({
2572                ".zed.toml": r#"collaborators = ["user_b"]"#,
2573                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2574            }),
2575        )
2576        .await;
2577        fs.insert_tree(
2578            "/root-2",
2579            json!({
2580                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2581            }),
2582        )
2583        .await;
2584
2585        // Set up a fake language server.
2586        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2587        Arc::get_mut(&mut lang_registry)
2588            .unwrap()
2589            .add(Arc::new(Language::new(
2590                LanguageConfig {
2591                    name: "Rust".into(),
2592                    path_suffixes: vec!["rs".to_string()],
2593                    language_server: Some(language_server_config),
2594                    ..Default::default()
2595                },
2596                Some(tree_sitter_rust::language()),
2597            )));
2598
2599        // Connect to a server as 2 clients.
2600        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2601        let client_a = server.create_client(&mut cx_a, "user_a").await;
2602        let client_b = server.create_client(&mut cx_b, "user_b").await;
2603
2604        // Share a project as client A
2605        let project_a = cx_a.update(|cx| {
2606            Project::local(
2607                client_a.clone(),
2608                client_a.user_store.clone(),
2609                lang_registry.clone(),
2610                fs.clone(),
2611                cx,
2612            )
2613        });
2614        let (worktree_a, _) = project_a
2615            .update(&mut cx_a, |p, cx| {
2616                p.find_or_create_local_worktree("/root-1", false, cx)
2617            })
2618            .await
2619            .unwrap();
2620        worktree_a
2621            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2622            .await;
2623        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2624        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2625        project_a
2626            .update(&mut cx_a, |p, cx| p.share(cx))
2627            .await
2628            .unwrap();
2629
2630        // Join the worktree as client B.
2631        let project_b = Project::remote(
2632            project_id,
2633            client_b.clone(),
2634            client_b.user_store.clone(),
2635            lang_registry.clone(),
2636            fs.clone(),
2637            &mut cx_b.to_async(),
2638        )
2639        .await
2640        .unwrap();
2641
2642        // Open the file on client B.
2643        let buffer_b = cx_b
2644            .background()
2645            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2646            .await
2647            .unwrap();
2648
2649        // Request the definition of a symbol as the guest.
2650        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2651
2652        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2653        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2654            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2655                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2656                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2657            )))
2658        });
2659
2660        let definitions_1 = definitions_1.await.unwrap();
2661        cx_b.read(|cx| {
2662            assert_eq!(definitions_1.len(), 1);
2663            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2664            let target_buffer = definitions_1[0].buffer.read(cx);
2665            assert_eq!(
2666                target_buffer.text(),
2667                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2668            );
2669            assert_eq!(
2670                definitions_1[0].range.to_point(target_buffer),
2671                Point::new(0, 6)..Point::new(0, 9)
2672            );
2673        });
2674
2675        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2676        // the previous call to `definition`.
2677        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2678        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2679            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2680                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2681                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2682            )))
2683        });
2684
2685        let definitions_2 = definitions_2.await.unwrap();
2686        cx_b.read(|cx| {
2687            assert_eq!(definitions_2.len(), 1);
2688            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2689            let target_buffer = definitions_2[0].buffer.read(cx);
2690            assert_eq!(
2691                target_buffer.text(),
2692                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2693            );
2694            assert_eq!(
2695                definitions_2[0].range.to_point(target_buffer),
2696                Point::new(1, 6)..Point::new(1, 11)
2697            );
2698        });
2699        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2700
2701        cx_b.update(|_| {
2702            drop(definitions_1);
2703            drop(definitions_2);
2704        });
2705        project_b
2706            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2707            .await;
2708    }
2709
2710    #[gpui::test(iterations = 10)]
2711    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2712        cx_a.foreground().forbid_parking();
2713        let mut lang_registry = Arc::new(LanguageRegistry::new());
2714        let fs = FakeFs::new(cx_a.background());
2715        fs.insert_tree(
2716            "/root-1",
2717            json!({
2718                ".zed.toml": r#"collaborators = ["user_b"]"#,
2719                "one.rs": "const ONE: usize = 1;",
2720                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2721            }),
2722        )
2723        .await;
2724        fs.insert_tree(
2725            "/root-2",
2726            json!({
2727                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2728            }),
2729        )
2730        .await;
2731
2732        // Set up a fake language server.
2733        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2734        Arc::get_mut(&mut lang_registry)
2735            .unwrap()
2736            .add(Arc::new(Language::new(
2737                LanguageConfig {
2738                    name: "Rust".into(),
2739                    path_suffixes: vec!["rs".to_string()],
2740                    language_server: Some(language_server_config),
2741                    ..Default::default()
2742                },
2743                Some(tree_sitter_rust::language()),
2744            )));
2745
2746        // Connect to a server as 2 clients.
2747        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2748        let client_a = server.create_client(&mut cx_a, "user_a").await;
2749        let client_b = server.create_client(&mut cx_b, "user_b").await;
2750
2751        // Share a project as client A
2752        let project_a = cx_a.update(|cx| {
2753            Project::local(
2754                client_a.clone(),
2755                client_a.user_store.clone(),
2756                lang_registry.clone(),
2757                fs.clone(),
2758                cx,
2759            )
2760        });
2761        let (worktree_a, _) = project_a
2762            .update(&mut cx_a, |p, cx| {
2763                p.find_or_create_local_worktree("/root-1", false, cx)
2764            })
2765            .await
2766            .unwrap();
2767        worktree_a
2768            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2769            .await;
2770        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2771        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2772        project_a
2773            .update(&mut cx_a, |p, cx| p.share(cx))
2774            .await
2775            .unwrap();
2776
2777        // Join the worktree as client B.
2778        let project_b = Project::remote(
2779            project_id,
2780            client_b.clone(),
2781            client_b.user_store.clone(),
2782            lang_registry.clone(),
2783            fs.clone(),
2784            &mut cx_b.to_async(),
2785        )
2786        .await
2787        .unwrap();
2788
2789        // Open the file on client B.
2790        let buffer_b = cx_b
2791            .background()
2792            .spawn(project_b.update(&mut cx_b, |p, cx| {
2793                p.open_buffer((worktree_id, "one.rs"), cx)
2794            }))
2795            .await
2796            .unwrap();
2797
2798        // Request references to a symbol as the guest.
2799        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2800
2801        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2802        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2803            assert_eq!(
2804                params.text_document_position.text_document.uri.as_str(),
2805                "file:///root-1/one.rs"
2806            );
2807            Some(vec![
2808                lsp::Location {
2809                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2810                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2811                },
2812                lsp::Location {
2813                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2814                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2815                },
2816                lsp::Location {
2817                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2818                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2819                },
2820            ])
2821        });
2822
2823        let references = references.await.unwrap();
2824        cx_b.read(|cx| {
2825            assert_eq!(references.len(), 3);
2826            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2827
2828            let two_buffer = references[0].buffer.read(cx);
2829            let three_buffer = references[2].buffer.read(cx);
2830            assert_eq!(
2831                two_buffer.file().unwrap().path().as_ref(),
2832                Path::new("two.rs")
2833            );
2834            assert_eq!(references[1].buffer, references[0].buffer);
2835            assert_eq!(
2836                three_buffer.file().unwrap().full_path(cx),
2837                Path::new("three.rs")
2838            );
2839
2840            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2841            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2842            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2843        });
2844    }
2845
2846    #[gpui::test(iterations = 10)]
2847    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2848        cx_a.foreground().forbid_parking();
2849        let lang_registry = Arc::new(LanguageRegistry::new());
2850        let fs = FakeFs::new(cx_a.background());
2851        fs.insert_tree(
2852            "/root-1",
2853            json!({
2854                ".zed.toml": r#"collaborators = ["user_b"]"#,
2855                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2856            }),
2857        )
2858        .await;
2859
2860        // Set up a fake language server.
2861        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2862        lang_registry.add(Arc::new(Language::new(
2863            LanguageConfig {
2864                name: "Rust".into(),
2865                path_suffixes: vec!["rs".to_string()],
2866                language_server: Some(language_server_config),
2867                ..Default::default()
2868            },
2869            Some(tree_sitter_rust::language()),
2870        )));
2871
2872        // Connect to a server as 2 clients.
2873        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2874        let client_a = server.create_client(&mut cx_a, "user_a").await;
2875        let client_b = server.create_client(&mut cx_b, "user_b").await;
2876
2877        // Share a project as client A
2878        let project_a = cx_a.update(|cx| {
2879            Project::local(
2880                client_a.clone(),
2881                client_a.user_store.clone(),
2882                lang_registry.clone(),
2883                fs.clone(),
2884                cx,
2885            )
2886        });
2887        let (worktree_a, _) = project_a
2888            .update(&mut cx_a, |p, cx| {
2889                p.find_or_create_local_worktree("/root-1", false, cx)
2890            })
2891            .await
2892            .unwrap();
2893        worktree_a
2894            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2895            .await;
2896        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2897        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2898        project_a
2899            .update(&mut cx_a, |p, cx| p.share(cx))
2900            .await
2901            .unwrap();
2902
2903        // Join the worktree as client B.
2904        let project_b = Project::remote(
2905            project_id,
2906            client_b.clone(),
2907            client_b.user_store.clone(),
2908            lang_registry.clone(),
2909            fs.clone(),
2910            &mut cx_b.to_async(),
2911        )
2912        .await
2913        .unwrap();
2914
2915        // Open the file on client B.
2916        let buffer_b = cx_b
2917            .background()
2918            .spawn(project_b.update(&mut cx_b, |p, cx| {
2919                p.open_buffer((worktree_id, "main.rs"), cx)
2920            }))
2921            .await
2922            .unwrap();
2923
2924        // Request document highlights as the guest.
2925        let highlights =
2926            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2927
2928        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2929        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2930            |params, _| {
2931                assert_eq!(
2932                    params
2933                        .text_document_position_params
2934                        .text_document
2935                        .uri
2936                        .as_str(),
2937                    "file:///root-1/main.rs"
2938                );
2939                assert_eq!(
2940                    params.text_document_position_params.position,
2941                    lsp::Position::new(0, 34)
2942                );
2943                Some(vec![
2944                    lsp::DocumentHighlight {
2945                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2946                        range: lsp::Range::new(
2947                            lsp::Position::new(0, 10),
2948                            lsp::Position::new(0, 16),
2949                        ),
2950                    },
2951                    lsp::DocumentHighlight {
2952                        kind: Some(lsp::DocumentHighlightKind::READ),
2953                        range: lsp::Range::new(
2954                            lsp::Position::new(0, 32),
2955                            lsp::Position::new(0, 38),
2956                        ),
2957                    },
2958                    lsp::DocumentHighlight {
2959                        kind: Some(lsp::DocumentHighlightKind::READ),
2960                        range: lsp::Range::new(
2961                            lsp::Position::new(0, 41),
2962                            lsp::Position::new(0, 47),
2963                        ),
2964                    },
2965                ])
2966            },
2967        );
2968
2969        let highlights = highlights.await.unwrap();
2970        buffer_b.read_with(&cx_b, |buffer, _| {
2971            let snapshot = buffer.snapshot();
2972
2973            let highlights = highlights
2974                .into_iter()
2975                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2976                .collect::<Vec<_>>();
2977            assert_eq!(
2978                highlights,
2979                &[
2980                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2981                    (lsp::DocumentHighlightKind::READ, 32..38),
2982                    (lsp::DocumentHighlightKind::READ, 41..47)
2983                ]
2984            )
2985        });
2986    }
2987
2988    #[gpui::test(iterations = 10)]
2989    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2990        cx_a.foreground().forbid_parking();
2991        let mut lang_registry = Arc::new(LanguageRegistry::new());
2992        let fs = FakeFs::new(cx_a.background());
2993        fs.insert_tree(
2994            "/code",
2995            json!({
2996                "crate-1": {
2997                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2998                    "one.rs": "const ONE: usize = 1;",
2999                },
3000                "crate-2": {
3001                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3002                },
3003                "private": {
3004                    "passwords.txt": "the-password",
3005                }
3006            }),
3007        )
3008        .await;
3009
3010        // Set up a fake language server.
3011        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3012        Arc::get_mut(&mut lang_registry)
3013            .unwrap()
3014            .add(Arc::new(Language::new(
3015                LanguageConfig {
3016                    name: "Rust".into(),
3017                    path_suffixes: vec!["rs".to_string()],
3018                    language_server: Some(language_server_config),
3019                    ..Default::default()
3020                },
3021                Some(tree_sitter_rust::language()),
3022            )));
3023
3024        // Connect to a server as 2 clients.
3025        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3026        let client_a = server.create_client(&mut cx_a, "user_a").await;
3027        let client_b = server.create_client(&mut cx_b, "user_b").await;
3028
3029        // Share a project as client A
3030        let project_a = cx_a.update(|cx| {
3031            Project::local(
3032                client_a.clone(),
3033                client_a.user_store.clone(),
3034                lang_registry.clone(),
3035                fs.clone(),
3036                cx,
3037            )
3038        });
3039        let (worktree_a, _) = project_a
3040            .update(&mut cx_a, |p, cx| {
3041                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3042            })
3043            .await
3044            .unwrap();
3045        worktree_a
3046            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3047            .await;
3048        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3049        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3050        project_a
3051            .update(&mut cx_a, |p, cx| p.share(cx))
3052            .await
3053            .unwrap();
3054
3055        // Join the worktree as client B.
3056        let project_b = Project::remote(
3057            project_id,
3058            client_b.clone(),
3059            client_b.user_store.clone(),
3060            lang_registry.clone(),
3061            fs.clone(),
3062            &mut cx_b.to_async(),
3063        )
3064        .await
3065        .unwrap();
3066
3067        // Cause the language server to start.
3068        let _buffer = cx_b
3069            .background()
3070            .spawn(project_b.update(&mut cx_b, |p, cx| {
3071                p.open_buffer((worktree_id, "one.rs"), cx)
3072            }))
3073            .await
3074            .unwrap();
3075
3076        // Request the definition of a symbol as the guest.
3077        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3078        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3079        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3080            #[allow(deprecated)]
3081            Some(vec![lsp::SymbolInformation {
3082                name: "TWO".into(),
3083                location: lsp::Location {
3084                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3085                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3086                },
3087                kind: lsp::SymbolKind::CONSTANT,
3088                tags: None,
3089                container_name: None,
3090                deprecated: None,
3091            }])
3092        });
3093
3094        let symbols = symbols.await.unwrap();
3095        assert_eq!(symbols.len(), 1);
3096        assert_eq!(symbols[0].name, "TWO");
3097
3098        // Open one of the returned symbols.
3099        let buffer_b_2 = project_b
3100            .update(&mut cx_b, |project, cx| {
3101                project.open_buffer_for_symbol(&symbols[0], cx)
3102            })
3103            .await
3104            .unwrap();
3105        buffer_b_2.read_with(&cx_b, |buffer, _| {
3106            assert_eq!(
3107                buffer.file().unwrap().path().as_ref(),
3108                Path::new("../crate-2/two.rs")
3109            );
3110        });
3111
3112        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3113        let mut fake_symbol = symbols[0].clone();
3114        fake_symbol.path = Path::new("/code/secrets").into();
3115        let error = project_b
3116            .update(&mut cx_b, |project, cx| {
3117                project.open_buffer_for_symbol(&fake_symbol, cx)
3118            })
3119            .await
3120            .unwrap_err();
3121        assert!(error.to_string().contains("invalid symbol signature"));
3122    }
3123
3124    #[gpui::test(iterations = 10)]
3125    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3126        mut cx_a: TestAppContext,
3127        mut cx_b: TestAppContext,
3128        mut rng: StdRng,
3129    ) {
3130        cx_a.foreground().forbid_parking();
3131        let mut lang_registry = Arc::new(LanguageRegistry::new());
3132        let fs = FakeFs::new(cx_a.background());
3133        fs.insert_tree(
3134            "/root",
3135            json!({
3136                ".zed.toml": r#"collaborators = ["user_b"]"#,
3137                "a.rs": "const ONE: usize = b::TWO;",
3138                "b.rs": "const TWO: usize = 2",
3139            }),
3140        )
3141        .await;
3142
3143        // Set up a fake language server.
3144        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3145
3146        Arc::get_mut(&mut lang_registry)
3147            .unwrap()
3148            .add(Arc::new(Language::new(
3149                LanguageConfig {
3150                    name: "Rust".into(),
3151                    path_suffixes: vec!["rs".to_string()],
3152                    language_server: Some(language_server_config),
3153                    ..Default::default()
3154                },
3155                Some(tree_sitter_rust::language()),
3156            )));
3157
3158        // Connect to a server as 2 clients.
3159        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3160        let client_a = server.create_client(&mut cx_a, "user_a").await;
3161        let client_b = server.create_client(&mut cx_b, "user_b").await;
3162
3163        // Share a project as client A
3164        let project_a = cx_a.update(|cx| {
3165            Project::local(
3166                client_a.clone(),
3167                client_a.user_store.clone(),
3168                lang_registry.clone(),
3169                fs.clone(),
3170                cx,
3171            )
3172        });
3173
3174        let (worktree_a, _) = project_a
3175            .update(&mut cx_a, |p, cx| {
3176                p.find_or_create_local_worktree("/root", false, cx)
3177            })
3178            .await
3179            .unwrap();
3180        worktree_a
3181            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3182            .await;
3183        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3184        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3185        project_a
3186            .update(&mut cx_a, |p, cx| p.share(cx))
3187            .await
3188            .unwrap();
3189
3190        // Join the worktree as client B.
3191        let project_b = Project::remote(
3192            project_id,
3193            client_b.clone(),
3194            client_b.user_store.clone(),
3195            lang_registry.clone(),
3196            fs.clone(),
3197            &mut cx_b.to_async(),
3198        )
3199        .await
3200        .unwrap();
3201
3202        let buffer_b1 = cx_b
3203            .background()
3204            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3205            .await
3206            .unwrap();
3207
3208        let definitions;
3209        let buffer_b2;
3210        if rng.gen() {
3211            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3212            buffer_b2 =
3213                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3214        } else {
3215            buffer_b2 =
3216                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3217            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3218        }
3219
3220        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3221        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3222            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3223                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3224                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3225            )))
3226        });
3227
3228        let buffer_b2 = buffer_b2.await.unwrap();
3229        let definitions = definitions.await.unwrap();
3230        assert_eq!(definitions.len(), 1);
3231        assert_eq!(definitions[0].buffer, buffer_b2);
3232    }
3233
3234    #[gpui::test(iterations = 10)]
3235    async fn test_collaborating_with_code_actions(
3236        mut cx_a: TestAppContext,
3237        mut cx_b: TestAppContext,
3238    ) {
3239        cx_a.foreground().forbid_parking();
3240        let mut lang_registry = Arc::new(LanguageRegistry::new());
3241        let fs = FakeFs::new(cx_a.background());
3242        let mut path_openers_b = Vec::new();
3243        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3244
3245        // Set up a fake language server.
3246        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3247        Arc::get_mut(&mut lang_registry)
3248            .unwrap()
3249            .add(Arc::new(Language::new(
3250                LanguageConfig {
3251                    name: "Rust".into(),
3252                    path_suffixes: vec!["rs".to_string()],
3253                    language_server: Some(language_server_config),
3254                    ..Default::default()
3255                },
3256                Some(tree_sitter_rust::language()),
3257            )));
3258
3259        // Connect to a server as 2 clients.
3260        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3261        let client_a = server.create_client(&mut cx_a, "user_a").await;
3262        let client_b = server.create_client(&mut cx_b, "user_b").await;
3263
3264        // Share a project as client A
3265        fs.insert_tree(
3266            "/a",
3267            json!({
3268                ".zed.toml": r#"collaborators = ["user_b"]"#,
3269                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3270                "other.rs": "pub fn foo() -> usize { 4 }",
3271            }),
3272        )
3273        .await;
3274        let project_a = cx_a.update(|cx| {
3275            Project::local(
3276                client_a.clone(),
3277                client_a.user_store.clone(),
3278                lang_registry.clone(),
3279                fs.clone(),
3280                cx,
3281            )
3282        });
3283        let (worktree_a, _) = project_a
3284            .update(&mut cx_a, |p, cx| {
3285                p.find_or_create_local_worktree("/a", false, cx)
3286            })
3287            .await
3288            .unwrap();
3289        worktree_a
3290            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3291            .await;
3292        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3293        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3294        project_a
3295            .update(&mut cx_a, |p, cx| p.share(cx))
3296            .await
3297            .unwrap();
3298
3299        // Join the worktree as client B.
3300        let project_b = Project::remote(
3301            project_id,
3302            client_b.clone(),
3303            client_b.user_store.clone(),
3304            lang_registry.clone(),
3305            fs.clone(),
3306            &mut cx_b.to_async(),
3307        )
3308        .await
3309        .unwrap();
3310        let mut params = cx_b.update(WorkspaceParams::test);
3311        params.languages = lang_registry.clone();
3312        params.client = client_b.client.clone();
3313        params.user_store = client_b.user_store.clone();
3314        params.project = project_b;
3315        params.path_openers = path_openers_b.into();
3316
3317        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3318        let editor_b = workspace_b
3319            .update(&mut cx_b, |workspace, cx| {
3320                workspace.open_path((worktree_id, "main.rs").into(), cx)
3321            })
3322            .await
3323            .unwrap()
3324            .downcast::<Editor>()
3325            .unwrap();
3326
3327        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3328        fake_language_server
3329            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3330                assert_eq!(
3331                    params.text_document.uri,
3332                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3333                );
3334                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3335                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3336                None
3337            })
3338            .next()
3339            .await;
3340
3341        // Move cursor to a location that contains code actions.
3342        editor_b.update(&mut cx_b, |editor, cx| {
3343            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3344            cx.focus(&editor_b);
3345        });
3346
3347        fake_language_server
3348            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3349                assert_eq!(
3350                    params.text_document.uri,
3351                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3352                );
3353                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3354                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3355
3356                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3357                    lsp::CodeAction {
3358                        title: "Inline into all callers".to_string(),
3359                        edit: Some(lsp::WorkspaceEdit {
3360                            changes: Some(
3361                                [
3362                                    (
3363                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3364                                        vec![lsp::TextEdit::new(
3365                                            lsp::Range::new(
3366                                                lsp::Position::new(1, 22),
3367                                                lsp::Position::new(1, 34),
3368                                            ),
3369                                            "4".to_string(),
3370                                        )],
3371                                    ),
3372                                    (
3373                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3374                                        vec![lsp::TextEdit::new(
3375                                            lsp::Range::new(
3376                                                lsp::Position::new(0, 0),
3377                                                lsp::Position::new(0, 27),
3378                                            ),
3379                                            "".to_string(),
3380                                        )],
3381                                    ),
3382                                ]
3383                                .into_iter()
3384                                .collect(),
3385                            ),
3386                            ..Default::default()
3387                        }),
3388                        data: Some(json!({
3389                            "codeActionParams": {
3390                                "range": {
3391                                    "start": {"line": 1, "column": 31},
3392                                    "end": {"line": 1, "column": 31},
3393                                }
3394                            }
3395                        })),
3396                        ..Default::default()
3397                    },
3398                )])
3399            })
3400            .next()
3401            .await;
3402
3403        // Toggle code actions and wait for them to display.
3404        editor_b.update(&mut cx_b, |editor, cx| {
3405            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3406        });
3407        editor_b
3408            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3409            .await;
3410
3411        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3412
3413        // Confirming the code action will trigger a resolve request.
3414        let confirm_action = workspace_b
3415            .update(&mut cx_b, |workspace, cx| {
3416                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3417            })
3418            .unwrap();
3419        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3420            lsp::CodeAction {
3421                title: "Inline into all callers".to_string(),
3422                edit: Some(lsp::WorkspaceEdit {
3423                    changes: Some(
3424                        [
3425                            (
3426                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3427                                vec![lsp::TextEdit::new(
3428                                    lsp::Range::new(
3429                                        lsp::Position::new(1, 22),
3430                                        lsp::Position::new(1, 34),
3431                                    ),
3432                                    "4".to_string(),
3433                                )],
3434                            ),
3435                            (
3436                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3437                                vec![lsp::TextEdit::new(
3438                                    lsp::Range::new(
3439                                        lsp::Position::new(0, 0),
3440                                        lsp::Position::new(0, 27),
3441                                    ),
3442                                    "".to_string(),
3443                                )],
3444                            ),
3445                        ]
3446                        .into_iter()
3447                        .collect(),
3448                    ),
3449                    ..Default::default()
3450                }),
3451                ..Default::default()
3452            }
3453        });
3454
3455        // After the action is confirmed, an editor containing both modified files is opened.
3456        confirm_action.await.unwrap();
3457        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3458            workspace
3459                .active_item(cx)
3460                .unwrap()
3461                .downcast::<Editor>()
3462                .unwrap()
3463        });
3464        code_action_editor.update(&mut cx_b, |editor, cx| {
3465            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3466            editor.undo(&Undo, cx);
3467            assert_eq!(
3468                editor.text(cx),
3469                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3470            );
3471            editor.redo(&Redo, cx);
3472            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3473        });
3474    }
3475
3476    #[gpui::test(iterations = 10)]
3477    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3478        cx_a.foreground().forbid_parking();
3479        let mut lang_registry = Arc::new(LanguageRegistry::new());
3480        let fs = FakeFs::new(cx_a.background());
3481        let mut path_openers_b = Vec::new();
3482        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3483
3484        // Set up a fake language server.
3485        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3486        Arc::get_mut(&mut lang_registry)
3487            .unwrap()
3488            .add(Arc::new(Language::new(
3489                LanguageConfig {
3490                    name: "Rust".into(),
3491                    path_suffixes: vec!["rs".to_string()],
3492                    language_server: Some(language_server_config),
3493                    ..Default::default()
3494                },
3495                Some(tree_sitter_rust::language()),
3496            )));
3497
3498        // Connect to a server as 2 clients.
3499        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3500        let client_a = server.create_client(&mut cx_a, "user_a").await;
3501        let client_b = server.create_client(&mut cx_b, "user_b").await;
3502
3503        // Share a project as client A
3504        fs.insert_tree(
3505            "/dir",
3506            json!({
3507                ".zed.toml": r#"collaborators = ["user_b"]"#,
3508                "one.rs": "const ONE: usize = 1;",
3509                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3510            }),
3511        )
3512        .await;
3513        let project_a = cx_a.update(|cx| {
3514            Project::local(
3515                client_a.clone(),
3516                client_a.user_store.clone(),
3517                lang_registry.clone(),
3518                fs.clone(),
3519                cx,
3520            )
3521        });
3522        let (worktree_a, _) = project_a
3523            .update(&mut cx_a, |p, cx| {
3524                p.find_or_create_local_worktree("/dir", false, cx)
3525            })
3526            .await
3527            .unwrap();
3528        worktree_a
3529            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3530            .await;
3531        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3532        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3533        project_a
3534            .update(&mut cx_a, |p, cx| p.share(cx))
3535            .await
3536            .unwrap();
3537
3538        // Join the worktree as client B.
3539        let project_b = Project::remote(
3540            project_id,
3541            client_b.clone(),
3542            client_b.user_store.clone(),
3543            lang_registry.clone(),
3544            fs.clone(),
3545            &mut cx_b.to_async(),
3546        )
3547        .await
3548        .unwrap();
3549        let mut params = cx_b.update(WorkspaceParams::test);
3550        params.languages = lang_registry.clone();
3551        params.client = client_b.client.clone();
3552        params.user_store = client_b.user_store.clone();
3553        params.project = project_b;
3554        params.path_openers = path_openers_b.into();
3555
3556        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3557        let editor_b = workspace_b
3558            .update(&mut cx_b, |workspace, cx| {
3559                workspace.open_path((worktree_id, "one.rs").into(), cx)
3560            })
3561            .await
3562            .unwrap()
3563            .downcast::<Editor>()
3564            .unwrap();
3565        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3566
3567        // Move cursor to a location that can be renamed.
3568        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3569            editor.select_ranges([7..7], None, cx);
3570            editor.rename(&Rename, cx).unwrap()
3571        });
3572
3573        fake_language_server
3574            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3575                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3576                assert_eq!(params.position, lsp::Position::new(0, 7));
3577                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3578                    lsp::Position::new(0, 6),
3579                    lsp::Position::new(0, 9),
3580                )))
3581            })
3582            .next()
3583            .await
3584            .unwrap();
3585        prepare_rename.await.unwrap();
3586        editor_b.update(&mut cx_b, |editor, cx| {
3587            let rename = editor.pending_rename().unwrap();
3588            let buffer = editor.buffer().read(cx).snapshot(cx);
3589            assert_eq!(
3590                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3591                6..9
3592            );
3593            rename.editor.update(cx, |rename_editor, cx| {
3594                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3595                    rename_buffer.edit([0..3], "THREE", cx);
3596                });
3597            });
3598        });
3599
3600        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3601            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3602        });
3603        fake_language_server
3604            .handle_request::<lsp::request::Rename, _>(|params, _| {
3605                assert_eq!(
3606                    params.text_document_position.text_document.uri.as_str(),
3607                    "file:///dir/one.rs"
3608                );
3609                assert_eq!(
3610                    params.text_document_position.position,
3611                    lsp::Position::new(0, 6)
3612                );
3613                assert_eq!(params.new_name, "THREE");
3614                Some(lsp::WorkspaceEdit {
3615                    changes: Some(
3616                        [
3617                            (
3618                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3619                                vec![lsp::TextEdit::new(
3620                                    lsp::Range::new(
3621                                        lsp::Position::new(0, 6),
3622                                        lsp::Position::new(0, 9),
3623                                    ),
3624                                    "THREE".to_string(),
3625                                )],
3626                            ),
3627                            (
3628                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3629                                vec![
3630                                    lsp::TextEdit::new(
3631                                        lsp::Range::new(
3632                                            lsp::Position::new(0, 24),
3633                                            lsp::Position::new(0, 27),
3634                                        ),
3635                                        "THREE".to_string(),
3636                                    ),
3637                                    lsp::TextEdit::new(
3638                                        lsp::Range::new(
3639                                            lsp::Position::new(0, 35),
3640                                            lsp::Position::new(0, 38),
3641                                        ),
3642                                        "THREE".to_string(),
3643                                    ),
3644                                ],
3645                            ),
3646                        ]
3647                        .into_iter()
3648                        .collect(),
3649                    ),
3650                    ..Default::default()
3651                })
3652            })
3653            .next()
3654            .await
3655            .unwrap();
3656        confirm_rename.await.unwrap();
3657
3658        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3659            workspace
3660                .active_item(cx)
3661                .unwrap()
3662                .downcast::<Editor>()
3663                .unwrap()
3664        });
3665        rename_editor.update(&mut cx_b, |editor, cx| {
3666            assert_eq!(
3667                editor.text(cx),
3668                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3669            );
3670            editor.undo(&Undo, cx);
3671            assert_eq!(
3672                editor.text(cx),
3673                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3674            );
3675            editor.redo(&Redo, cx);
3676            assert_eq!(
3677                editor.text(cx),
3678                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3679            );
3680        });
3681
3682        // Ensure temporary rename edits cannot be undone/redone.
3683        editor_b.update(&mut cx_b, |editor, cx| {
3684            editor.undo(&Undo, cx);
3685            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3686            editor.undo(&Undo, cx);
3687            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3688            editor.redo(&Redo, cx);
3689            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3690        })
3691    }
3692
3693    #[gpui::test(iterations = 10)]
3694    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3695        cx_a.foreground().forbid_parking();
3696
3697        // Connect to a server as 2 clients.
3698        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3699        let client_a = server.create_client(&mut cx_a, "user_a").await;
3700        let client_b = server.create_client(&mut cx_b, "user_b").await;
3701
3702        // Create an org that includes these 2 users.
3703        let db = &server.app_state.db;
3704        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3705        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3706            .await
3707            .unwrap();
3708        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3709            .await
3710            .unwrap();
3711
3712        // Create a channel that includes all the users.
3713        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3714        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3715            .await
3716            .unwrap();
3717        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3718            .await
3719            .unwrap();
3720        db.create_channel_message(
3721            channel_id,
3722            client_b.current_user_id(&cx_b),
3723            "hello A, it's B.",
3724            OffsetDateTime::now_utc(),
3725            1,
3726        )
3727        .await
3728        .unwrap();
3729
3730        let channels_a = cx_a
3731            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3732        channels_a
3733            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3734            .await;
3735        channels_a.read_with(&cx_a, |list, _| {
3736            assert_eq!(
3737                list.available_channels().unwrap(),
3738                &[ChannelDetails {
3739                    id: channel_id.to_proto(),
3740                    name: "test-channel".to_string()
3741                }]
3742            )
3743        });
3744        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3745            this.get_channel(channel_id.to_proto(), cx).unwrap()
3746        });
3747        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3748        channel_a
3749            .condition(&cx_a, |channel, _| {
3750                channel_messages(channel)
3751                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3752            })
3753            .await;
3754
3755        let channels_b = cx_b
3756            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3757        channels_b
3758            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3759            .await;
3760        channels_b.read_with(&cx_b, |list, _| {
3761            assert_eq!(
3762                list.available_channels().unwrap(),
3763                &[ChannelDetails {
3764                    id: channel_id.to_proto(),
3765                    name: "test-channel".to_string()
3766                }]
3767            )
3768        });
3769
3770        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3771            this.get_channel(channel_id.to_proto(), cx).unwrap()
3772        });
3773        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3774        channel_b
3775            .condition(&cx_b, |channel, _| {
3776                channel_messages(channel)
3777                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3778            })
3779            .await;
3780
3781        channel_a
3782            .update(&mut cx_a, |channel, cx| {
3783                channel
3784                    .send_message("oh, hi B.".to_string(), cx)
3785                    .unwrap()
3786                    .detach();
3787                let task = channel.send_message("sup".to_string(), cx).unwrap();
3788                assert_eq!(
3789                    channel_messages(channel),
3790                    &[
3791                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3792                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3793                        ("user_a".to_string(), "sup".to_string(), true)
3794                    ]
3795                );
3796                task
3797            })
3798            .await
3799            .unwrap();
3800
3801        channel_b
3802            .condition(&cx_b, |channel, _| {
3803                channel_messages(channel)
3804                    == [
3805                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3806                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3807                        ("user_a".to_string(), "sup".to_string(), false),
3808                    ]
3809            })
3810            .await;
3811
3812        assert_eq!(
3813            server
3814                .state()
3815                .await
3816                .channel(channel_id)
3817                .unwrap()
3818                .connection_ids
3819                .len(),
3820            2
3821        );
3822        cx_b.update(|_| drop(channel_b));
3823        server
3824            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3825            .await;
3826
3827        cx_a.update(|_| drop(channel_a));
3828        server
3829            .condition(|state| state.channel(channel_id).is_none())
3830            .await;
3831    }
3832
3833    #[gpui::test(iterations = 10)]
3834    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3835        cx_a.foreground().forbid_parking();
3836
3837        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3838        let client_a = server.create_client(&mut cx_a, "user_a").await;
3839
3840        let db = &server.app_state.db;
3841        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3842        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3843        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3844            .await
3845            .unwrap();
3846        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3847            .await
3848            .unwrap();
3849
3850        let channels_a = cx_a
3851            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3852        channels_a
3853            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3854            .await;
3855        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3856            this.get_channel(channel_id.to_proto(), cx).unwrap()
3857        });
3858
3859        // Messages aren't allowed to be too long.
3860        channel_a
3861            .update(&mut cx_a, |channel, cx| {
3862                let long_body = "this is long.\n".repeat(1024);
3863                channel.send_message(long_body, cx).unwrap()
3864            })
3865            .await
3866            .unwrap_err();
3867
3868        // Messages aren't allowed to be blank.
3869        channel_a.update(&mut cx_a, |channel, cx| {
3870            channel.send_message(String::new(), cx).unwrap_err()
3871        });
3872
3873        // Leading and trailing whitespace are trimmed.
3874        channel_a
3875            .update(&mut cx_a, |channel, cx| {
3876                channel
3877                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3878                    .unwrap()
3879            })
3880            .await
3881            .unwrap();
3882        assert_eq!(
3883            db.get_channel_messages(channel_id, 10, None)
3884                .await
3885                .unwrap()
3886                .iter()
3887                .map(|m| &m.body)
3888                .collect::<Vec<_>>(),
3889            &["surrounded by whitespace"]
3890        );
3891    }
3892
3893    #[gpui::test(iterations = 10)]
3894    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3895        cx_a.foreground().forbid_parking();
3896
3897        // Connect to a server as 2 clients.
3898        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3899        let client_a = server.create_client(&mut cx_a, "user_a").await;
3900        let client_b = server.create_client(&mut cx_b, "user_b").await;
3901        let mut status_b = client_b.status();
3902
3903        // Create an org that includes these 2 users.
3904        let db = &server.app_state.db;
3905        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3906        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3907            .await
3908            .unwrap();
3909        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3910            .await
3911            .unwrap();
3912
3913        // Create a channel that includes all the users.
3914        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3915        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3916            .await
3917            .unwrap();
3918        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3919            .await
3920            .unwrap();
3921        db.create_channel_message(
3922            channel_id,
3923            client_b.current_user_id(&cx_b),
3924            "hello A, it's B.",
3925            OffsetDateTime::now_utc(),
3926            2,
3927        )
3928        .await
3929        .unwrap();
3930
3931        let channels_a = cx_a
3932            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3933        channels_a
3934            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3935            .await;
3936
3937        channels_a.read_with(&cx_a, |list, _| {
3938            assert_eq!(
3939                list.available_channels().unwrap(),
3940                &[ChannelDetails {
3941                    id: channel_id.to_proto(),
3942                    name: "test-channel".to_string()
3943                }]
3944            )
3945        });
3946        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3947            this.get_channel(channel_id.to_proto(), cx).unwrap()
3948        });
3949        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3950        channel_a
3951            .condition(&cx_a, |channel, _| {
3952                channel_messages(channel)
3953                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3954            })
3955            .await;
3956
3957        let channels_b = cx_b
3958            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3959        channels_b
3960            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3961            .await;
3962        channels_b.read_with(&cx_b, |list, _| {
3963            assert_eq!(
3964                list.available_channels().unwrap(),
3965                &[ChannelDetails {
3966                    id: channel_id.to_proto(),
3967                    name: "test-channel".to_string()
3968                }]
3969            )
3970        });
3971
3972        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3973            this.get_channel(channel_id.to_proto(), cx).unwrap()
3974        });
3975        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3976        channel_b
3977            .condition(&cx_b, |channel, _| {
3978                channel_messages(channel)
3979                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3980            })
3981            .await;
3982
3983        // Disconnect client B, ensuring we can still access its cached channel data.
3984        server.forbid_connections();
3985        server.disconnect_client(client_b.current_user_id(&cx_b));
3986        while !matches!(
3987            status_b.next().await,
3988            Some(client::Status::ReconnectionError { .. })
3989        ) {}
3990
3991        channels_b.read_with(&cx_b, |channels, _| {
3992            assert_eq!(
3993                channels.available_channels().unwrap(),
3994                [ChannelDetails {
3995                    id: channel_id.to_proto(),
3996                    name: "test-channel".to_string()
3997                }]
3998            )
3999        });
4000        channel_b.read_with(&cx_b, |channel, _| {
4001            assert_eq!(
4002                channel_messages(channel),
4003                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4004            )
4005        });
4006
4007        // Send a message from client B while it is disconnected.
4008        channel_b
4009            .update(&mut cx_b, |channel, cx| {
4010                let task = channel
4011                    .send_message("can you see this?".to_string(), cx)
4012                    .unwrap();
4013                assert_eq!(
4014                    channel_messages(channel),
4015                    &[
4016                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4017                        ("user_b".to_string(), "can you see this?".to_string(), true)
4018                    ]
4019                );
4020                task
4021            })
4022            .await
4023            .unwrap_err();
4024
4025        // Send a message from client A while B is disconnected.
4026        channel_a
4027            .update(&mut cx_a, |channel, cx| {
4028                channel
4029                    .send_message("oh, hi B.".to_string(), cx)
4030                    .unwrap()
4031                    .detach();
4032                let task = channel.send_message("sup".to_string(), cx).unwrap();
4033                assert_eq!(
4034                    channel_messages(channel),
4035                    &[
4036                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4037                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4038                        ("user_a".to_string(), "sup".to_string(), true)
4039                    ]
4040                );
4041                task
4042            })
4043            .await
4044            .unwrap();
4045
4046        // Give client B a chance to reconnect.
4047        server.allow_connections();
4048        cx_b.foreground().advance_clock(Duration::from_secs(10));
4049
4050        // Verify that B sees the new messages upon reconnection, as well as the message client B
4051        // sent while offline.
4052        channel_b
4053            .condition(&cx_b, |channel, _| {
4054                channel_messages(channel)
4055                    == [
4056                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4057                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4058                        ("user_a".to_string(), "sup".to_string(), false),
4059                        ("user_b".to_string(), "can you see this?".to_string(), false),
4060                    ]
4061            })
4062            .await;
4063
4064        // Ensure client A and B can communicate normally after reconnection.
4065        channel_a
4066            .update(&mut cx_a, |channel, cx| {
4067                channel.send_message("you online?".to_string(), cx).unwrap()
4068            })
4069            .await
4070            .unwrap();
4071        channel_b
4072            .condition(&cx_b, |channel, _| {
4073                channel_messages(channel)
4074                    == [
4075                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4076                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4077                        ("user_a".to_string(), "sup".to_string(), false),
4078                        ("user_b".to_string(), "can you see this?".to_string(), false),
4079                        ("user_a".to_string(), "you online?".to_string(), false),
4080                    ]
4081            })
4082            .await;
4083
4084        channel_b
4085            .update(&mut cx_b, |channel, cx| {
4086                channel.send_message("yep".to_string(), cx).unwrap()
4087            })
4088            .await
4089            .unwrap();
4090        channel_a
4091            .condition(&cx_a, |channel, _| {
4092                channel_messages(channel)
4093                    == [
4094                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4095                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4096                        ("user_a".to_string(), "sup".to_string(), false),
4097                        ("user_b".to_string(), "can you see this?".to_string(), false),
4098                        ("user_a".to_string(), "you online?".to_string(), false),
4099                        ("user_b".to_string(), "yep".to_string(), false),
4100                    ]
4101            })
4102            .await;
4103    }
4104
4105    #[gpui::test(iterations = 10)]
4106    async fn test_contacts(
4107        mut cx_a: TestAppContext,
4108        mut cx_b: TestAppContext,
4109        mut cx_c: TestAppContext,
4110    ) {
4111        cx_a.foreground().forbid_parking();
4112        let lang_registry = Arc::new(LanguageRegistry::new());
4113        let fs = FakeFs::new(cx_a.background());
4114
4115        // Connect to a server as 3 clients.
4116        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4117        let client_a = server.create_client(&mut cx_a, "user_a").await;
4118        let client_b = server.create_client(&mut cx_b, "user_b").await;
4119        let client_c = server.create_client(&mut cx_c, "user_c").await;
4120
4121        // Share a worktree as client A.
4122        fs.insert_tree(
4123            "/a",
4124            json!({
4125                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4126            }),
4127        )
4128        .await;
4129
4130        let project_a = cx_a.update(|cx| {
4131            Project::local(
4132                client_a.clone(),
4133                client_a.user_store.clone(),
4134                lang_registry.clone(),
4135                fs.clone(),
4136                cx,
4137            )
4138        });
4139        let (worktree_a, _) = project_a
4140            .update(&mut cx_a, |p, cx| {
4141                p.find_or_create_local_worktree("/a", false, cx)
4142            })
4143            .await
4144            .unwrap();
4145        worktree_a
4146            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4147            .await;
4148
4149        client_a
4150            .user_store
4151            .condition(&cx_a, |user_store, _| {
4152                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4153            })
4154            .await;
4155        client_b
4156            .user_store
4157            .condition(&cx_b, |user_store, _| {
4158                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4159            })
4160            .await;
4161        client_c
4162            .user_store
4163            .condition(&cx_c, |user_store, _| {
4164                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4165            })
4166            .await;
4167
4168        let project_id = project_a
4169            .update(&mut cx_a, |project, _| project.next_remote_id())
4170            .await;
4171        project_a
4172            .update(&mut cx_a, |project, cx| project.share(cx))
4173            .await
4174            .unwrap();
4175
4176        let _project_b = Project::remote(
4177            project_id,
4178            client_b.clone(),
4179            client_b.user_store.clone(),
4180            lang_registry.clone(),
4181            fs.clone(),
4182            &mut cx_b.to_async(),
4183        )
4184        .await
4185        .unwrap();
4186
4187        client_a
4188            .user_store
4189            .condition(&cx_a, |user_store, _| {
4190                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4191            })
4192            .await;
4193        client_b
4194            .user_store
4195            .condition(&cx_b, |user_store, _| {
4196                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4197            })
4198            .await;
4199        client_c
4200            .user_store
4201            .condition(&cx_c, |user_store, _| {
4202                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4203            })
4204            .await;
4205
4206        project_a
4207            .condition(&cx_a, |project, _| {
4208                project.collaborators().contains_key(&client_b.peer_id)
4209            })
4210            .await;
4211
4212        cx_a.update(move |_| drop(project_a));
4213        client_a
4214            .user_store
4215            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4216            .await;
4217        client_b
4218            .user_store
4219            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4220            .await;
4221        client_c
4222            .user_store
4223            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4224            .await;
4225
4226        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4227            user_store
4228                .contacts()
4229                .iter()
4230                .map(|contact| {
4231                    let worktrees = contact
4232                        .projects
4233                        .iter()
4234                        .map(|p| {
4235                            (
4236                                p.worktree_root_names[0].as_str(),
4237                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4238                            )
4239                        })
4240                        .collect();
4241                    (contact.user.github_login.as_str(), worktrees)
4242                })
4243                .collect()
4244        }
4245    }
4246
4247    #[gpui::test(iterations = 100)]
4248    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4249        cx.foreground().forbid_parking();
4250        let max_peers = env::var("MAX_PEERS")
4251            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4252            .unwrap_or(5);
4253        let max_operations = env::var("OPERATIONS")
4254            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4255            .unwrap_or(10);
4256
4257        let rng = Arc::new(Mutex::new(rng));
4258
4259        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4260        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4261
4262        let fs = FakeFs::new(cx.background());
4263        fs.insert_tree(
4264            "/_collab",
4265            json!({
4266                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4267            }),
4268        )
4269        .await;
4270
4271        let operations = Rc::new(Cell::new(0));
4272        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4273        let mut clients = Vec::new();
4274
4275        let mut next_entity_id = 100000;
4276        let mut host_cx = TestAppContext::new(
4277            cx.foreground_platform(),
4278            cx.platform(),
4279            cx.foreground(),
4280            cx.background(),
4281            cx.font_cache(),
4282            next_entity_id,
4283        );
4284        let host = server.create_client(&mut host_cx, "host").await;
4285        let host_project = host_cx.update(|cx| {
4286            Project::local(
4287                host.client.clone(),
4288                host.user_store.clone(),
4289                Arc::new(LanguageRegistry::new()),
4290                fs.clone(),
4291                cx,
4292            )
4293        });
4294        let host_project_id = host_project
4295            .update(&mut host_cx, |p, _| p.next_remote_id())
4296            .await;
4297
4298        let (collab_worktree, _) = host_project
4299            .update(&mut host_cx, |project, cx| {
4300                project.find_or_create_local_worktree("/_collab", false, cx)
4301            })
4302            .await
4303            .unwrap();
4304        collab_worktree
4305            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4306            .await;
4307        host_project
4308            .update(&mut host_cx, |project, cx| project.share(cx))
4309            .await
4310            .unwrap();
4311
4312        clients.push(cx.foreground().spawn(host.simulate_host(
4313            host_project.clone(),
4314            language_server_config,
4315            operations.clone(),
4316            max_operations,
4317            rng.clone(),
4318            host_cx.clone(),
4319        )));
4320
4321        while operations.get() < max_operations {
4322            cx.background().simulate_random_delay().await;
4323            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4324                operations.set(operations.get() + 1);
4325
4326                let guest_id = clients.len();
4327                log::info!("Adding guest {}", guest_id);
4328                next_entity_id += 100000;
4329                let mut guest_cx = TestAppContext::new(
4330                    cx.foreground_platform(),
4331                    cx.platform(),
4332                    cx.foreground(),
4333                    cx.background(),
4334                    cx.font_cache(),
4335                    next_entity_id,
4336                );
4337                let guest = server
4338                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4339                    .await;
4340                let guest_project = Project::remote(
4341                    host_project_id,
4342                    guest.client.clone(),
4343                    guest.user_store.clone(),
4344                    guest_lang_registry.clone(),
4345                    fs.clone(),
4346                    &mut guest_cx.to_async(),
4347                )
4348                .await
4349                .unwrap();
4350                clients.push(cx.foreground().spawn(guest.simulate_guest(
4351                    guest_id,
4352                    guest_project,
4353                    operations.clone(),
4354                    max_operations,
4355                    rng.clone(),
4356                    guest_cx,
4357                )));
4358
4359                log::info!("Guest {} added", guest_id);
4360            }
4361        }
4362
4363        let clients = futures::future::join_all(clients).await;
4364        cx.foreground().run_until_parked();
4365
4366        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4367            project
4368                .worktrees(cx)
4369                .map(|worktree| {
4370                    let snapshot = worktree.read(cx).snapshot();
4371                    (snapshot.id(), snapshot)
4372                })
4373                .collect::<BTreeMap<_, _>>()
4374        });
4375
4376        for (guest_client, guest_cx) in clients.iter().skip(1) {
4377            let guest_id = guest_client.client.id();
4378            let worktree_snapshots =
4379                guest_client
4380                    .project
4381                    .as_ref()
4382                    .unwrap()
4383                    .read_with(guest_cx, |project, cx| {
4384                        project
4385                            .worktrees(cx)
4386                            .map(|worktree| {
4387                                let worktree = worktree.read(cx);
4388                                (worktree.id(), worktree.snapshot())
4389                            })
4390                            .collect::<BTreeMap<_, _>>()
4391                    });
4392
4393            assert_eq!(
4394                worktree_snapshots.keys().collect::<Vec<_>>(),
4395                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4396                "guest {} has different worktrees than the host",
4397                guest_id
4398            );
4399            for (id, host_snapshot) in &host_worktree_snapshots {
4400                let guest_snapshot = &worktree_snapshots[id];
4401                assert_eq!(
4402                    guest_snapshot.root_name(),
4403                    host_snapshot.root_name(),
4404                    "guest {} has different root name than the host for worktree {}",
4405                    guest_id,
4406                    id
4407                );
4408                assert_eq!(
4409                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4410                    host_snapshot.entries(false).collect::<Vec<_>>(),
4411                    "guest {} has different snapshot than the host for worktree {}",
4412                    guest_id,
4413                    id
4414                );
4415            }
4416
4417            guest_client
4418                .project
4419                .as_ref()
4420                .unwrap()
4421                .read_with(guest_cx, |project, _| {
4422                    assert!(
4423                        !project.has_buffered_operations(),
4424                        "guest {} has buffered operations ",
4425                        guest_id,
4426                    );
4427                });
4428
4429            for guest_buffer in &guest_client.buffers {
4430                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4431                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4432                    project
4433                        .shared_buffer(guest_client.peer_id, buffer_id)
4434                        .expect(&format!(
4435                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4436                            guest_id, guest_client.peer_id, buffer_id
4437                        ))
4438                });
4439                assert_eq!(
4440                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4441                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4442                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4443                    guest_id,
4444                    buffer_id,
4445                    host_buffer
4446                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4447                );
4448            }
4449        }
4450    }
4451
4452    struct TestServer {
4453        peer: Arc<Peer>,
4454        app_state: Arc<AppState>,
4455        server: Arc<Server>,
4456        foreground: Rc<executor::Foreground>,
4457        notifications: mpsc::UnboundedReceiver<()>,
4458        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4459        forbid_connections: Arc<AtomicBool>,
4460        _test_db: TestDb,
4461    }
4462
4463    impl TestServer {
4464        async fn start(
4465            foreground: Rc<executor::Foreground>,
4466            background: Arc<executor::Background>,
4467        ) -> Self {
4468            let test_db = TestDb::fake(background);
4469            let app_state = Self::build_app_state(&test_db).await;
4470            let peer = Peer::new();
4471            let notifications = mpsc::unbounded();
4472            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4473            Self {
4474                peer,
4475                app_state,
4476                server,
4477                foreground,
4478                notifications: notifications.1,
4479                connection_killers: Default::default(),
4480                forbid_connections: Default::default(),
4481                _test_db: test_db,
4482            }
4483        }
4484
4485        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4486            let http = FakeHttpClient::with_404_response();
4487            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4488            let client_name = name.to_string();
4489            let mut client = Client::new(http.clone());
4490            let server = self.server.clone();
4491            let connection_killers = self.connection_killers.clone();
4492            let forbid_connections = self.forbid_connections.clone();
4493            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4494
4495            Arc::get_mut(&mut client)
4496                .unwrap()
4497                .override_authenticate(move |cx| {
4498                    cx.spawn(|_| async move {
4499                        let access_token = "the-token".to_string();
4500                        Ok(Credentials {
4501                            user_id: user_id.0 as u64,
4502                            access_token,
4503                        })
4504                    })
4505                })
4506                .override_establish_connection(move |credentials, cx| {
4507                    assert_eq!(credentials.user_id, user_id.0 as u64);
4508                    assert_eq!(credentials.access_token, "the-token");
4509
4510                    let server = server.clone();
4511                    let connection_killers = connection_killers.clone();
4512                    let forbid_connections = forbid_connections.clone();
4513                    let client_name = client_name.clone();
4514                    let connection_id_tx = connection_id_tx.clone();
4515                    cx.spawn(move |cx| async move {
4516                        if forbid_connections.load(SeqCst) {
4517                            Err(EstablishConnectionError::other(anyhow!(
4518                                "server is forbidding connections"
4519                            )))
4520                        } else {
4521                            let (client_conn, server_conn, kill_conn) =
4522                                Connection::in_memory(cx.background());
4523                            connection_killers.lock().insert(user_id, kill_conn);
4524                            cx.background()
4525                                .spawn(server.handle_connection(
4526                                    server_conn,
4527                                    client_name,
4528                                    user_id,
4529                                    Some(connection_id_tx),
4530                                    cx.background(),
4531                                ))
4532                                .detach();
4533                            Ok(client_conn)
4534                        }
4535                    })
4536                });
4537
4538            client
4539                .authenticate_and_connect(&cx.to_async())
4540                .await
4541                .unwrap();
4542
4543            Channel::init(&client);
4544            Project::init(&client);
4545
4546            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4547            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4548            let mut authed_user =
4549                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4550            while authed_user.next().await.unwrap().is_none() {}
4551
4552            TestClient {
4553                client,
4554                peer_id,
4555                user_store,
4556                project: Default::default(),
4557                buffers: Default::default(),
4558            }
4559        }
4560
4561        fn disconnect_client(&self, user_id: UserId) {
4562            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4563                let _ = kill_conn.try_send(Some(()));
4564            }
4565        }
4566
4567        fn forbid_connections(&self) {
4568            self.forbid_connections.store(true, SeqCst);
4569        }
4570
4571        fn allow_connections(&self) {
4572            self.forbid_connections.store(false, SeqCst);
4573        }
4574
4575        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4576            let mut config = Config::default();
4577            config.session_secret = "a".repeat(32);
4578            config.database_url = test_db.url.clone();
4579            let github_client = github::AppClient::test();
4580            Arc::new(AppState {
4581                db: test_db.db().clone(),
4582                handlebars: Default::default(),
4583                auth_client: auth::build_client("", ""),
4584                repo_client: github::RepoClient::test(&github_client),
4585                github_client,
4586                config,
4587            })
4588        }
4589
4590        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4591            self.server.store.read()
4592        }
4593
4594        async fn condition<F>(&mut self, mut predicate: F)
4595        where
4596            F: FnMut(&Store) -> bool,
4597        {
4598            async_std::future::timeout(Duration::from_millis(500), async {
4599                while !(predicate)(&*self.server.store.read()) {
4600                    self.foreground.start_waiting();
4601                    self.notifications.next().await;
4602                    self.foreground.finish_waiting();
4603                }
4604            })
4605            .await
4606            .expect("condition timed out");
4607        }
4608    }
4609
4610    impl Drop for TestServer {
4611        fn drop(&mut self) {
4612            self.peer.reset();
4613        }
4614    }
4615
4616    struct TestClient {
4617        client: Arc<Client>,
4618        pub peer_id: PeerId,
4619        pub user_store: ModelHandle<UserStore>,
4620        project: Option<ModelHandle<Project>>,
4621        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4622    }
4623
4624    impl Deref for TestClient {
4625        type Target = Arc<Client>;
4626
4627        fn deref(&self) -> &Self::Target {
4628            &self.client
4629        }
4630    }
4631
4632    impl TestClient {
4633        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4634            UserId::from_proto(
4635                self.user_store
4636                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4637            )
4638        }
4639
4640        fn simulate_host(
4641            mut self,
4642            project: ModelHandle<Project>,
4643            mut language_server_config: LanguageServerConfig,
4644            operations: Rc<Cell<usize>>,
4645            max_operations: usize,
4646            rng: Arc<Mutex<StdRng>>,
4647            mut cx: TestAppContext,
4648        ) -> impl Future<Output = (Self, TestAppContext)> {
4649            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4650
4651            // Set up a fake language server.
4652            language_server_config.set_fake_initializer({
4653                let rng = rng.clone();
4654                let files = files.clone();
4655                let project = project.clone();
4656                move |fake_server| {
4657                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4658                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4659                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4660                                range: lsp::Range::new(
4661                                    lsp::Position::new(0, 0),
4662                                    lsp::Position::new(0, 0),
4663                                ),
4664                                new_text: "the-new-text".to_string(),
4665                            })),
4666                            ..Default::default()
4667                        }]))
4668                    });
4669
4670                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4671                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4672                            lsp::CodeAction {
4673                                title: "the-code-action".to_string(),
4674                                ..Default::default()
4675                            },
4676                        )])
4677                    });
4678
4679                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4680                        |params, _| {
4681                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4682                                params.position,
4683                                params.position,
4684                            )))
4685                        },
4686                    );
4687
4688                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4689                        let files = files.clone();
4690                        let rng = rng.clone();
4691                        move |_, _| {
4692                            let files = files.lock();
4693                            let mut rng = rng.lock();
4694                            let count = rng.gen_range::<usize, _>(1..3);
4695                            let files = (0..count)
4696                                .map(|_| files.choose(&mut *rng).unwrap())
4697                                .collect::<Vec<_>>();
4698                            log::info!("LSP: Returning definitions in files {:?}", &files);
4699                            Some(lsp::GotoDefinitionResponse::Array(
4700                                files
4701                                    .into_iter()
4702                                    .map(|file| lsp::Location {
4703                                        uri: lsp::Url::from_file_path(file).unwrap(),
4704                                        range: Default::default(),
4705                                    })
4706                                    .collect(),
4707                            ))
4708                        }
4709                    });
4710
4711                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4712                        let rng = rng.clone();
4713                        let project = project.clone();
4714                        move |params, mut cx| {
4715                            project.update(&mut cx, |project, cx| {
4716                                let path = params
4717                                    .text_document_position_params
4718                                    .text_document
4719                                    .uri
4720                                    .to_file_path()
4721                                    .unwrap();
4722                                let (worktree, relative_path) =
4723                                    project.find_local_worktree(&path, cx)?;
4724                                let project_path =
4725                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4726                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4727
4728                                let mut highlights = Vec::new();
4729                                let highlight_count = rng.lock().gen_range(1..=5);
4730                                let mut prev_end = 0;
4731                                for _ in 0..highlight_count {
4732                                    let range =
4733                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4734                                    let start =
4735                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4736                                    let end =
4737                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4738                                    highlights.push(lsp::DocumentHighlight {
4739                                        range: lsp::Range::new(start, end),
4740                                        kind: Some(lsp::DocumentHighlightKind::READ),
4741                                    });
4742                                    prev_end = range.end;
4743                                }
4744                                Some(highlights)
4745                            })
4746                        }
4747                    });
4748                }
4749            });
4750
4751            project.update(&mut cx, |project, _| {
4752                project.languages().add(Arc::new(Language::new(
4753                    LanguageConfig {
4754                        name: "Rust".into(),
4755                        path_suffixes: vec!["rs".to_string()],
4756                        language_server: Some(language_server_config),
4757                        ..Default::default()
4758                    },
4759                    None,
4760                )));
4761            });
4762
4763            async move {
4764                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4765                while operations.get() < max_operations {
4766                    operations.set(operations.get() + 1);
4767
4768                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4769                    match distribution {
4770                        0..=20 if !files.lock().is_empty() => {
4771                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4772                            let mut path = path.as_path();
4773                            while let Some(parent_path) = path.parent() {
4774                                path = parent_path;
4775                                if rng.lock().gen() {
4776                                    break;
4777                                }
4778                            }
4779
4780                            log::info!("Host: find/create local worktree {:?}", path);
4781                            project
4782                                .update(&mut cx, |project, cx| {
4783                                    project.find_or_create_local_worktree(path, false, cx)
4784                                })
4785                                .await
4786                                .unwrap();
4787                        }
4788                        10..=80 if !files.lock().is_empty() => {
4789                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4790                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4791                                let (worktree, path) = project
4792                                    .update(&mut cx, |project, cx| {
4793                                        project.find_or_create_local_worktree(
4794                                            file.clone(),
4795                                            false,
4796                                            cx,
4797                                        )
4798                                    })
4799                                    .await
4800                                    .unwrap();
4801                                let project_path =
4802                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4803                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4804                                let buffer = project
4805                                    .update(&mut cx, |project, cx| {
4806                                        project.open_buffer(project_path, cx)
4807                                    })
4808                                    .await
4809                                    .unwrap();
4810                                self.buffers.insert(buffer.clone());
4811                                buffer
4812                            } else {
4813                                self.buffers
4814                                    .iter()
4815                                    .choose(&mut *rng.lock())
4816                                    .unwrap()
4817                                    .clone()
4818                            };
4819
4820                            if rng.lock().gen_bool(0.1) {
4821                                cx.update(|cx| {
4822                                    log::info!(
4823                                        "Host: dropping buffer {:?}",
4824                                        buffer.read(cx).file().unwrap().full_path(cx)
4825                                    );
4826                                    self.buffers.remove(&buffer);
4827                                    drop(buffer);
4828                                });
4829                            } else {
4830                                buffer.update(&mut cx, |buffer, cx| {
4831                                    log::info!(
4832                                        "Host: updating buffer {:?}",
4833                                        buffer.file().unwrap().full_path(cx)
4834                                    );
4835                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4836                                });
4837                            }
4838                        }
4839                        _ => loop {
4840                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4841                            let mut path = PathBuf::new();
4842                            path.push("/");
4843                            for _ in 0..path_component_count {
4844                                let letter = rng.lock().gen_range(b'a'..=b'z');
4845                                path.push(std::str::from_utf8(&[letter]).unwrap());
4846                            }
4847                            path.set_extension("rs");
4848                            let parent_path = path.parent().unwrap();
4849
4850                            log::info!("Host: creating file {:?}", path,);
4851
4852                            if fs.create_dir(&parent_path).await.is_ok()
4853                                && fs.create_file(&path, Default::default()).await.is_ok()
4854                            {
4855                                files.lock().push(path);
4856                                break;
4857                            } else {
4858                                log::info!("Host: cannot create file");
4859                            }
4860                        },
4861                    }
4862
4863                    cx.background().simulate_random_delay().await;
4864                }
4865
4866                log::info!("Host done");
4867
4868                self.project = Some(project);
4869                (self, cx)
4870            }
4871        }
4872
4873        pub async fn simulate_guest(
4874            mut self,
4875            guest_id: usize,
4876            project: ModelHandle<Project>,
4877            operations: Rc<Cell<usize>>,
4878            max_operations: usize,
4879            rng: Arc<Mutex<StdRng>>,
4880            mut cx: TestAppContext,
4881        ) -> (Self, TestAppContext) {
4882            while operations.get() < max_operations {
4883                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4884                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4885                        project
4886                            .worktrees(&cx)
4887                            .filter(|worktree| {
4888                                let worktree = worktree.read(cx);
4889                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
4890                            })
4891                            .choose(&mut *rng.lock())
4892                    }) {
4893                        worktree
4894                    } else {
4895                        cx.background().simulate_random_delay().await;
4896                        continue;
4897                    };
4898
4899                    operations.set(operations.get() + 1);
4900                    let (worktree_root_name, project_path) =
4901                        worktree.read_with(&cx, |worktree, _| {
4902                            let entry = worktree
4903                                .entries(false)
4904                                .filter(|e| e.is_file())
4905                                .choose(&mut *rng.lock())
4906                                .unwrap();
4907                            (
4908                                worktree.root_name().to_string(),
4909                                (worktree.id(), entry.path.clone()),
4910                            )
4911                        });
4912                    log::info!(
4913                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
4914                        guest_id,
4915                        project_path.0,
4916                        worktree_root_name,
4917                        project_path.1
4918                    );
4919                    let buffer = project
4920                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4921                        .await
4922                        .unwrap();
4923                    self.buffers.insert(buffer.clone());
4924                    buffer
4925                } else {
4926                    operations.set(operations.get() + 1);
4927
4928                    self.buffers
4929                        .iter()
4930                        .choose(&mut *rng.lock())
4931                        .unwrap()
4932                        .clone()
4933                };
4934
4935                let choice = rng.lock().gen_range(0..100);
4936                match choice {
4937                    0..=9 => {
4938                        cx.update(|cx| {
4939                            log::info!(
4940                                "Guest {}: dropping buffer {:?}",
4941                                guest_id,
4942                                buffer.read(cx).file().unwrap().full_path(cx)
4943                            );
4944                            self.buffers.remove(&buffer);
4945                            drop(buffer);
4946                        });
4947                    }
4948                    10..=19 => {
4949                        let completions = project.update(&mut cx, |project, cx| {
4950                            log::info!(
4951                                "Guest {}: requesting completions for buffer {:?}",
4952                                guest_id,
4953                                buffer.read(cx).file().unwrap().full_path(cx)
4954                            );
4955                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4956                            project.completions(&buffer, offset, cx)
4957                        });
4958                        let completions = cx.background().spawn(async move {
4959                            completions.await.expect("completions request failed");
4960                        });
4961                        if rng.lock().gen_bool(0.3) {
4962                            log::info!("Guest {}: detaching completions request", guest_id);
4963                            completions.detach();
4964                        } else {
4965                            completions.await;
4966                        }
4967                    }
4968                    20..=29 => {
4969                        let code_actions = project.update(&mut cx, |project, cx| {
4970                            log::info!(
4971                                "Guest {}: requesting code actions for buffer {:?}",
4972                                guest_id,
4973                                buffer.read(cx).file().unwrap().full_path(cx)
4974                            );
4975                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4976                            project.code_actions(&buffer, range, cx)
4977                        });
4978                        let code_actions = cx.background().spawn(async move {
4979                            code_actions.await.expect("code actions request failed");
4980                        });
4981                        if rng.lock().gen_bool(0.3) {
4982                            log::info!("Guest {}: detaching code actions request", guest_id);
4983                            code_actions.detach();
4984                        } else {
4985                            code_actions.await;
4986                        }
4987                    }
4988                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4989                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4990                            log::info!(
4991                                "Guest {}: saving buffer {:?}",
4992                                guest_id,
4993                                buffer.file().unwrap().full_path(cx)
4994                            );
4995                            (buffer.version(), buffer.save(cx))
4996                        });
4997                        let save = cx.spawn(|cx| async move {
4998                            let (saved_version, _) = save.await.expect("save request failed");
4999                            buffer.read_with(&cx, |buffer, _| {
5000                                assert!(buffer.version().observed_all(&saved_version));
5001                                assert!(saved_version.observed_all(&requested_version));
5002                            });
5003                        });
5004                        if rng.lock().gen_bool(0.3) {
5005                            log::info!("Guest {}: detaching save request", guest_id);
5006                            save.detach();
5007                        } else {
5008                            save.await;
5009                        }
5010                    }
5011                    40..=45 => {
5012                        let prepare_rename = project.update(&mut cx, |project, cx| {
5013                            log::info!(
5014                                "Guest {}: preparing rename for buffer {:?}",
5015                                guest_id,
5016                                buffer.read(cx).file().unwrap().full_path(cx)
5017                            );
5018                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5019                            project.prepare_rename(buffer, offset, cx)
5020                        });
5021                        let prepare_rename = cx.background().spawn(async move {
5022                            prepare_rename.await.expect("prepare rename request failed");
5023                        });
5024                        if rng.lock().gen_bool(0.3) {
5025                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5026                            prepare_rename.detach();
5027                        } else {
5028                            prepare_rename.await;
5029                        }
5030                    }
5031                    46..=49 => {
5032                        let definitions = project.update(&mut cx, |project, cx| {
5033                            log::info!(
5034                                "Guest {}: requesting defintions for buffer {:?}",
5035                                guest_id,
5036                                buffer.read(cx).file().unwrap().full_path(cx)
5037                            );
5038                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5039                            project.definition(&buffer, offset, cx)
5040                        });
5041                        let definitions = cx.background().spawn(async move {
5042                            definitions.await.expect("definitions request failed")
5043                        });
5044                        if rng.lock().gen_bool(0.3) {
5045                            log::info!("Guest {}: detaching definitions request", guest_id);
5046                            definitions.detach();
5047                        } else {
5048                            self.buffers
5049                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5050                        }
5051                    }
5052                    50..=55 => {
5053                        let highlights = project.update(&mut cx, |project, cx| {
5054                            log::info!(
5055                                "Guest {}: requesting highlights for buffer {:?}",
5056                                guest_id,
5057                                buffer.read(cx).file().unwrap().full_path(cx)
5058                            );
5059                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5060                            project.document_highlights(&buffer, offset, cx)
5061                        });
5062                        let highlights = cx.background().spawn(async move {
5063                            highlights.await.expect("highlights request failed");
5064                        });
5065                        if rng.lock().gen_bool(0.3) {
5066                            log::info!("Guest {}: detaching highlights request", guest_id);
5067                            highlights.detach();
5068                        } else {
5069                            highlights.await;
5070                        }
5071                    }
5072                    _ => {
5073                        buffer.update(&mut cx, |buffer, cx| {
5074                            log::info!(
5075                                "Guest {}: updating buffer {:?}",
5076                                guest_id,
5077                                buffer.file().unwrap().full_path(cx)
5078                            );
5079                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5080                        });
5081                    }
5082                }
5083                cx.background().simulate_random_delay().await;
5084            }
5085
5086            log::info!("Guest {} done", guest_id);
5087
5088            self.project = Some(project);
5089            (self, cx)
5090        }
5091    }
5092
5093    impl Executor for Arc<gpui::executor::Background> {
5094        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5095            self.spawn(future).detach();
5096        }
5097    }
5098
5099    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5100        channel
5101            .messages()
5102            .cursor::<()>()
5103            .map(|m| {
5104                (
5105                    m.sender.github_login.clone(),
5106                    m.body.clone(),
5107                    m.is_pending(),
5108                )
5109            })
5110            .collect()
5111    }
5112
5113    struct EmptyView;
5114
5115    impl gpui::Entity for EmptyView {
5116        type Event = ();
5117    }
5118
5119    impl gpui::View for EmptyView {
5120        fn ui_name() -> &'static str {
5121            "empty view"
5122        }
5123
5124        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5125            gpui::Element::boxed(gpui::elements::Empty)
5126        }
5127    }
5128}