rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_request_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let notifications = this.notifications.clone();
 194                                let is_background = message.is_background();
 195                                let handle_message = (handler)(this.clone(), message);
 196                                let handle_message = async move {
 197                                    if let Err(err) = handle_message.await {
 198                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 199                                    } else {
 200                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 201                                    }
 202                                    if let Some(mut notifications) = notifications {
 203                                        let _ = notifications.send(()).await;
 204                                    }
 205                                };
 206                                if is_background {
 207                                    executor.spawn_detached(handle_message);
 208                                } else {
 209                                    handle_message.await;
 210                                }
 211                            } else {
 212                                log::warn!("unhandled message: {}", type_name);
 213                            }
 214                        } else {
 215                            log::info!("rpc connection closed {:?}", addr);
 216                            break;
 217                        }
 218                    }
 219                }
 220            }
 221
 222            if let Err(err) = this.sign_out(connection_id).await {
 223                log::error!("error signing out connection {:?} - {:?}", addr, err);
 224            }
 225        }
 226    }
 227
 228    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 229        self.peer.disconnect(connection_id);
 230        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 231
 232        for (project_id, project) in removed_connection.hosted_projects {
 233            if let Some(share) = project.share {
 234                broadcast(
 235                    connection_id,
 236                    share.guests.keys().copied().collect(),
 237                    |conn_id| {
 238                        self.peer
 239                            .send(conn_id, proto::UnshareProject { project_id })
 240                    },
 241                )?;
 242            }
 243        }
 244
 245        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 246            broadcast(connection_id, peer_ids, |conn_id| {
 247                self.peer.send(
 248                    conn_id,
 249                    proto::RemoveProjectCollaborator {
 250                        project_id,
 251                        peer_id: connection_id.0,
 252                    },
 253                )
 254            })?;
 255        }
 256
 257        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 258        Ok(())
 259    }
 260
 261    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 262        Ok(proto::Ack {})
 263    }
 264
 265    async fn register_project(
 266        mut self: Arc<Server>,
 267        request: TypedEnvelope<proto::RegisterProject>,
 268    ) -> tide::Result<proto::RegisterProjectResponse> {
 269        let project_id = {
 270            let mut state = self.state_mut();
 271            let user_id = state.user_id_for_connection(request.sender_id)?;
 272            state.register_project(request.sender_id, user_id)
 273        };
 274        Ok(proto::RegisterProjectResponse { project_id })
 275    }
 276
 277    async fn unregister_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::UnregisterProject>,
 280    ) -> tide::Result<()> {
 281        let project = self
 282            .state_mut()
 283            .unregister_project(request.payload.project_id, request.sender_id)?;
 284        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 285        Ok(())
 286    }
 287
 288    async fn share_project(
 289        mut self: Arc<Server>,
 290        request: TypedEnvelope<proto::ShareProject>,
 291    ) -> tide::Result<proto::Ack> {
 292        self.state_mut()
 293            .share_project(request.payload.project_id, request.sender_id);
 294        Ok(proto::Ack {})
 295    }
 296
 297    async fn unshare_project(
 298        mut self: Arc<Server>,
 299        request: TypedEnvelope<proto::UnshareProject>,
 300    ) -> tide::Result<()> {
 301        let project_id = request.payload.project_id;
 302        let project = self
 303            .state_mut()
 304            .unshare_project(project_id, request.sender_id)?;
 305
 306        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 307            self.peer
 308                .send(conn_id, proto::UnshareProject { project_id })
 309        })?;
 310        self.update_contacts_for_users(&project.authorized_user_ids)?;
 311        Ok(())
 312    }
 313
 314    async fn join_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::JoinProject>,
 317    ) -> tide::Result<proto::JoinProjectResponse> {
 318        let project_id = request.payload.project_id;
 319
 320        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 321        let (response, connection_ids, contact_user_ids) = self
 322            .state_mut()
 323            .join_project(request.sender_id, user_id, project_id)
 324            .and_then(|joined| {
 325                let share = joined.project.share()?;
 326                let peer_count = share.guests.len();
 327                let mut collaborators = Vec::with_capacity(peer_count);
 328                collaborators.push(proto::Collaborator {
 329                    peer_id: joined.project.host_connection_id.0,
 330                    replica_id: 0,
 331                    user_id: joined.project.host_user_id.to_proto(),
 332                });
 333                let worktrees = joined
 334                    .project
 335                    .worktrees
 336                    .iter()
 337                    .filter_map(|(id, worktree)| {
 338                        worktree.share.as_ref().map(|share| proto::Worktree {
 339                            id: *id,
 340                            root_name: worktree.root_name.clone(),
 341                            entries: share.entries.values().cloned().collect(),
 342                            diagnostic_summaries: share
 343                                .diagnostic_summaries
 344                                .values()
 345                                .cloned()
 346                                .collect(),
 347                            weak: worktree.weak,
 348                            next_update_id: share.next_update_id as u64,
 349                        })
 350                    })
 351                    .collect();
 352                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 353                    if *peer_conn_id != request.sender_id {
 354                        collaborators.push(proto::Collaborator {
 355                            peer_id: peer_conn_id.0,
 356                            replica_id: *peer_replica_id as u32,
 357                            user_id: peer_user_id.to_proto(),
 358                        });
 359                    }
 360                }
 361                let response = proto::JoinProjectResponse {
 362                    worktrees,
 363                    replica_id: joined.replica_id as u32,
 364                    collaborators,
 365                };
 366                let connection_ids = joined.project.connection_ids();
 367                let contact_user_ids = joined.project.authorized_user_ids();
 368                Ok((response, connection_ids, contact_user_ids))
 369            })?;
 370
 371        broadcast(request.sender_id, connection_ids, |conn_id| {
 372            self.peer.send(
 373                conn_id,
 374                proto::AddProjectCollaborator {
 375                    project_id,
 376                    collaborator: Some(proto::Collaborator {
 377                        peer_id: request.sender_id.0,
 378                        replica_id: response.replica_id,
 379                        user_id: user_id.to_proto(),
 380                    }),
 381                },
 382            )
 383        })?;
 384        self.update_contacts_for_users(&contact_user_ids)?;
 385        Ok(response)
 386    }
 387
 388    async fn leave_project(
 389        mut self: Arc<Server>,
 390        request: TypedEnvelope<proto::LeaveProject>,
 391    ) -> tide::Result<()> {
 392        let sender_id = request.sender_id;
 393        let project_id = request.payload.project_id;
 394        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 395
 396        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::RemoveProjectCollaborator {
 400                    project_id,
 401                    peer_id: sender_id.0,
 402                },
 403            )
 404        })?;
 405        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 406
 407        Ok(())
 408    }
 409
 410    async fn register_worktree(
 411        mut self: Arc<Server>,
 412        request: TypedEnvelope<proto::RegisterWorktree>,
 413    ) -> tide::Result<proto::Ack> {
 414        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 415
 416        let mut contact_user_ids = HashSet::default();
 417        contact_user_ids.insert(host_user_id);
 418        for github_login in request.payload.authorized_logins {
 419            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 420            contact_user_ids.insert(contact_user_id);
 421        }
 422
 423        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 424        self.state_mut().register_worktree(
 425            request.payload.project_id,
 426            request.payload.worktree_id,
 427            request.sender_id,
 428            Worktree {
 429                authorized_user_ids: contact_user_ids.clone(),
 430                root_name: request.payload.root_name,
 431                share: None,
 432                weak: false,
 433            },
 434        )?;
 435        self.update_contacts_for_users(&contact_user_ids)?;
 436        Ok(proto::Ack {})
 437    }
 438
 439    async fn unregister_worktree(
 440        mut self: Arc<Server>,
 441        request: TypedEnvelope<proto::UnregisterWorktree>,
 442    ) -> tide::Result<()> {
 443        let project_id = request.payload.project_id;
 444        let worktree_id = request.payload.worktree_id;
 445        let (worktree, guest_connection_ids) =
 446            self.state_mut()
 447                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 448        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 449            self.peer.send(
 450                conn_id,
 451                proto::UnregisterWorktree {
 452                    project_id,
 453                    worktree_id,
 454                },
 455            )
 456        })?;
 457        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 458        Ok(())
 459    }
 460
 461    async fn share_worktree(
 462        mut self: Arc<Server>,
 463        mut request: TypedEnvelope<proto::ShareWorktree>,
 464    ) -> tide::Result<proto::Ack> {
 465        let worktree = request
 466            .payload
 467            .worktree
 468            .as_mut()
 469            .ok_or_else(|| anyhow!("missing worktree"))?;
 470        let entries = worktree
 471            .entries
 472            .iter()
 473            .map(|entry| (entry.id, entry.clone()))
 474            .collect();
 475        let diagnostic_summaries = worktree
 476            .diagnostic_summaries
 477            .iter()
 478            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 479            .collect();
 480
 481        let shared_worktree = self.state_mut().share_worktree(
 482            request.payload.project_id,
 483            worktree.id,
 484            request.sender_id,
 485            entries,
 486            diagnostic_summaries,
 487            worktree.next_update_id,
 488        )?;
 489
 490        broadcast(
 491            request.sender_id,
 492            shared_worktree.connection_ids,
 493            |connection_id| {
 494                self.peer
 495                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 496            },
 497        )?;
 498        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499
 500        Ok(proto::Ack {})
 501    }
 502
 503    async fn update_worktree(
 504        mut self: Arc<Server>,
 505        request: TypedEnvelope<proto::UpdateWorktree>,
 506    ) -> tide::Result<proto::Ack> {
 507        let connection_ids = self.state_mut().update_worktree(
 508            request.sender_id,
 509            request.payload.project_id,
 510            request.payload.worktree_id,
 511            request.payload.id,
 512            &request.payload.removed_entries,
 513            &request.payload.updated_entries,
 514        )?;
 515
 516        broadcast(request.sender_id, connection_ids, |connection_id| {
 517            self.peer
 518                .forward_send(request.sender_id, connection_id, request.payload.clone())
 519        })?;
 520
 521        Ok(proto::Ack {})
 522    }
 523
 524    async fn update_diagnostic_summary(
 525        mut self: Arc<Server>,
 526        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 527    ) -> tide::Result<()> {
 528        let summary = request
 529            .payload
 530            .summary
 531            .clone()
 532            .ok_or_else(|| anyhow!("invalid summary"))?;
 533        let receiver_ids = self.state_mut().update_diagnostic_summary(
 534            request.payload.project_id,
 535            request.payload.worktree_id,
 536            request.sender_id,
 537            summary,
 538        )?;
 539
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })?;
 544        Ok(())
 545    }
 546
 547    async fn disk_based_diagnostics_updating(
 548        self: Arc<Server>,
 549        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 550    ) -> tide::Result<()> {
 551        let receiver_ids = self
 552            .state()
 553            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 554        broadcast(request.sender_id, receiver_ids, |connection_id| {
 555            self.peer
 556                .forward_send(request.sender_id, connection_id, request.payload.clone())
 557        })?;
 558        Ok(())
 559    }
 560
 561    async fn disk_based_diagnostics_updated(
 562        self: Arc<Server>,
 563        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 564    ) -> tide::Result<()> {
 565        let receiver_ids = self
 566            .state()
 567            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 568        broadcast(request.sender_id, receiver_ids, |connection_id| {
 569            self.peer
 570                .forward_send(request.sender_id, connection_id, request.payload.clone())
 571        })?;
 572        Ok(())
 573    }
 574
 575    async fn get_definition(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<proto::GetDefinition>,
 578    ) -> tide::Result<proto::GetDefinitionResponse> {
 579        let host_connection_id = self
 580            .state()
 581            .read_project(request.payload.project_id, request.sender_id)?
 582            .host_connection_id;
 583        Ok(self
 584            .peer
 585            .forward_request(request.sender_id, host_connection_id, request.payload)
 586            .await?)
 587    }
 588
 589    async fn open_buffer(
 590        self: Arc<Server>,
 591        request: TypedEnvelope<proto::OpenBuffer>,
 592    ) -> tide::Result<proto::OpenBufferResponse> {
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)?
 596            .host_connection_id;
 597        Ok(self
 598            .peer
 599            .forward_request(request.sender_id, host_connection_id, request.payload)
 600            .await?)
 601    }
 602
 603    async fn close_buffer(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::CloseBuffer>,
 606    ) -> tide::Result<()> {
 607        let host_connection_id = self
 608            .state()
 609            .read_project(request.payload.project_id, request.sender_id)?
 610            .host_connection_id;
 611        self.peer
 612            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 613        Ok(())
 614    }
 615
 616    async fn save_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::SaveBuffer>,
 619    ) -> tide::Result<proto::BufferSaved> {
 620        let host;
 621        let mut guests;
 622        {
 623            let state = self.state();
 624            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 625            host = project.host_connection_id;
 626            guests = project.guest_connection_ids()
 627        }
 628
 629        let response = self
 630            .peer
 631            .forward_request(request.sender_id, host, request.payload.clone())
 632            .await?;
 633
 634        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 635        broadcast(host, guests, |conn_id| {
 636            self.peer.forward_send(host, conn_id, response.clone())
 637        })?;
 638
 639        Ok(response)
 640    }
 641
 642    async fn format_buffers(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::FormatBuffers>,
 645    ) -> tide::Result<proto::FormatBuffersResponse> {
 646        let host = self
 647            .state()
 648            .read_project(request.payload.project_id, request.sender_id)?
 649            .host_connection_id;
 650        Ok(self
 651            .peer
 652            .forward_request(request.sender_id, host, request.payload.clone())
 653            .await?)
 654    }
 655
 656    async fn get_completions(
 657        self: Arc<Server>,
 658        request: TypedEnvelope<proto::GetCompletions>,
 659    ) -> tide::Result<proto::GetCompletionsResponse> {
 660        let host = self
 661            .state()
 662            .read_project(request.payload.project_id, request.sender_id)?
 663            .host_connection_id;
 664        Ok(self
 665            .peer
 666            .forward_request(request.sender_id, host, request.payload.clone())
 667            .await?)
 668    }
 669
 670    async fn apply_additional_edits_for_completion(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 673    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 674        let host = self
 675            .state()
 676            .read_project(request.payload.project_id, request.sender_id)?
 677            .host_connection_id;
 678        Ok(self
 679            .peer
 680            .forward_request(request.sender_id, host, request.payload.clone())
 681            .await?)
 682    }
 683
 684    async fn get_code_actions(
 685        self: Arc<Server>,
 686        request: TypedEnvelope<proto::GetCodeActions>,
 687    ) -> tide::Result<proto::GetCodeActionsResponse> {
 688        let host = self
 689            .state()
 690            .read_project(request.payload.project_id, request.sender_id)?
 691            .host_connection_id;
 692        Ok(self
 693            .peer
 694            .forward_request(request.sender_id, host, request.payload.clone())
 695            .await?)
 696    }
 697
 698    async fn apply_code_action(
 699        self: Arc<Server>,
 700        request: TypedEnvelope<proto::ApplyCodeAction>,
 701    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 702        let host = self
 703            .state()
 704            .read_project(request.payload.project_id, request.sender_id)?
 705            .host_connection_id;
 706        Ok(self
 707            .peer
 708            .forward_request(request.sender_id, host, request.payload.clone())
 709            .await?)
 710    }
 711
 712    async fn update_buffer(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBuffer>,
 715    ) -> tide::Result<proto::Ack> {
 716        let receiver_ids = self
 717            .state()
 718            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 719        broadcast(request.sender_id, receiver_ids, |connection_id| {
 720            self.peer
 721                .forward_send(request.sender_id, connection_id, request.payload.clone())
 722        })?;
 723        Ok(proto::Ack {})
 724    }
 725
 726    async fn update_buffer_file(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::UpdateBufferFile>,
 729    ) -> tide::Result<()> {
 730        let receiver_ids = self
 731            .state()
 732            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 733        broadcast(request.sender_id, receiver_ids, |connection_id| {
 734            self.peer
 735                .forward_send(request.sender_id, connection_id, request.payload.clone())
 736        })?;
 737        Ok(())
 738    }
 739
 740    async fn buffer_reloaded(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::BufferReloaded>,
 743    ) -> tide::Result<()> {
 744        let receiver_ids = self
 745            .state()
 746            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 747        broadcast(request.sender_id, receiver_ids, |connection_id| {
 748            self.peer
 749                .forward_send(request.sender_id, connection_id, request.payload.clone())
 750        })?;
 751        Ok(())
 752    }
 753
 754    async fn buffer_saved(
 755        self: Arc<Server>,
 756        request: TypedEnvelope<proto::BufferSaved>,
 757    ) -> tide::Result<()> {
 758        let receiver_ids = self
 759            .state()
 760            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 761        broadcast(request.sender_id, receiver_ids, |connection_id| {
 762            self.peer
 763                .forward_send(request.sender_id, connection_id, request.payload.clone())
 764        })?;
 765        Ok(())
 766    }
 767
 768    async fn get_channels(
 769        self: Arc<Server>,
 770        request: TypedEnvelope<proto::GetChannels>,
 771    ) -> tide::Result<proto::GetChannelsResponse> {
 772        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 773        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 774        Ok(proto::GetChannelsResponse {
 775            channels: channels
 776                .into_iter()
 777                .map(|chan| proto::Channel {
 778                    id: chan.id.to_proto(),
 779                    name: chan.name,
 780                })
 781                .collect(),
 782        })
 783    }
 784
 785    async fn get_users(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::GetUsers>,
 788    ) -> tide::Result<proto::GetUsersResponse> {
 789        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 790        let users = self
 791            .app_state
 792            .db
 793            .get_users_by_ids(user_ids)
 794            .await?
 795            .into_iter()
 796            .map(|user| proto::User {
 797                id: user.id.to_proto(),
 798                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 799                github_login: user.github_login,
 800            })
 801            .collect();
 802        Ok(proto::GetUsersResponse { users })
 803    }
 804
 805    fn update_contacts_for_users<'a>(
 806        self: &Arc<Server>,
 807        user_ids: impl IntoIterator<Item = &'a UserId>,
 808    ) -> anyhow::Result<()> {
 809        let mut result = Ok(());
 810        let state = self.state();
 811        for user_id in user_ids {
 812            let contacts = state.contacts_for_user(*user_id);
 813            for connection_id in state.connection_ids_for_user(*user_id) {
 814                if let Err(error) = self.peer.send(
 815                    connection_id,
 816                    proto::UpdateContacts {
 817                        contacts: contacts.clone(),
 818                    },
 819                ) {
 820                    result = Err(error);
 821                }
 822            }
 823        }
 824        result
 825    }
 826
 827    async fn join_channel(
 828        mut self: Arc<Self>,
 829        request: TypedEnvelope<proto::JoinChannel>,
 830    ) -> tide::Result<proto::JoinChannelResponse> {
 831        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 832        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 833        if !self
 834            .app_state
 835            .db
 836            .can_user_access_channel(user_id, channel_id)
 837            .await?
 838        {
 839            Err(anyhow!("access denied"))?;
 840        }
 841
 842        self.state_mut().join_channel(request.sender_id, channel_id);
 843        let messages = self
 844            .app_state
 845            .db
 846            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 847            .await?
 848            .into_iter()
 849            .map(|msg| proto::ChannelMessage {
 850                id: msg.id.to_proto(),
 851                body: msg.body,
 852                timestamp: msg.sent_at.unix_timestamp() as u64,
 853                sender_id: msg.sender_id.to_proto(),
 854                nonce: Some(msg.nonce.as_u128().into()),
 855            })
 856            .collect::<Vec<_>>();
 857        Ok(proto::JoinChannelResponse {
 858            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 859            messages,
 860        })
 861    }
 862
 863    async fn leave_channel(
 864        mut self: Arc<Self>,
 865        request: TypedEnvelope<proto::LeaveChannel>,
 866    ) -> tide::Result<()> {
 867        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 868        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 869        if !self
 870            .app_state
 871            .db
 872            .can_user_access_channel(user_id, channel_id)
 873            .await?
 874        {
 875            Err(anyhow!("access denied"))?;
 876        }
 877
 878        self.state_mut()
 879            .leave_channel(request.sender_id, channel_id);
 880
 881        Ok(())
 882    }
 883
 884    async fn send_channel_message(
 885        self: Arc<Self>,
 886        request: TypedEnvelope<proto::SendChannelMessage>,
 887    ) -> tide::Result<proto::SendChannelMessageResponse> {
 888        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 889        let user_id;
 890        let connection_ids;
 891        {
 892            let state = self.state();
 893            user_id = state.user_id_for_connection(request.sender_id)?;
 894            connection_ids = state.channel_connection_ids(channel_id)?;
 895        }
 896
 897        // Validate the message body.
 898        let body = request.payload.body.trim().to_string();
 899        if body.len() > MAX_MESSAGE_LEN {
 900            return Err(anyhow!("message is too long"))?;
 901        }
 902        if body.is_empty() {
 903            return Err(anyhow!("message can't be blank"))?;
 904        }
 905
 906        let timestamp = OffsetDateTime::now_utc();
 907        let nonce = request
 908            .payload
 909            .nonce
 910            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 911
 912        let message_id = self
 913            .app_state
 914            .db
 915            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 916            .await?
 917            .to_proto();
 918        let message = proto::ChannelMessage {
 919            sender_id: user_id.to_proto(),
 920            id: message_id,
 921            body,
 922            timestamp: timestamp.unix_timestamp() as u64,
 923            nonce: Some(nonce),
 924        };
 925        broadcast(request.sender_id, connection_ids, |conn_id| {
 926            self.peer.send(
 927                conn_id,
 928                proto::ChannelMessageSent {
 929                    channel_id: channel_id.to_proto(),
 930                    message: Some(message.clone()),
 931                },
 932            )
 933        })?;
 934        Ok(proto::SendChannelMessageResponse {
 935            message: Some(message),
 936        })
 937    }
 938
 939    async fn get_channel_messages(
 940        self: Arc<Self>,
 941        request: TypedEnvelope<proto::GetChannelMessages>,
 942    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 943        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 944        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 945        if !self
 946            .app_state
 947            .db
 948            .can_user_access_channel(user_id, channel_id)
 949            .await?
 950        {
 951            Err(anyhow!("access denied"))?;
 952        }
 953
 954        let messages = self
 955            .app_state
 956            .db
 957            .get_channel_messages(
 958                channel_id,
 959                MESSAGE_COUNT_PER_PAGE,
 960                Some(MessageId::from_proto(request.payload.before_message_id)),
 961            )
 962            .await?
 963            .into_iter()
 964            .map(|msg| proto::ChannelMessage {
 965                id: msg.id.to_proto(),
 966                body: msg.body,
 967                timestamp: msg.sent_at.unix_timestamp() as u64,
 968                sender_id: msg.sender_id.to_proto(),
 969                nonce: Some(msg.nonce.as_u128().into()),
 970            })
 971            .collect::<Vec<_>>();
 972
 973        Ok(proto::GetChannelMessagesResponse {
 974            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 975            messages,
 976        })
 977    }
 978
 979    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 980        self.store.read()
 981    }
 982
 983    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 984        self.store.write()
 985    }
 986}
 987
 988impl Executor for RealExecutor {
 989    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 990        task::spawn(future);
 991    }
 992}
 993
 994fn broadcast<F>(
 995    sender_id: ConnectionId,
 996    receiver_ids: Vec<ConnectionId>,
 997    mut f: F,
 998) -> anyhow::Result<()>
 999where
1000    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1001{
1002    let mut result = Ok(());
1003    for receiver_id in receiver_ids {
1004        if receiver_id != sender_id {
1005            if let Err(error) = f(receiver_id) {
1006                if result.is_ok() {
1007                    result = Err(error);
1008                }
1009            }
1010        }
1011    }
1012    result
1013}
1014
1015pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1016    let server = Server::new(app.state().clone(), rpc.clone(), None);
1017    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1018        let server = server.clone();
1019        async move {
1020            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1021
1022            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1023            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1024            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1025            let client_protocol_version: Option<u32> = request
1026                .header("X-Zed-Protocol-Version")
1027                .and_then(|v| v.as_str().parse().ok());
1028
1029            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1030                return Ok(Response::new(StatusCode::UpgradeRequired));
1031            }
1032
1033            let header = match request.header("Sec-Websocket-Key") {
1034                Some(h) => h.as_str(),
1035                None => return Err(anyhow!("expected sec-websocket-key"))?,
1036            };
1037
1038            let user_id = process_auth_header(&request).await?;
1039
1040            let mut response = Response::new(StatusCode::SwitchingProtocols);
1041            response.insert_header(UPGRADE, "websocket");
1042            response.insert_header(CONNECTION, "Upgrade");
1043            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1044            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1045            response.insert_header("Sec-Websocket-Version", "13");
1046
1047            let http_res: &mut tide::http::Response = response.as_mut();
1048            let upgrade_receiver = http_res.recv_upgrade().await;
1049            let addr = request.remote().unwrap_or("unknown").to_string();
1050            task::spawn(async move {
1051                if let Some(stream) = upgrade_receiver.await {
1052                    server
1053                        .handle_connection(
1054                            Connection::new(
1055                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1056                            ),
1057                            addr,
1058                            user_id,
1059                            None,
1060                            RealExecutor,
1061                        )
1062                        .await;
1063                }
1064            });
1065
1066            Ok(response)
1067        }
1068    });
1069}
1070
1071fn header_contains_ignore_case<T>(
1072    request: &tide::Request<T>,
1073    header_name: HeaderName,
1074    value: &str,
1075) -> bool {
1076    request
1077        .header(header_name)
1078        .map(|h| {
1079            h.as_str()
1080                .split(',')
1081                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1082        })
1083        .unwrap_or(false)
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088    use super::*;
1089    use crate::{
1090        auth,
1091        db::{tests::TestDb, UserId},
1092        github, AppState, Config,
1093    };
1094    use ::rpc::Peer;
1095    use collections::BTreeMap;
1096    use gpui::{executor, ModelHandle, TestAppContext};
1097    use parking_lot::Mutex;
1098    use postage::{mpsc, watch};
1099    use rand::prelude::*;
1100    use rpc::PeerId;
1101    use serde_json::json;
1102    use sqlx::types::time::OffsetDateTime;
1103    use std::{
1104        cell::{Cell, RefCell},
1105        env,
1106        ops::Deref,
1107        path::Path,
1108        rc::Rc,
1109        sync::{
1110            atomic::{AtomicBool, Ordering::SeqCst},
1111            Arc,
1112        },
1113        time::Duration,
1114    };
1115    use zed::{
1116        client::{
1117            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1118            EstablishConnectionError, UserStore,
1119        },
1120        editor::{
1121            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1122            Redo, ToggleCodeActions, Undo,
1123        },
1124        fs::{FakeFs, Fs as _},
1125        language::{
1126            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1127            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1128        },
1129        lsp,
1130        project::{DiagnosticSummary, Project, ProjectPath},
1131        workspace::{Workspace, WorkspaceParams},
1132    };
1133
1134    #[cfg(test)]
1135    #[ctor::ctor]
1136    fn init_logger() {
1137        if std::env::var("RUST_LOG").is_ok() {
1138            env_logger::init();
1139        }
1140    }
1141
1142    #[gpui::test(iterations = 10)]
1143    async fn test_share_project(
1144        mut cx_a: TestAppContext,
1145        mut cx_b: TestAppContext,
1146        last_iteration: bool,
1147    ) {
1148        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1149        let lang_registry = Arc::new(LanguageRegistry::new());
1150        let fs = Arc::new(FakeFs::new(cx_a.background()));
1151        cx_a.foreground().forbid_parking();
1152
1153        // Connect to a server as 2 clients.
1154        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1155        let client_a = server.create_client(&mut cx_a, "user_a").await;
1156        let client_b = server.create_client(&mut cx_b, "user_b").await;
1157
1158        // Share a project as client A
1159        fs.insert_tree(
1160            "/a",
1161            json!({
1162                ".zed.toml": r#"collaborators = ["user_b"]"#,
1163                "a.txt": "a-contents",
1164                "b.txt": "b-contents",
1165            }),
1166        )
1167        .await;
1168        let project_a = cx_a.update(|cx| {
1169            Project::local(
1170                client_a.clone(),
1171                client_a.user_store.clone(),
1172                lang_registry.clone(),
1173                fs.clone(),
1174                cx,
1175            )
1176        });
1177        let (worktree_a, _) = project_a
1178            .update(&mut cx_a, |p, cx| {
1179                p.find_or_create_local_worktree("/a", false, cx)
1180            })
1181            .await
1182            .unwrap();
1183        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1184        worktree_a
1185            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1186            .await;
1187        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1188        project_a
1189            .update(&mut cx_a, |p, cx| p.share(cx))
1190            .await
1191            .unwrap();
1192
1193        // Join that project as client B
1194        let project_b = Project::remote(
1195            project_id,
1196            client_b.clone(),
1197            client_b.user_store.clone(),
1198            lang_registry.clone(),
1199            fs.clone(),
1200            &mut cx_b.to_async(),
1201        )
1202        .await
1203        .unwrap();
1204
1205        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1206            assert_eq!(
1207                project
1208                    .collaborators()
1209                    .get(&client_a.peer_id)
1210                    .unwrap()
1211                    .user
1212                    .github_login,
1213                "user_a"
1214            );
1215            project.replica_id()
1216        });
1217        project_a
1218            .condition(&cx_a, |tree, _| {
1219                tree.collaborators()
1220                    .get(&client_b.peer_id)
1221                    .map_or(false, |collaborator| {
1222                        collaborator.replica_id == replica_id_b
1223                            && collaborator.user.github_login == "user_b"
1224                    })
1225            })
1226            .await;
1227
1228        // Open the same file as client B and client A.
1229        let buffer_b = project_b
1230            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1231            .await
1232            .unwrap();
1233        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1234        buffer_b.read_with(&cx_b, |buf, cx| {
1235            assert_eq!(buf.read(cx).text(), "b-contents")
1236        });
1237        project_a.read_with(&cx_a, |project, cx| {
1238            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1239        });
1240        let buffer_a = project_a
1241            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1242            .await
1243            .unwrap();
1244
1245        let editor_b = cx_b.add_view(window_b, |cx| {
1246            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1247        });
1248
1249        // TODO
1250        // // Create a selection set as client B and see that selection set as client A.
1251        // buffer_a
1252        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1253        //     .await;
1254
1255        // Edit the buffer as client B and see that edit as client A.
1256        editor_b.update(&mut cx_b, |editor, cx| {
1257            editor.handle_input(&Input("ok, ".into()), cx)
1258        });
1259        buffer_a
1260            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1261            .await;
1262
1263        // TODO
1264        // // Remove the selection set as client B, see those selections disappear as client A.
1265        cx_b.update(move |_| drop(editor_b));
1266        // buffer_a
1267        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1268        //     .await;
1269
1270        // Close the buffer as client A, see that the buffer is closed.
1271        cx_a.update(move |_| drop(buffer_a));
1272        project_a
1273            .condition(&cx_a, |project, cx| {
1274                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1275            })
1276            .await;
1277
1278        // Dropping the client B's project removes client B from client A's collaborators.
1279        cx_b.update(move |_| drop(project_b));
1280        project_a
1281            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1282            .await;
1283    }
1284
1285    #[gpui::test(iterations = 10)]
1286    async fn test_unshare_project(
1287        mut cx_a: TestAppContext,
1288        mut cx_b: TestAppContext,
1289        last_iteration: bool,
1290    ) {
1291        let lang_registry = Arc::new(LanguageRegistry::new());
1292        let fs = Arc::new(FakeFs::new(cx_a.background()));
1293        cx_a.foreground().forbid_parking();
1294
1295        // Connect to a server as 2 clients.
1296        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1297        let client_a = server.create_client(&mut cx_a, "user_a").await;
1298        let client_b = server.create_client(&mut cx_b, "user_b").await;
1299
1300        // Share a project as client A
1301        fs.insert_tree(
1302            "/a",
1303            json!({
1304                ".zed.toml": r#"collaborators = ["user_b"]"#,
1305                "a.txt": "a-contents",
1306                "b.txt": "b-contents",
1307            }),
1308        )
1309        .await;
1310        let project_a = cx_a.update(|cx| {
1311            Project::local(
1312                client_a.clone(),
1313                client_a.user_store.clone(),
1314                lang_registry.clone(),
1315                fs.clone(),
1316                cx,
1317            )
1318        });
1319        let (worktree_a, _) = project_a
1320            .update(&mut cx_a, |p, cx| {
1321                p.find_or_create_local_worktree("/a", false, cx)
1322            })
1323            .await
1324            .unwrap();
1325        worktree_a
1326            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1327            .await;
1328        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1329        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1330        project_a
1331            .update(&mut cx_a, |p, cx| p.share(cx))
1332            .await
1333            .unwrap();
1334        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1335
1336        // Join that project as client B
1337        let project_b = Project::remote(
1338            project_id,
1339            client_b.clone(),
1340            client_b.user_store.clone(),
1341            lang_registry.clone(),
1342            fs.clone(),
1343            &mut cx_b.to_async(),
1344        )
1345        .await
1346        .unwrap();
1347        project_b
1348            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1349            .await
1350            .unwrap();
1351
1352        // Unshare the project as client A
1353        project_a
1354            .update(&mut cx_a, |project, cx| project.unshare(cx))
1355            .await
1356            .unwrap();
1357        project_b
1358            .condition(&mut cx_b, |project, _| project.is_read_only())
1359            .await;
1360        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1361        drop(project_b);
1362
1363        // Share the project again and ensure guests can still join.
1364        project_a
1365            .update(&mut cx_a, |project, cx| project.share(cx))
1366            .await
1367            .unwrap();
1368        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1369
1370        let project_c = Project::remote(
1371            project_id,
1372            client_b.clone(),
1373            client_b.user_store.clone(),
1374            lang_registry.clone(),
1375            fs.clone(),
1376            &mut cx_b.to_async(),
1377        )
1378        .await
1379        .unwrap();
1380        project_c
1381            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1382            .await
1383            .unwrap();
1384    }
1385
1386    #[gpui::test(iterations = 10)]
1387    async fn test_propagate_saves_and_fs_changes(
1388        mut cx_a: TestAppContext,
1389        mut cx_b: TestAppContext,
1390        mut cx_c: TestAppContext,
1391        last_iteration: bool,
1392    ) {
1393        let lang_registry = Arc::new(LanguageRegistry::new());
1394        let fs = Arc::new(FakeFs::new(cx_a.background()));
1395        cx_a.foreground().forbid_parking();
1396
1397        // Connect to a server as 3 clients.
1398        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1399        let client_a = server.create_client(&mut cx_a, "user_a").await;
1400        let client_b = server.create_client(&mut cx_b, "user_b").await;
1401        let client_c = server.create_client(&mut cx_c, "user_c").await;
1402
1403        // Share a worktree as client A.
1404        fs.insert_tree(
1405            "/a",
1406            json!({
1407                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1408                "file1": "",
1409                "file2": ""
1410            }),
1411        )
1412        .await;
1413        let project_a = cx_a.update(|cx| {
1414            Project::local(
1415                client_a.clone(),
1416                client_a.user_store.clone(),
1417                lang_registry.clone(),
1418                fs.clone(),
1419                cx,
1420            )
1421        });
1422        let (worktree_a, _) = project_a
1423            .update(&mut cx_a, |p, cx| {
1424                p.find_or_create_local_worktree("/a", false, cx)
1425            })
1426            .await
1427            .unwrap();
1428        worktree_a
1429            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1430            .await;
1431        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1432        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1433        project_a
1434            .update(&mut cx_a, |p, cx| p.share(cx))
1435            .await
1436            .unwrap();
1437
1438        // Join that worktree as clients B and C.
1439        let project_b = Project::remote(
1440            project_id,
1441            client_b.clone(),
1442            client_b.user_store.clone(),
1443            lang_registry.clone(),
1444            fs.clone(),
1445            &mut cx_b.to_async(),
1446        )
1447        .await
1448        .unwrap();
1449        let project_c = Project::remote(
1450            project_id,
1451            client_c.clone(),
1452            client_c.user_store.clone(),
1453            lang_registry.clone(),
1454            fs.clone(),
1455            &mut cx_c.to_async(),
1456        )
1457        .await
1458        .unwrap();
1459        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1460        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1461
1462        // Open and edit a buffer as both guests B and C.
1463        let buffer_b = project_b
1464            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1465            .await
1466            .unwrap();
1467        let buffer_c = project_c
1468            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1469            .await
1470            .unwrap();
1471        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1472        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1473
1474        // Open and edit that buffer as the host.
1475        let buffer_a = project_a
1476            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1477            .await
1478            .unwrap();
1479
1480        buffer_a
1481            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1482            .await;
1483        buffer_a.update(&mut cx_a, |buf, cx| {
1484            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1485        });
1486
1487        // Wait for edits to propagate
1488        buffer_a
1489            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1490            .await;
1491        buffer_b
1492            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1493            .await;
1494        buffer_c
1495            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1496            .await;
1497
1498        // Edit the buffer as the host and concurrently save as guest B.
1499        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1500        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1501        save_b.await.unwrap();
1502        assert_eq!(
1503            fs.load("/a/file1".as_ref()).await.unwrap(),
1504            "hi-a, i-am-c, i-am-b, i-am-a"
1505        );
1506        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1507        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1508        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1509
1510        // Make changes on host's file system, see those changes on guest worktrees.
1511        fs.rename(
1512            "/a/file1".as_ref(),
1513            "/a/file1-renamed".as_ref(),
1514            Default::default(),
1515        )
1516        .await
1517        .unwrap();
1518
1519        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1520            .await
1521            .unwrap();
1522        fs.insert_file(Path::new("/a/file4"), "4".into())
1523            .await
1524            .unwrap();
1525
1526        worktree_a
1527            .condition(&cx_a, |tree, _| {
1528                tree.paths()
1529                    .map(|p| p.to_string_lossy())
1530                    .collect::<Vec<_>>()
1531                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1532            })
1533            .await;
1534        worktree_b
1535            .condition(&cx_b, |tree, _| {
1536                tree.paths()
1537                    .map(|p| p.to_string_lossy())
1538                    .collect::<Vec<_>>()
1539                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1540            })
1541            .await;
1542        worktree_c
1543            .condition(&cx_c, |tree, _| {
1544                tree.paths()
1545                    .map(|p| p.to_string_lossy())
1546                    .collect::<Vec<_>>()
1547                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1548            })
1549            .await;
1550
1551        // Ensure buffer files are updated as well.
1552        buffer_a
1553            .condition(&cx_a, |buf, _| {
1554                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1555            })
1556            .await;
1557        buffer_b
1558            .condition(&cx_b, |buf, _| {
1559                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1560            })
1561            .await;
1562        buffer_c
1563            .condition(&cx_c, |buf, _| {
1564                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1565            })
1566            .await;
1567    }
1568
1569    #[gpui::test(iterations = 10)]
1570    async fn test_buffer_conflict_after_save(
1571        mut cx_a: TestAppContext,
1572        mut cx_b: TestAppContext,
1573        last_iteration: bool,
1574    ) {
1575        cx_a.foreground().forbid_parking();
1576        let lang_registry = Arc::new(LanguageRegistry::new());
1577        let fs = Arc::new(FakeFs::new(cx_a.background()));
1578
1579        // Connect to a server as 2 clients.
1580        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1581        let client_a = server.create_client(&mut cx_a, "user_a").await;
1582        let client_b = server.create_client(&mut cx_b, "user_b").await;
1583
1584        // Share a project as client A
1585        fs.insert_tree(
1586            "/dir",
1587            json!({
1588                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1589                "a.txt": "a-contents",
1590            }),
1591        )
1592        .await;
1593
1594        let project_a = cx_a.update(|cx| {
1595            Project::local(
1596                client_a.clone(),
1597                client_a.user_store.clone(),
1598                lang_registry.clone(),
1599                fs.clone(),
1600                cx,
1601            )
1602        });
1603        let (worktree_a, _) = project_a
1604            .update(&mut cx_a, |p, cx| {
1605                p.find_or_create_local_worktree("/dir", false, cx)
1606            })
1607            .await
1608            .unwrap();
1609        worktree_a
1610            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1611            .await;
1612        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1613        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1614        project_a
1615            .update(&mut cx_a, |p, cx| p.share(cx))
1616            .await
1617            .unwrap();
1618
1619        // Join that project as client B
1620        let project_b = Project::remote(
1621            project_id,
1622            client_b.clone(),
1623            client_b.user_store.clone(),
1624            lang_registry.clone(),
1625            fs.clone(),
1626            &mut cx_b.to_async(),
1627        )
1628        .await
1629        .unwrap();
1630
1631        // Open a buffer as client B
1632        let buffer_b = project_b
1633            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1634            .await
1635            .unwrap();
1636
1637        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1638        buffer_b.read_with(&cx_b, |buf, _| {
1639            assert!(buf.is_dirty());
1640            assert!(!buf.has_conflict());
1641        });
1642
1643        buffer_b
1644            .update(&mut cx_b, |buf, cx| buf.save(cx))
1645            .await
1646            .unwrap();
1647        buffer_b
1648            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1649            .await;
1650        buffer_b.read_with(&cx_b, |buf, _| {
1651            assert!(!buf.has_conflict());
1652        });
1653
1654        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1655        buffer_b.read_with(&cx_b, |buf, _| {
1656            assert!(buf.is_dirty());
1657            assert!(!buf.has_conflict());
1658        });
1659    }
1660
1661    #[gpui::test(iterations = 10)]
1662    async fn test_buffer_reloading(
1663        mut cx_a: TestAppContext,
1664        mut cx_b: TestAppContext,
1665        last_iteration: bool,
1666    ) {
1667        cx_a.foreground().forbid_parking();
1668        let lang_registry = Arc::new(LanguageRegistry::new());
1669        let fs = Arc::new(FakeFs::new(cx_a.background()));
1670
1671        // Connect to a server as 2 clients.
1672        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1673        let client_a = server.create_client(&mut cx_a, "user_a").await;
1674        let client_b = server.create_client(&mut cx_b, "user_b").await;
1675
1676        // Share a project as client A
1677        fs.insert_tree(
1678            "/dir",
1679            json!({
1680                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1681                "a.txt": "a-contents",
1682            }),
1683        )
1684        .await;
1685
1686        let project_a = cx_a.update(|cx| {
1687            Project::local(
1688                client_a.clone(),
1689                client_a.user_store.clone(),
1690                lang_registry.clone(),
1691                fs.clone(),
1692                cx,
1693            )
1694        });
1695        let (worktree_a, _) = project_a
1696            .update(&mut cx_a, |p, cx| {
1697                p.find_or_create_local_worktree("/dir", false, cx)
1698            })
1699            .await
1700            .unwrap();
1701        worktree_a
1702            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1703            .await;
1704        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1705        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1706        project_a
1707            .update(&mut cx_a, |p, cx| p.share(cx))
1708            .await
1709            .unwrap();
1710
1711        // Join that project as client B
1712        let project_b = Project::remote(
1713            project_id,
1714            client_b.clone(),
1715            client_b.user_store.clone(),
1716            lang_registry.clone(),
1717            fs.clone(),
1718            &mut cx_b.to_async(),
1719        )
1720        .await
1721        .unwrap();
1722        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1723
1724        // Open a buffer as client B
1725        let buffer_b = project_b
1726            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1727            .await
1728            .unwrap();
1729        buffer_b.read_with(&cx_b, |buf, _| {
1730            assert!(!buf.is_dirty());
1731            assert!(!buf.has_conflict());
1732        });
1733
1734        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1735            .await
1736            .unwrap();
1737        buffer_b
1738            .condition(&cx_b, |buf, _| {
1739                buf.text() == "new contents" && !buf.is_dirty()
1740            })
1741            .await;
1742        buffer_b.read_with(&cx_b, |buf, _| {
1743            assert!(!buf.has_conflict());
1744        });
1745    }
1746
1747    #[gpui::test(iterations = 10)]
1748    async fn test_editing_while_guest_opens_buffer(
1749        mut cx_a: TestAppContext,
1750        mut cx_b: TestAppContext,
1751        last_iteration: bool,
1752    ) {
1753        cx_a.foreground().forbid_parking();
1754        let lang_registry = Arc::new(LanguageRegistry::new());
1755        let fs = Arc::new(FakeFs::new(cx_a.background()));
1756
1757        // Connect to a server as 2 clients.
1758        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1759        let client_a = server.create_client(&mut cx_a, "user_a").await;
1760        let client_b = server.create_client(&mut cx_b, "user_b").await;
1761
1762        // Share a project as client A
1763        fs.insert_tree(
1764            "/dir",
1765            json!({
1766                ".zed.toml": r#"collaborators = ["user_b"]"#,
1767                "a.txt": "a-contents",
1768            }),
1769        )
1770        .await;
1771        let project_a = cx_a.update(|cx| {
1772            Project::local(
1773                client_a.clone(),
1774                client_a.user_store.clone(),
1775                lang_registry.clone(),
1776                fs.clone(),
1777                cx,
1778            )
1779        });
1780        let (worktree_a, _) = project_a
1781            .update(&mut cx_a, |p, cx| {
1782                p.find_or_create_local_worktree("/dir", false, cx)
1783            })
1784            .await
1785            .unwrap();
1786        worktree_a
1787            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1788            .await;
1789        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1790        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1791        project_a
1792            .update(&mut cx_a, |p, cx| p.share(cx))
1793            .await
1794            .unwrap();
1795
1796        // Join that project as client B
1797        let project_b = Project::remote(
1798            project_id,
1799            client_b.clone(),
1800            client_b.user_store.clone(),
1801            lang_registry.clone(),
1802            fs.clone(),
1803            &mut cx_b.to_async(),
1804        )
1805        .await
1806        .unwrap();
1807
1808        // Open a buffer as client A
1809        let buffer_a = project_a
1810            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1811            .await
1812            .unwrap();
1813
1814        // Start opening the same buffer as client B
1815        let buffer_b = cx_b
1816            .background()
1817            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1818
1819        // Edit the buffer as client A while client B is still opening it.
1820        cx_b.background().simulate_random_delay().await;
1821        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1822        cx_b.background().simulate_random_delay().await;
1823        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1824
1825        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1826        let buffer_b = buffer_b.await.unwrap();
1827        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1828    }
1829
1830    #[gpui::test(iterations = 10)]
1831    async fn test_leaving_worktree_while_opening_buffer(
1832        mut cx_a: TestAppContext,
1833        mut cx_b: TestAppContext,
1834        last_iteration: bool,
1835    ) {
1836        cx_a.foreground().forbid_parking();
1837        let lang_registry = Arc::new(LanguageRegistry::new());
1838        let fs = Arc::new(FakeFs::new(cx_a.background()));
1839
1840        // Connect to a server as 2 clients.
1841        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1842        let client_a = server.create_client(&mut cx_a, "user_a").await;
1843        let client_b = server.create_client(&mut cx_b, "user_b").await;
1844
1845        // Share a project as client A
1846        fs.insert_tree(
1847            "/dir",
1848            json!({
1849                ".zed.toml": r#"collaborators = ["user_b"]"#,
1850                "a.txt": "a-contents",
1851            }),
1852        )
1853        .await;
1854        let project_a = cx_a.update(|cx| {
1855            Project::local(
1856                client_a.clone(),
1857                client_a.user_store.clone(),
1858                lang_registry.clone(),
1859                fs.clone(),
1860                cx,
1861            )
1862        });
1863        let (worktree_a, _) = project_a
1864            .update(&mut cx_a, |p, cx| {
1865                p.find_or_create_local_worktree("/dir", false, cx)
1866            })
1867            .await
1868            .unwrap();
1869        worktree_a
1870            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1871            .await;
1872        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1873        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1874        project_a
1875            .update(&mut cx_a, |p, cx| p.share(cx))
1876            .await
1877            .unwrap();
1878
1879        // Join that project as client B
1880        let project_b = Project::remote(
1881            project_id,
1882            client_b.clone(),
1883            client_b.user_store.clone(),
1884            lang_registry.clone(),
1885            fs.clone(),
1886            &mut cx_b.to_async(),
1887        )
1888        .await
1889        .unwrap();
1890
1891        // See that a guest has joined as client A.
1892        project_a
1893            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1894            .await;
1895
1896        // Begin opening a buffer as client B, but leave the project before the open completes.
1897        let buffer_b = cx_b
1898            .background()
1899            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1900        cx_b.update(|_| drop(project_b));
1901        drop(buffer_b);
1902
1903        // See that the guest has left.
1904        project_a
1905            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1906            .await;
1907    }
1908
1909    #[gpui::test(iterations = 10)]
1910    async fn test_peer_disconnection(
1911        mut cx_a: TestAppContext,
1912        mut cx_b: TestAppContext,
1913        last_iteration: bool,
1914    ) {
1915        cx_a.foreground().forbid_parking();
1916        let lang_registry = Arc::new(LanguageRegistry::new());
1917        let fs = Arc::new(FakeFs::new(cx_a.background()));
1918
1919        // Connect to a server as 2 clients.
1920        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1921        let client_a = server.create_client(&mut cx_a, "user_a").await;
1922        let client_b = server.create_client(&mut cx_b, "user_b").await;
1923
1924        // Share a project as client A
1925        fs.insert_tree(
1926            "/a",
1927            json!({
1928                ".zed.toml": r#"collaborators = ["user_b"]"#,
1929                "a.txt": "a-contents",
1930                "b.txt": "b-contents",
1931            }),
1932        )
1933        .await;
1934        let project_a = cx_a.update(|cx| {
1935            Project::local(
1936                client_a.clone(),
1937                client_a.user_store.clone(),
1938                lang_registry.clone(),
1939                fs.clone(),
1940                cx,
1941            )
1942        });
1943        let (worktree_a, _) = project_a
1944            .update(&mut cx_a, |p, cx| {
1945                p.find_or_create_local_worktree("/a", false, cx)
1946            })
1947            .await
1948            .unwrap();
1949        worktree_a
1950            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1951            .await;
1952        let project_id = project_a
1953            .update(&mut cx_a, |project, _| project.next_remote_id())
1954            .await;
1955        project_a
1956            .update(&mut cx_a, |project, cx| project.share(cx))
1957            .await
1958            .unwrap();
1959
1960        // Join that project as client B
1961        let _project_b = Project::remote(
1962            project_id,
1963            client_b.clone(),
1964            client_b.user_store.clone(),
1965            lang_registry.clone(),
1966            fs.clone(),
1967            &mut cx_b.to_async(),
1968        )
1969        .await
1970        .unwrap();
1971
1972        // See that a guest has joined as client A.
1973        project_a
1974            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1975            .await;
1976
1977        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1978        client_b.disconnect(&cx_b.to_async()).unwrap();
1979        project_a
1980            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1981            .await;
1982    }
1983
1984    #[gpui::test(iterations = 10)]
1985    async fn test_collaborating_with_diagnostics(
1986        mut cx_a: TestAppContext,
1987        mut cx_b: TestAppContext,
1988        last_iteration: bool,
1989    ) {
1990        cx_a.foreground().forbid_parking();
1991        let mut lang_registry = Arc::new(LanguageRegistry::new());
1992        let fs = Arc::new(FakeFs::new(cx_a.background()));
1993
1994        // Set up a fake language server.
1995        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1996        Arc::get_mut(&mut lang_registry)
1997            .unwrap()
1998            .add(Arc::new(Language::new(
1999                LanguageConfig {
2000                    name: "Rust".to_string(),
2001                    path_suffixes: vec!["rs".to_string()],
2002                    language_server: Some(language_server_config),
2003                    ..Default::default()
2004                },
2005                Some(tree_sitter_rust::language()),
2006            )));
2007
2008        // Connect to a server as 2 clients.
2009        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2010        let client_a = server.create_client(&mut cx_a, "user_a").await;
2011        let client_b = server.create_client(&mut cx_b, "user_b").await;
2012
2013        // Share a project as client A
2014        fs.insert_tree(
2015            "/a",
2016            json!({
2017                ".zed.toml": r#"collaborators = ["user_b"]"#,
2018                "a.rs": "let one = two",
2019                "other.rs": "",
2020            }),
2021        )
2022        .await;
2023        let project_a = cx_a.update(|cx| {
2024            Project::local(
2025                client_a.clone(),
2026                client_a.user_store.clone(),
2027                lang_registry.clone(),
2028                fs.clone(),
2029                cx,
2030            )
2031        });
2032        let (worktree_a, _) = project_a
2033            .update(&mut cx_a, |p, cx| {
2034                p.find_or_create_local_worktree("/a", false, cx)
2035            })
2036            .await
2037            .unwrap();
2038        worktree_a
2039            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2040            .await;
2041        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2042        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2043        project_a
2044            .update(&mut cx_a, |p, cx| p.share(cx))
2045            .await
2046            .unwrap();
2047
2048        // Cause the language server to start.
2049        let _ = cx_a
2050            .background()
2051            .spawn(project_a.update(&mut cx_a, |project, cx| {
2052                project.open_buffer(
2053                    ProjectPath {
2054                        worktree_id,
2055                        path: Path::new("other.rs").into(),
2056                    },
2057                    cx,
2058                )
2059            }))
2060            .await
2061            .unwrap();
2062
2063        // Simulate a language server reporting errors for a file.
2064        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2065        fake_language_server
2066            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2067                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2068                version: None,
2069                diagnostics: vec![lsp::Diagnostic {
2070                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2071                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2072                    message: "message 1".to_string(),
2073                    ..Default::default()
2074                }],
2075            })
2076            .await;
2077
2078        // Wait for server to see the diagnostics update.
2079        server
2080            .condition(|store| {
2081                let worktree = store
2082                    .project(project_id)
2083                    .unwrap()
2084                    .worktrees
2085                    .get(&worktree_id.to_proto())
2086                    .unwrap();
2087
2088                !worktree
2089                    .share
2090                    .as_ref()
2091                    .unwrap()
2092                    .diagnostic_summaries
2093                    .is_empty()
2094            })
2095            .await;
2096
2097        // Join the worktree as client B.
2098        let project_b = Project::remote(
2099            project_id,
2100            client_b.clone(),
2101            client_b.user_store.clone(),
2102            lang_registry.clone(),
2103            fs.clone(),
2104            &mut cx_b.to_async(),
2105        )
2106        .await
2107        .unwrap();
2108
2109        project_b.read_with(&cx_b, |project, cx| {
2110            assert_eq!(
2111                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2112                &[(
2113                    ProjectPath {
2114                        worktree_id,
2115                        path: Arc::from(Path::new("a.rs")),
2116                    },
2117                    DiagnosticSummary {
2118                        error_count: 1,
2119                        warning_count: 0,
2120                        ..Default::default()
2121                    },
2122                )]
2123            )
2124        });
2125
2126        // Simulate a language server reporting more errors for a file.
2127        fake_language_server
2128            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2129                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2130                version: None,
2131                diagnostics: vec![
2132                    lsp::Diagnostic {
2133                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2134                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2135                        message: "message 1".to_string(),
2136                        ..Default::default()
2137                    },
2138                    lsp::Diagnostic {
2139                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2140                        range: lsp::Range::new(
2141                            lsp::Position::new(0, 10),
2142                            lsp::Position::new(0, 13),
2143                        ),
2144                        message: "message 2".to_string(),
2145                        ..Default::default()
2146                    },
2147                ],
2148            })
2149            .await;
2150
2151        // Client b gets the updated summaries
2152        project_b
2153            .condition(&cx_b, |project, cx| {
2154                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2155                    == &[(
2156                        ProjectPath {
2157                            worktree_id,
2158                            path: Arc::from(Path::new("a.rs")),
2159                        },
2160                        DiagnosticSummary {
2161                            error_count: 1,
2162                            warning_count: 1,
2163                            ..Default::default()
2164                        },
2165                    )]
2166            })
2167            .await;
2168
2169        // Open the file with the errors on client B. They should be present.
2170        let buffer_b = cx_b
2171            .background()
2172            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2173            .await
2174            .unwrap();
2175
2176        buffer_b.read_with(&cx_b, |buffer, _| {
2177            assert_eq!(
2178                buffer
2179                    .snapshot()
2180                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2181                    .map(|entry| entry)
2182                    .collect::<Vec<_>>(),
2183                &[
2184                    DiagnosticEntry {
2185                        range: Point::new(0, 4)..Point::new(0, 7),
2186                        diagnostic: Diagnostic {
2187                            group_id: 0,
2188                            message: "message 1".to_string(),
2189                            severity: lsp::DiagnosticSeverity::ERROR,
2190                            is_primary: true,
2191                            ..Default::default()
2192                        }
2193                    },
2194                    DiagnosticEntry {
2195                        range: Point::new(0, 10)..Point::new(0, 13),
2196                        diagnostic: Diagnostic {
2197                            group_id: 1,
2198                            severity: lsp::DiagnosticSeverity::WARNING,
2199                            message: "message 2".to_string(),
2200                            is_primary: true,
2201                            ..Default::default()
2202                        }
2203                    }
2204                ]
2205            );
2206        });
2207    }
2208
2209    #[gpui::test(iterations = 10)]
2210    async fn test_collaborating_with_completion(
2211        mut cx_a: TestAppContext,
2212        mut cx_b: TestAppContext,
2213        last_iteration: bool,
2214    ) {
2215        cx_a.foreground().forbid_parking();
2216        let mut lang_registry = Arc::new(LanguageRegistry::new());
2217        let fs = Arc::new(FakeFs::new(cx_a.background()));
2218
2219        // Set up a fake language server.
2220        let (language_server_config, mut fake_language_servers) =
2221            LanguageServerConfig::fake_with_capabilities(lsp::ServerCapabilities {
2222                completion_provider: Some(lsp::CompletionOptions {
2223                    trigger_characters: Some(vec![".".to_string()]),
2224                    ..Default::default()
2225                }),
2226                ..Default::default()
2227            });
2228        Arc::get_mut(&mut lang_registry)
2229            .unwrap()
2230            .add(Arc::new(Language::new(
2231                LanguageConfig {
2232                    name: "Rust".to_string(),
2233                    path_suffixes: vec!["rs".to_string()],
2234                    language_server: Some(language_server_config),
2235                    ..Default::default()
2236                },
2237                Some(tree_sitter_rust::language()),
2238            )));
2239
2240        // Connect to a server as 2 clients.
2241        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2242        let client_a = server.create_client(&mut cx_a, "user_a").await;
2243        let client_b = server.create_client(&mut cx_b, "user_b").await;
2244
2245        // Share a project as client A
2246        fs.insert_tree(
2247            "/a",
2248            json!({
2249                ".zed.toml": r#"collaborators = ["user_b"]"#,
2250                "main.rs": "fn main() { a }",
2251                "other.rs": "",
2252            }),
2253        )
2254        .await;
2255        let project_a = cx_a.update(|cx| {
2256            Project::local(
2257                client_a.clone(),
2258                client_a.user_store.clone(),
2259                lang_registry.clone(),
2260                fs.clone(),
2261                cx,
2262            )
2263        });
2264        let (worktree_a, _) = project_a
2265            .update(&mut cx_a, |p, cx| {
2266                p.find_or_create_local_worktree("/a", false, cx)
2267            })
2268            .await
2269            .unwrap();
2270        worktree_a
2271            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2272            .await;
2273        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2274        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2275        project_a
2276            .update(&mut cx_a, |p, cx| p.share(cx))
2277            .await
2278            .unwrap();
2279
2280        // Join the worktree as client B.
2281        let project_b = Project::remote(
2282            project_id,
2283            client_b.clone(),
2284            client_b.user_store.clone(),
2285            lang_registry.clone(),
2286            fs.clone(),
2287            &mut cx_b.to_async(),
2288        )
2289        .await
2290        .unwrap();
2291
2292        // Open a file in an editor as the guest.
2293        let buffer_b = project_b
2294            .update(&mut cx_b, |p, cx| {
2295                p.open_buffer((worktree_id, "main.rs"), cx)
2296            })
2297            .await
2298            .unwrap();
2299        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2300        let editor_b = cx_b.add_view(window_b, |cx| {
2301            Editor::for_buffer(
2302                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2303                Arc::new(|cx| EditorSettings::test(cx)),
2304                Some(project_b.clone()),
2305                cx,
2306            )
2307        });
2308
2309        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2310        buffer_b
2311            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2312            .await;
2313
2314        // Type a completion trigger character as the guest.
2315        editor_b.update(&mut cx_b, |editor, cx| {
2316            editor.select_ranges([13..13], None, cx);
2317            editor.handle_input(&Input(".".into()), cx);
2318            cx.focus(&editor_b);
2319        });
2320
2321        // Receive a completion request as the host's language server.
2322        // Return some completions from the host's language server.
2323        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2324            assert_eq!(
2325                params.text_document_position.text_document.uri,
2326                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2327            );
2328            assert_eq!(
2329                params.text_document_position.position,
2330                lsp::Position::new(0, 14),
2331            );
2332
2333            Some(lsp::CompletionResponse::Array(vec![
2334                lsp::CompletionItem {
2335                    label: "first_method(…)".into(),
2336                    detail: Some("fn(&mut self, B) -> C".into()),
2337                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2338                        new_text: "first_method($1)".to_string(),
2339                        range: lsp::Range::new(
2340                            lsp::Position::new(0, 14),
2341                            lsp::Position::new(0, 14),
2342                        ),
2343                    })),
2344                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2345                    ..Default::default()
2346                },
2347                lsp::CompletionItem {
2348                    label: "second_method(…)".into(),
2349                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2350                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2351                        new_text: "second_method()".to_string(),
2352                        range: lsp::Range::new(
2353                            lsp::Position::new(0, 14),
2354                            lsp::Position::new(0, 14),
2355                        ),
2356                    })),
2357                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2358                    ..Default::default()
2359                },
2360            ]))
2361        });
2362
2363        // Open the buffer on the host.
2364        let buffer_a = project_a
2365            .update(&mut cx_a, |p, cx| {
2366                p.open_buffer((worktree_id, "main.rs"), cx)
2367            })
2368            .await
2369            .unwrap();
2370        buffer_a
2371            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2372            .await;
2373
2374        // Confirm a completion on the guest.
2375        editor_b
2376            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2377            .await;
2378        editor_b.update(&mut cx_b, |editor, cx| {
2379            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2380            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2381        });
2382
2383        // Return a resolved completion from the host's language server.
2384        // The resolved completion has an additional text edit.
2385        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2386            assert_eq!(params.label, "first_method(…)");
2387            lsp::CompletionItem {
2388                label: "first_method(…)".into(),
2389                detail: Some("fn(&mut self, B) -> C".into()),
2390                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2391                    new_text: "first_method($1)".to_string(),
2392                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2393                })),
2394                additional_text_edits: Some(vec![lsp::TextEdit {
2395                    new_text: "use d::SomeTrait;\n".to_string(),
2396                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2397                }]),
2398                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2399                ..Default::default()
2400            }
2401        });
2402
2403        // The additional edit is applied.
2404        buffer_a
2405            .condition(&cx_a, |buffer, _| {
2406                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2407            })
2408            .await;
2409        buffer_b
2410            .condition(&cx_b, |buffer, _| {
2411                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2412            })
2413            .await;
2414    }
2415
2416    #[gpui::test(iterations = 10)]
2417    async fn test_formatting_buffer(
2418        mut cx_a: TestAppContext,
2419        mut cx_b: TestAppContext,
2420        last_iteration: bool,
2421    ) {
2422        cx_a.foreground().forbid_parking();
2423        let mut lang_registry = Arc::new(LanguageRegistry::new());
2424        let fs = Arc::new(FakeFs::new(cx_a.background()));
2425
2426        // Set up a fake language server.
2427        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2428        Arc::get_mut(&mut lang_registry)
2429            .unwrap()
2430            .add(Arc::new(Language::new(
2431                LanguageConfig {
2432                    name: "Rust".to_string(),
2433                    path_suffixes: vec!["rs".to_string()],
2434                    language_server: Some(language_server_config),
2435                    ..Default::default()
2436                },
2437                Some(tree_sitter_rust::language()),
2438            )));
2439
2440        // Connect to a server as 2 clients.
2441        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2442        let client_a = server.create_client(&mut cx_a, "user_a").await;
2443        let client_b = server.create_client(&mut cx_b, "user_b").await;
2444
2445        // Share a project as client A
2446        fs.insert_tree(
2447            "/a",
2448            json!({
2449                ".zed.toml": r#"collaborators = ["user_b"]"#,
2450                "a.rs": "let one = two",
2451            }),
2452        )
2453        .await;
2454        let project_a = cx_a.update(|cx| {
2455            Project::local(
2456                client_a.clone(),
2457                client_a.user_store.clone(),
2458                lang_registry.clone(),
2459                fs.clone(),
2460                cx,
2461            )
2462        });
2463        let (worktree_a, _) = project_a
2464            .update(&mut cx_a, |p, cx| {
2465                p.find_or_create_local_worktree("/a", false, cx)
2466            })
2467            .await
2468            .unwrap();
2469        worktree_a
2470            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2471            .await;
2472        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2473        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2474        project_a
2475            .update(&mut cx_a, |p, cx| p.share(cx))
2476            .await
2477            .unwrap();
2478
2479        // Join the worktree as client B.
2480        let project_b = Project::remote(
2481            project_id,
2482            client_b.clone(),
2483            client_b.user_store.clone(),
2484            lang_registry.clone(),
2485            fs.clone(),
2486            &mut cx_b.to_async(),
2487        )
2488        .await
2489        .unwrap();
2490
2491        let buffer_b = cx_b
2492            .background()
2493            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2494            .await
2495            .unwrap();
2496
2497        let format = project_b.update(&mut cx_b, |project, cx| {
2498            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2499        });
2500
2501        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2502        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2503            Some(vec![
2504                lsp::TextEdit {
2505                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2506                    new_text: "h".to_string(),
2507                },
2508                lsp::TextEdit {
2509                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2510                    new_text: "y".to_string(),
2511                },
2512            ])
2513        });
2514
2515        format.await.unwrap();
2516        assert_eq!(
2517            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2518            "let honey = two"
2519        );
2520    }
2521
2522    #[gpui::test(iterations = 10)]
2523    async fn test_definition(
2524        mut cx_a: TestAppContext,
2525        mut cx_b: TestAppContext,
2526        last_iteration: bool,
2527    ) {
2528        cx_a.foreground().forbid_parking();
2529        let mut lang_registry = Arc::new(LanguageRegistry::new());
2530        let fs = Arc::new(FakeFs::new(cx_a.background()));
2531        fs.insert_tree(
2532            "/root-1",
2533            json!({
2534                ".zed.toml": r#"collaborators = ["user_b"]"#,
2535                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2536            }),
2537        )
2538        .await;
2539        fs.insert_tree(
2540            "/root-2",
2541            json!({
2542                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2543            }),
2544        )
2545        .await;
2546
2547        // Set up a fake language server.
2548        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2549        Arc::get_mut(&mut lang_registry)
2550            .unwrap()
2551            .add(Arc::new(Language::new(
2552                LanguageConfig {
2553                    name: "Rust".to_string(),
2554                    path_suffixes: vec!["rs".to_string()],
2555                    language_server: Some(language_server_config),
2556                    ..Default::default()
2557                },
2558                Some(tree_sitter_rust::language()),
2559            )));
2560
2561        // Connect to a server as 2 clients.
2562        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2563        let client_a = server.create_client(&mut cx_a, "user_a").await;
2564        let client_b = server.create_client(&mut cx_b, "user_b").await;
2565
2566        // Share a project as client A
2567        let project_a = cx_a.update(|cx| {
2568            Project::local(
2569                client_a.clone(),
2570                client_a.user_store.clone(),
2571                lang_registry.clone(),
2572                fs.clone(),
2573                cx,
2574            )
2575        });
2576        let (worktree_a, _) = project_a
2577            .update(&mut cx_a, |p, cx| {
2578                p.find_or_create_local_worktree("/root-1", false, cx)
2579            })
2580            .await
2581            .unwrap();
2582        worktree_a
2583            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2584            .await;
2585        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2586        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2587        project_a
2588            .update(&mut cx_a, |p, cx| p.share(cx))
2589            .await
2590            .unwrap();
2591
2592        // Join the worktree as client B.
2593        let project_b = Project::remote(
2594            project_id,
2595            client_b.clone(),
2596            client_b.user_store.clone(),
2597            lang_registry.clone(),
2598            fs.clone(),
2599            &mut cx_b.to_async(),
2600        )
2601        .await
2602        .unwrap();
2603
2604        // Open the file on client B.
2605        let buffer_b = cx_b
2606            .background()
2607            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2608            .await
2609            .unwrap();
2610
2611        // Request the definition of a symbol as the guest.
2612        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2613
2614        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2615        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2616            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2617                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2618                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2619            )))
2620        });
2621
2622        let definitions_1 = definitions_1.await.unwrap();
2623        cx_b.read(|cx| {
2624            assert_eq!(definitions_1.len(), 1);
2625            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2626            let target_buffer = definitions_1[0].target_buffer.read(cx);
2627            assert_eq!(
2628                target_buffer.text(),
2629                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2630            );
2631            assert_eq!(
2632                definitions_1[0].target_range.to_point(target_buffer),
2633                Point::new(0, 6)..Point::new(0, 9)
2634            );
2635        });
2636
2637        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2638        // the previous call to `definition`.
2639        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2640        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2641            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2642                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2643                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2644            )))
2645        });
2646
2647        let definitions_2 = definitions_2.await.unwrap();
2648        cx_b.read(|cx| {
2649            assert_eq!(definitions_2.len(), 1);
2650            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2651            let target_buffer = definitions_2[0].target_buffer.read(cx);
2652            assert_eq!(
2653                target_buffer.text(),
2654                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2655            );
2656            assert_eq!(
2657                definitions_2[0].target_range.to_point(target_buffer),
2658                Point::new(1, 6)..Point::new(1, 11)
2659            );
2660        });
2661        assert_eq!(
2662            definitions_1[0].target_buffer,
2663            definitions_2[0].target_buffer
2664        );
2665
2666        cx_b.update(|_| {
2667            drop(definitions_1);
2668            drop(definitions_2);
2669        });
2670        project_b
2671            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2672            .await;
2673    }
2674
2675    #[gpui::test(iterations = 10)]
2676    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2677        mut cx_a: TestAppContext,
2678        mut cx_b: TestAppContext,
2679        mut rng: StdRng,
2680        last_iteration: bool,
2681    ) {
2682        cx_a.foreground().forbid_parking();
2683        let mut lang_registry = Arc::new(LanguageRegistry::new());
2684        let fs = Arc::new(FakeFs::new(cx_a.background()));
2685        fs.insert_tree(
2686            "/root",
2687            json!({
2688                ".zed.toml": r#"collaborators = ["user_b"]"#,
2689                "a.rs": "const ONE: usize = b::TWO;",
2690                "b.rs": "const TWO: usize = 2",
2691            }),
2692        )
2693        .await;
2694
2695        // Set up a fake language server.
2696        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2697
2698        Arc::get_mut(&mut lang_registry)
2699            .unwrap()
2700            .add(Arc::new(Language::new(
2701                LanguageConfig {
2702                    name: "Rust".to_string(),
2703                    path_suffixes: vec!["rs".to_string()],
2704                    language_server: Some(language_server_config),
2705                    ..Default::default()
2706                },
2707                Some(tree_sitter_rust::language()),
2708            )));
2709
2710        // Connect to a server as 2 clients.
2711        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2712        let client_a = server.create_client(&mut cx_a, "user_a").await;
2713        let client_b = server.create_client(&mut cx_b, "user_b").await;
2714
2715        // Share a project as client A
2716        let project_a = cx_a.update(|cx| {
2717            Project::local(
2718                client_a.clone(),
2719                client_a.user_store.clone(),
2720                lang_registry.clone(),
2721                fs.clone(),
2722                cx,
2723            )
2724        });
2725
2726        let (worktree_a, _) = project_a
2727            .update(&mut cx_a, |p, cx| {
2728                p.find_or_create_local_worktree("/root", false, cx)
2729            })
2730            .await
2731            .unwrap();
2732        worktree_a
2733            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2734            .await;
2735        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2736        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2737        project_a
2738            .update(&mut cx_a, |p, cx| p.share(cx))
2739            .await
2740            .unwrap();
2741
2742        // Join the worktree as client B.
2743        let project_b = Project::remote(
2744            project_id,
2745            client_b.clone(),
2746            client_b.user_store.clone(),
2747            lang_registry.clone(),
2748            fs.clone(),
2749            &mut cx_b.to_async(),
2750        )
2751        .await
2752        .unwrap();
2753
2754        let buffer_b1 = cx_b
2755            .background()
2756            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2757            .await
2758            .unwrap();
2759
2760        let definitions;
2761        let buffer_b2;
2762        if rng.gen() {
2763            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2764            buffer_b2 =
2765                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2766        } else {
2767            buffer_b2 =
2768                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2769            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2770        }
2771
2772        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2773        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2774            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2775                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2776                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2777            )))
2778        });
2779
2780        let buffer_b2 = buffer_b2.await.unwrap();
2781        let definitions = definitions.await.unwrap();
2782        assert_eq!(definitions.len(), 1);
2783        assert_eq!(definitions[0].target_buffer, buffer_b2);
2784    }
2785
2786    #[gpui::test(iterations = 10)]
2787    async fn test_collaborating_with_code_actions(
2788        mut cx_a: TestAppContext,
2789        mut cx_b: TestAppContext,
2790        last_iteration: bool,
2791    ) {
2792        cx_a.foreground().forbid_parking();
2793        let mut lang_registry = Arc::new(LanguageRegistry::new());
2794        let fs = Arc::new(FakeFs::new(cx_a.background()));
2795        let mut path_openers_b = Vec::new();
2796        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2797
2798        // Set up a fake language server.
2799        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2800        Arc::get_mut(&mut lang_registry)
2801            .unwrap()
2802            .add(Arc::new(Language::new(
2803                LanguageConfig {
2804                    name: "Rust".to_string(),
2805                    path_suffixes: vec!["rs".to_string()],
2806                    language_server: Some(language_server_config),
2807                    ..Default::default()
2808                },
2809                Some(tree_sitter_rust::language()),
2810            )));
2811
2812        // Connect to a server as 2 clients.
2813        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2814        let client_a = server.create_client(&mut cx_a, "user_a").await;
2815        let client_b = server.create_client(&mut cx_b, "user_b").await;
2816
2817        // Share a project as client A
2818        fs.insert_tree(
2819            "/a",
2820            json!({
2821                ".zed.toml": r#"collaborators = ["user_b"]"#,
2822                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2823                "other.rs": "pub fn foo() -> usize { 4 }",
2824            }),
2825        )
2826        .await;
2827        let project_a = cx_a.update(|cx| {
2828            Project::local(
2829                client_a.clone(),
2830                client_a.user_store.clone(),
2831                lang_registry.clone(),
2832                fs.clone(),
2833                cx,
2834            )
2835        });
2836        let (worktree_a, _) = project_a
2837            .update(&mut cx_a, |p, cx| {
2838                p.find_or_create_local_worktree("/a", false, cx)
2839            })
2840            .await
2841            .unwrap();
2842        worktree_a
2843            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2844            .await;
2845        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2846        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2847        project_a
2848            .update(&mut cx_a, |p, cx| p.share(cx))
2849            .await
2850            .unwrap();
2851
2852        // Join the worktree as client B.
2853        let project_b = Project::remote(
2854            project_id,
2855            client_b.clone(),
2856            client_b.user_store.clone(),
2857            lang_registry.clone(),
2858            fs.clone(),
2859            &mut cx_b.to_async(),
2860        )
2861        .await
2862        .unwrap();
2863        let mut params = cx_b.update(WorkspaceParams::test);
2864        params.languages = lang_registry.clone();
2865        params.client = client_b.client.clone();
2866        params.user_store = client_b.user_store.clone();
2867        params.project = project_b;
2868        params.path_openers = path_openers_b.into();
2869
2870        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2871        let editor_b = workspace_b
2872            .update(&mut cx_b, |workspace, cx| {
2873                workspace.open_path((worktree_id, "main.rs").into(), cx)
2874            })
2875            .await
2876            .unwrap()
2877            .downcast::<Editor>()
2878            .unwrap();
2879
2880        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2881        fake_language_server
2882            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2883                assert_eq!(
2884                    params.text_document.uri,
2885                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2886                );
2887                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2888                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2889                None
2890            })
2891            .next()
2892            .await;
2893
2894        // Move cursor to a location that contains code actions.
2895        editor_b.update(&mut cx_b, |editor, cx| {
2896            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2897            cx.focus(&editor_b);
2898        });
2899        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2900            assert_eq!(
2901                params.text_document.uri,
2902                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2903            );
2904            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2905            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2906
2907            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2908                lsp::CodeAction {
2909                    title: "Inline into all callers".to_string(),
2910                    edit: Some(lsp::WorkspaceEdit {
2911                        changes: Some(
2912                            [
2913                                (
2914                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2915                                    vec![lsp::TextEdit::new(
2916                                        lsp::Range::new(
2917                                            lsp::Position::new(1, 22),
2918                                            lsp::Position::new(1, 34),
2919                                        ),
2920                                        "4".to_string(),
2921                                    )],
2922                                ),
2923                                (
2924                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2925                                    vec![lsp::TextEdit::new(
2926                                        lsp::Range::new(
2927                                            lsp::Position::new(0, 0),
2928                                            lsp::Position::new(0, 27),
2929                                        ),
2930                                        "".to_string(),
2931                                    )],
2932                                ),
2933                            ]
2934                            .into_iter()
2935                            .collect(),
2936                        ),
2937                        ..Default::default()
2938                    }),
2939                    data: Some(json!({
2940                        "codeActionParams": {
2941                            "range": {
2942                                "start": {"line": 1, "column": 31},
2943                                "end": {"line": 1, "column": 31},
2944                            }
2945                        }
2946                    })),
2947                    ..Default::default()
2948                },
2949            )])
2950        });
2951
2952        // Toggle code actions and wait for them to display.
2953        editor_b.update(&mut cx_b, |editor, cx| {
2954            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2955        });
2956        editor_b
2957            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2958            .await;
2959
2960        // Confirming the code action will trigger a resolve request.
2961        let confirm_action = workspace_b
2962            .update(&mut cx_b, |workspace, cx| {
2963                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2964            })
2965            .unwrap();
2966        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2967            lsp::CodeAction {
2968                title: "Inline into all callers".to_string(),
2969                edit: Some(lsp::WorkspaceEdit {
2970                    changes: Some(
2971                        [
2972                            (
2973                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2974                                vec![lsp::TextEdit::new(
2975                                    lsp::Range::new(
2976                                        lsp::Position::new(1, 22),
2977                                        lsp::Position::new(1, 34),
2978                                    ),
2979                                    "4".to_string(),
2980                                )],
2981                            ),
2982                            (
2983                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2984                                vec![lsp::TextEdit::new(
2985                                    lsp::Range::new(
2986                                        lsp::Position::new(0, 0),
2987                                        lsp::Position::new(0, 27),
2988                                    ),
2989                                    "".to_string(),
2990                                )],
2991                            ),
2992                        ]
2993                        .into_iter()
2994                        .collect(),
2995                    ),
2996                    ..Default::default()
2997                }),
2998                ..Default::default()
2999            }
3000        });
3001
3002        // After the action is confirmed, an editor containing both modified files is opened.
3003        confirm_action.await.unwrap();
3004        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3005            workspace
3006                .active_item(cx)
3007                .unwrap()
3008                .downcast::<Editor>()
3009                .unwrap()
3010        });
3011        code_action_editor.update(&mut cx_b, |editor, cx| {
3012            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3013            editor.undo(&Undo, cx);
3014            assert_eq!(
3015                editor.text(cx),
3016                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3017            );
3018            editor.redo(&Redo, cx);
3019            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3020        });
3021    }
3022
3023    #[gpui::test(iterations = 10)]
3024    async fn test_basic_chat(
3025        mut cx_a: TestAppContext,
3026        mut cx_b: TestAppContext,
3027        last_iteration: bool,
3028    ) {
3029        cx_a.foreground().forbid_parking();
3030
3031        // Connect to a server as 2 clients.
3032        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3033        let client_a = server.create_client(&mut cx_a, "user_a").await;
3034        let client_b = server.create_client(&mut cx_b, "user_b").await;
3035
3036        // Create an org that includes these 2 users.
3037        let db = &server.app_state.db;
3038        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3039        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3040            .await
3041            .unwrap();
3042        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3043            .await
3044            .unwrap();
3045
3046        // Create a channel that includes all the users.
3047        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3048        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3049            .await
3050            .unwrap();
3051        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3052            .await
3053            .unwrap();
3054        db.create_channel_message(
3055            channel_id,
3056            client_b.current_user_id(&cx_b),
3057            "hello A, it's B.",
3058            OffsetDateTime::now_utc(),
3059            1,
3060        )
3061        .await
3062        .unwrap();
3063
3064        let channels_a = cx_a
3065            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3066        channels_a
3067            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3068            .await;
3069        channels_a.read_with(&cx_a, |list, _| {
3070            assert_eq!(
3071                list.available_channels().unwrap(),
3072                &[ChannelDetails {
3073                    id: channel_id.to_proto(),
3074                    name: "test-channel".to_string()
3075                }]
3076            )
3077        });
3078        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3079            this.get_channel(channel_id.to_proto(), cx).unwrap()
3080        });
3081        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3082        channel_a
3083            .condition(&cx_a, |channel, _| {
3084                channel_messages(channel)
3085                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3086            })
3087            .await;
3088
3089        let channels_b = cx_b
3090            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3091        channels_b
3092            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3093            .await;
3094        channels_b.read_with(&cx_b, |list, _| {
3095            assert_eq!(
3096                list.available_channels().unwrap(),
3097                &[ChannelDetails {
3098                    id: channel_id.to_proto(),
3099                    name: "test-channel".to_string()
3100                }]
3101            )
3102        });
3103
3104        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3105            this.get_channel(channel_id.to_proto(), cx).unwrap()
3106        });
3107        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3108        channel_b
3109            .condition(&cx_b, |channel, _| {
3110                channel_messages(channel)
3111                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3112            })
3113            .await;
3114
3115        channel_a
3116            .update(&mut cx_a, |channel, cx| {
3117                channel
3118                    .send_message("oh, hi B.".to_string(), cx)
3119                    .unwrap()
3120                    .detach();
3121                let task = channel.send_message("sup".to_string(), cx).unwrap();
3122                assert_eq!(
3123                    channel_messages(channel),
3124                    &[
3125                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3126                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3127                        ("user_a".to_string(), "sup".to_string(), true)
3128                    ]
3129                );
3130                task
3131            })
3132            .await
3133            .unwrap();
3134
3135        channel_b
3136            .condition(&cx_b, |channel, _| {
3137                channel_messages(channel)
3138                    == [
3139                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3140                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3141                        ("user_a".to_string(), "sup".to_string(), false),
3142                    ]
3143            })
3144            .await;
3145
3146        assert_eq!(
3147            server
3148                .state()
3149                .await
3150                .channel(channel_id)
3151                .unwrap()
3152                .connection_ids
3153                .len(),
3154            2
3155        );
3156        cx_b.update(|_| drop(channel_b));
3157        server
3158            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3159            .await;
3160
3161        cx_a.update(|_| drop(channel_a));
3162        server
3163            .condition(|state| state.channel(channel_id).is_none())
3164            .await;
3165    }
3166
3167    #[gpui::test(iterations = 10)]
3168    async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3169        cx_a.foreground().forbid_parking();
3170
3171        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3172        let client_a = server.create_client(&mut cx_a, "user_a").await;
3173
3174        let db = &server.app_state.db;
3175        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3176        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3177        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3178            .await
3179            .unwrap();
3180        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3181            .await
3182            .unwrap();
3183
3184        let channels_a = cx_a
3185            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3186        channels_a
3187            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3188            .await;
3189        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3190            this.get_channel(channel_id.to_proto(), cx).unwrap()
3191        });
3192
3193        // Messages aren't allowed to be too long.
3194        channel_a
3195            .update(&mut cx_a, |channel, cx| {
3196                let long_body = "this is long.\n".repeat(1024);
3197                channel.send_message(long_body, cx).unwrap()
3198            })
3199            .await
3200            .unwrap_err();
3201
3202        // Messages aren't allowed to be blank.
3203        channel_a.update(&mut cx_a, |channel, cx| {
3204            channel.send_message(String::new(), cx).unwrap_err()
3205        });
3206
3207        // Leading and trailing whitespace are trimmed.
3208        channel_a
3209            .update(&mut cx_a, |channel, cx| {
3210                channel
3211                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3212                    .unwrap()
3213            })
3214            .await
3215            .unwrap();
3216        assert_eq!(
3217            db.get_channel_messages(channel_id, 10, None)
3218                .await
3219                .unwrap()
3220                .iter()
3221                .map(|m| &m.body)
3222                .collect::<Vec<_>>(),
3223            &["surrounded by whitespace"]
3224        );
3225    }
3226
3227    #[gpui::test(iterations = 10)]
3228    async fn test_chat_reconnection(
3229        mut cx_a: TestAppContext,
3230        mut cx_b: TestAppContext,
3231        last_iteration: bool,
3232    ) {
3233        cx_a.foreground().forbid_parking();
3234
3235        // Connect to a server as 2 clients.
3236        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3237        let client_a = server.create_client(&mut cx_a, "user_a").await;
3238        let client_b = server.create_client(&mut cx_b, "user_b").await;
3239        let mut status_b = client_b.status();
3240
3241        // Create an org that includes these 2 users.
3242        let db = &server.app_state.db;
3243        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3244        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3245            .await
3246            .unwrap();
3247        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3248            .await
3249            .unwrap();
3250
3251        // Create a channel that includes all the users.
3252        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3253        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3254            .await
3255            .unwrap();
3256        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3257            .await
3258            .unwrap();
3259        db.create_channel_message(
3260            channel_id,
3261            client_b.current_user_id(&cx_b),
3262            "hello A, it's B.",
3263            OffsetDateTime::now_utc(),
3264            2,
3265        )
3266        .await
3267        .unwrap();
3268
3269        let channels_a = cx_a
3270            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3271        channels_a
3272            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3273            .await;
3274
3275        channels_a.read_with(&cx_a, |list, _| {
3276            assert_eq!(
3277                list.available_channels().unwrap(),
3278                &[ChannelDetails {
3279                    id: channel_id.to_proto(),
3280                    name: "test-channel".to_string()
3281                }]
3282            )
3283        });
3284        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3285            this.get_channel(channel_id.to_proto(), cx).unwrap()
3286        });
3287        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3288        channel_a
3289            .condition(&cx_a, |channel, _| {
3290                channel_messages(channel)
3291                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3292            })
3293            .await;
3294
3295        let channels_b = cx_b
3296            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3297        channels_b
3298            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3299            .await;
3300        channels_b.read_with(&cx_b, |list, _| {
3301            assert_eq!(
3302                list.available_channels().unwrap(),
3303                &[ChannelDetails {
3304                    id: channel_id.to_proto(),
3305                    name: "test-channel".to_string()
3306                }]
3307            )
3308        });
3309
3310        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3311            this.get_channel(channel_id.to_proto(), cx).unwrap()
3312        });
3313        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3314        channel_b
3315            .condition(&cx_b, |channel, _| {
3316                channel_messages(channel)
3317                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3318            })
3319            .await;
3320
3321        // Disconnect client B, ensuring we can still access its cached channel data.
3322        server.forbid_connections();
3323        server.disconnect_client(client_b.current_user_id(&cx_b));
3324        while !matches!(
3325            status_b.next().await,
3326            Some(client::Status::ReconnectionError { .. })
3327        ) {}
3328
3329        channels_b.read_with(&cx_b, |channels, _| {
3330            assert_eq!(
3331                channels.available_channels().unwrap(),
3332                [ChannelDetails {
3333                    id: channel_id.to_proto(),
3334                    name: "test-channel".to_string()
3335                }]
3336            )
3337        });
3338        channel_b.read_with(&cx_b, |channel, _| {
3339            assert_eq!(
3340                channel_messages(channel),
3341                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3342            )
3343        });
3344
3345        // Send a message from client B while it is disconnected.
3346        channel_b
3347            .update(&mut cx_b, |channel, cx| {
3348                let task = channel
3349                    .send_message("can you see this?".to_string(), cx)
3350                    .unwrap();
3351                assert_eq!(
3352                    channel_messages(channel),
3353                    &[
3354                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3355                        ("user_b".to_string(), "can you see this?".to_string(), true)
3356                    ]
3357                );
3358                task
3359            })
3360            .await
3361            .unwrap_err();
3362
3363        // Send a message from client A while B is disconnected.
3364        channel_a
3365            .update(&mut cx_a, |channel, cx| {
3366                channel
3367                    .send_message("oh, hi B.".to_string(), cx)
3368                    .unwrap()
3369                    .detach();
3370                let task = channel.send_message("sup".to_string(), cx).unwrap();
3371                assert_eq!(
3372                    channel_messages(channel),
3373                    &[
3374                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3375                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3376                        ("user_a".to_string(), "sup".to_string(), true)
3377                    ]
3378                );
3379                task
3380            })
3381            .await
3382            .unwrap();
3383
3384        // Give client B a chance to reconnect.
3385        server.allow_connections();
3386        cx_b.foreground().advance_clock(Duration::from_secs(10));
3387
3388        // Verify that B sees the new messages upon reconnection, as well as the message client B
3389        // sent while offline.
3390        channel_b
3391            .condition(&cx_b, |channel, _| {
3392                channel_messages(channel)
3393                    == [
3394                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3395                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3396                        ("user_a".to_string(), "sup".to_string(), false),
3397                        ("user_b".to_string(), "can you see this?".to_string(), false),
3398                    ]
3399            })
3400            .await;
3401
3402        // Ensure client A and B can communicate normally after reconnection.
3403        channel_a
3404            .update(&mut cx_a, |channel, cx| {
3405                channel.send_message("you online?".to_string(), cx).unwrap()
3406            })
3407            .await
3408            .unwrap();
3409        channel_b
3410            .condition(&cx_b, |channel, _| {
3411                channel_messages(channel)
3412                    == [
3413                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3414                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3415                        ("user_a".to_string(), "sup".to_string(), false),
3416                        ("user_b".to_string(), "can you see this?".to_string(), false),
3417                        ("user_a".to_string(), "you online?".to_string(), false),
3418                    ]
3419            })
3420            .await;
3421
3422        channel_b
3423            .update(&mut cx_b, |channel, cx| {
3424                channel.send_message("yep".to_string(), cx).unwrap()
3425            })
3426            .await
3427            .unwrap();
3428        channel_a
3429            .condition(&cx_a, |channel, _| {
3430                channel_messages(channel)
3431                    == [
3432                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3433                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3434                        ("user_a".to_string(), "sup".to_string(), false),
3435                        ("user_b".to_string(), "can you see this?".to_string(), false),
3436                        ("user_a".to_string(), "you online?".to_string(), false),
3437                        ("user_b".to_string(), "yep".to_string(), false),
3438                    ]
3439            })
3440            .await;
3441    }
3442
3443    #[gpui::test(iterations = 10)]
3444    async fn test_contacts(
3445        mut cx_a: TestAppContext,
3446        mut cx_b: TestAppContext,
3447        mut cx_c: TestAppContext,
3448        last_iteration: bool,
3449    ) {
3450        cx_a.foreground().forbid_parking();
3451        let lang_registry = Arc::new(LanguageRegistry::new());
3452        let fs = Arc::new(FakeFs::new(cx_a.background()));
3453
3454        // Connect to a server as 3 clients.
3455        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3456        let client_a = server.create_client(&mut cx_a, "user_a").await;
3457        let client_b = server.create_client(&mut cx_b, "user_b").await;
3458        let client_c = server.create_client(&mut cx_c, "user_c").await;
3459
3460        // Share a worktree as client A.
3461        fs.insert_tree(
3462            "/a",
3463            json!({
3464                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3465            }),
3466        )
3467        .await;
3468
3469        let project_a = cx_a.update(|cx| {
3470            Project::local(
3471                client_a.clone(),
3472                client_a.user_store.clone(),
3473                lang_registry.clone(),
3474                fs.clone(),
3475                cx,
3476            )
3477        });
3478        let (worktree_a, _) = project_a
3479            .update(&mut cx_a, |p, cx| {
3480                p.find_or_create_local_worktree("/a", false, cx)
3481            })
3482            .await
3483            .unwrap();
3484        worktree_a
3485            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3486            .await;
3487
3488        client_a
3489            .user_store
3490            .condition(&cx_a, |user_store, _| {
3491                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3492            })
3493            .await;
3494        client_b
3495            .user_store
3496            .condition(&cx_b, |user_store, _| {
3497                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3498            })
3499            .await;
3500        client_c
3501            .user_store
3502            .condition(&cx_c, |user_store, _| {
3503                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3504            })
3505            .await;
3506
3507        let project_id = project_a
3508            .update(&mut cx_a, |project, _| project.next_remote_id())
3509            .await;
3510        project_a
3511            .update(&mut cx_a, |project, cx| project.share(cx))
3512            .await
3513            .unwrap();
3514
3515        let _project_b = Project::remote(
3516            project_id,
3517            client_b.clone(),
3518            client_b.user_store.clone(),
3519            lang_registry.clone(),
3520            fs.clone(),
3521            &mut cx_b.to_async(),
3522        )
3523        .await
3524        .unwrap();
3525
3526        client_a
3527            .user_store
3528            .condition(&cx_a, |user_store, _| {
3529                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3530            })
3531            .await;
3532        client_b
3533            .user_store
3534            .condition(&cx_b, |user_store, _| {
3535                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3536            })
3537            .await;
3538        client_c
3539            .user_store
3540            .condition(&cx_c, |user_store, _| {
3541                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3542            })
3543            .await;
3544
3545        project_a
3546            .condition(&cx_a, |project, _| {
3547                project.collaborators().contains_key(&client_b.peer_id)
3548            })
3549            .await;
3550
3551        cx_a.update(move |_| drop(project_a));
3552        client_a
3553            .user_store
3554            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3555            .await;
3556        client_b
3557            .user_store
3558            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3559            .await;
3560        client_c
3561            .user_store
3562            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3563            .await;
3564
3565        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3566            user_store
3567                .contacts()
3568                .iter()
3569                .map(|contact| {
3570                    let worktrees = contact
3571                        .projects
3572                        .iter()
3573                        .map(|p| {
3574                            (
3575                                p.worktree_root_names[0].as_str(),
3576                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3577                            )
3578                        })
3579                        .collect();
3580                    (contact.user.github_login.as_str(), worktrees)
3581                })
3582                .collect()
3583        }
3584    }
3585
3586    #[gpui::test(iterations = 100)]
3587    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3588        cx.foreground().forbid_parking();
3589        let max_peers = env::var("MAX_PEERS")
3590            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3591            .unwrap_or(5);
3592        let max_operations = env::var("OPERATIONS")
3593            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3594            .unwrap_or(10);
3595
3596        let rng = Rc::new(RefCell::new(rng));
3597        let lang_registry = Arc::new(LanguageRegistry::new());
3598        let fs = Arc::new(FakeFs::new(cx.background()));
3599        fs.insert_tree(
3600            "/_collab",
3601            json!({
3602                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3603            }),
3604        )
3605        .await;
3606
3607        let operations = Rc::new(Cell::new(0));
3608        let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3609        let mut clients = Vec::new();
3610
3611        let mut next_entity_id = 100000;
3612        let mut host_cx = TestAppContext::new(
3613            cx.foreground_platform(),
3614            cx.platform(),
3615            cx.foreground(),
3616            cx.background(),
3617            cx.font_cache(),
3618            next_entity_id,
3619        );
3620        let host = server.create_client(&mut host_cx, "host").await;
3621        let host_project = host_cx.update(|cx| {
3622            Project::local(
3623                host.client.clone(),
3624                host.user_store.clone(),
3625                lang_registry.clone(),
3626                fs.clone(),
3627                cx,
3628            )
3629        });
3630        let host_project_id = host_project
3631            .update(&mut host_cx, |p, _| p.next_remote_id())
3632            .await;
3633
3634        let (collab_worktree, _) = host_project
3635            .update(&mut host_cx, |project, cx| {
3636                project.find_or_create_local_worktree("/_collab", false, cx)
3637            })
3638            .await
3639            .unwrap();
3640        collab_worktree
3641            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3642            .await;
3643        host_project
3644            .update(&mut host_cx, |project, cx| project.share(cx))
3645            .await
3646            .unwrap();
3647
3648        clients.push(cx.foreground().spawn(host.simulate_host(
3649            host_project.clone(),
3650            operations.clone(),
3651            max_operations,
3652            rng.clone(),
3653            host_cx.clone(),
3654        )));
3655
3656        while operations.get() < max_operations {
3657            cx.background().simulate_random_delay().await;
3658            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3659                operations.set(operations.get() + 1);
3660
3661                let guest_id = clients.len();
3662                log::info!("Adding guest {}", guest_id);
3663                next_entity_id += 100000;
3664                let mut guest_cx = TestAppContext::new(
3665                    cx.foreground_platform(),
3666                    cx.platform(),
3667                    cx.foreground(),
3668                    cx.background(),
3669                    cx.font_cache(),
3670                    next_entity_id,
3671                );
3672                let guest = server
3673                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3674                    .await;
3675                let guest_project = Project::remote(
3676                    host_project_id,
3677                    guest.client.clone(),
3678                    guest.user_store.clone(),
3679                    lang_registry.clone(),
3680                    fs.clone(),
3681                    &mut guest_cx.to_async(),
3682                )
3683                .await
3684                .unwrap();
3685                clients.push(cx.foreground().spawn(guest.simulate_guest(
3686                    guest_id,
3687                    guest_project,
3688                    operations.clone(),
3689                    max_operations,
3690                    rng.clone(),
3691                    guest_cx,
3692                )));
3693
3694                log::info!("Guest {} added", guest_id);
3695            }
3696        }
3697
3698        let clients = futures::future::join_all(clients).await;
3699        cx.foreground().run_until_parked();
3700
3701        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3702            project
3703                .worktrees(cx)
3704                .map(|worktree| {
3705                    let snapshot = worktree.read(cx).snapshot();
3706                    (snapshot.id(), snapshot)
3707                })
3708                .collect::<BTreeMap<_, _>>()
3709        });
3710
3711        for (guest_client, guest_cx) in clients.iter().skip(1) {
3712            let guest_id = guest_client.client.id();
3713            let worktree_snapshots =
3714                guest_client
3715                    .project
3716                    .as_ref()
3717                    .unwrap()
3718                    .read_with(guest_cx, |project, cx| {
3719                        project
3720                            .worktrees(cx)
3721                            .map(|worktree| {
3722                                let snapshot = worktree.read(cx).snapshot();
3723                                (snapshot.id(), snapshot)
3724                            })
3725                            .collect::<BTreeMap<_, _>>()
3726                    });
3727
3728            assert_eq!(
3729                worktree_snapshots.keys().collect::<Vec<_>>(),
3730                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3731                "guest {} has different worktrees than the host",
3732                guest_id
3733            );
3734            for (id, host_snapshot) in &host_worktree_snapshots {
3735                let guest_snapshot = &worktree_snapshots[id];
3736                assert_eq!(
3737                    guest_snapshot.root_name(),
3738                    host_snapshot.root_name(),
3739                    "guest {} has different root name than the host for worktree {}",
3740                    guest_id,
3741                    id
3742                );
3743                assert_eq!(
3744                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3745                    host_snapshot.entries(false).collect::<Vec<_>>(),
3746                    "guest {} has different snapshot than the host for worktree {}",
3747                    guest_id,
3748                    id
3749                );
3750            }
3751
3752            guest_client
3753                .project
3754                .as_ref()
3755                .unwrap()
3756                .read_with(guest_cx, |project, _| {
3757                    assert!(
3758                        !project.has_buffered_operations(),
3759                        "guest {} has buffered operations ",
3760                        guest_id,
3761                    );
3762                });
3763
3764            for guest_buffer in &guest_client.buffers {
3765                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3766                let host_buffer = host_project.read_with(&host_cx, |project, _| {
3767                    project
3768                        .shared_buffer(guest_client.peer_id, buffer_id)
3769                        .expect(&format!(
3770                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
3771                            guest_id, guest_client.peer_id, buffer_id
3772                        ))
3773                });
3774                assert_eq!(
3775                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3776                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3777                    "guest {} buffer {} differs from the host's buffer",
3778                    guest_id,
3779                    buffer_id,
3780                );
3781            }
3782        }
3783    }
3784
3785    struct TestServer {
3786        peer: Arc<Peer>,
3787        app_state: Arc<AppState>,
3788        server: Arc<Server>,
3789        foreground: Rc<executor::Foreground>,
3790        notifications: mpsc::Receiver<()>,
3791        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3792        forbid_connections: Arc<AtomicBool>,
3793        _test_db: TestDb,
3794    }
3795
3796    impl TestServer {
3797        async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3798            let mut test_db = TestDb::new();
3799            test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3800            let app_state = Self::build_app_state(&test_db).await;
3801            let peer = Peer::new();
3802            let notifications = mpsc::channel(128);
3803            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3804            Self {
3805                peer,
3806                app_state,
3807                server,
3808                foreground,
3809                notifications: notifications.1,
3810                connection_killers: Default::default(),
3811                forbid_connections: Default::default(),
3812                _test_db: test_db,
3813            }
3814        }
3815
3816        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3817            let http = FakeHttpClient::with_404_response();
3818            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3819            let client_name = name.to_string();
3820            let mut client = Client::new(http.clone());
3821            let server = self.server.clone();
3822            let connection_killers = self.connection_killers.clone();
3823            let forbid_connections = self.forbid_connections.clone();
3824            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3825
3826            Arc::get_mut(&mut client)
3827                .unwrap()
3828                .override_authenticate(move |cx| {
3829                    cx.spawn(|_| async move {
3830                        let access_token = "the-token".to_string();
3831                        Ok(Credentials {
3832                            user_id: user_id.0 as u64,
3833                            access_token,
3834                        })
3835                    })
3836                })
3837                .override_establish_connection(move |credentials, cx| {
3838                    assert_eq!(credentials.user_id, user_id.0 as u64);
3839                    assert_eq!(credentials.access_token, "the-token");
3840
3841                    let server = server.clone();
3842                    let connection_killers = connection_killers.clone();
3843                    let forbid_connections = forbid_connections.clone();
3844                    let client_name = client_name.clone();
3845                    let connection_id_tx = connection_id_tx.clone();
3846                    cx.spawn(move |cx| async move {
3847                        if forbid_connections.load(SeqCst) {
3848                            Err(EstablishConnectionError::other(anyhow!(
3849                                "server is forbidding connections"
3850                            )))
3851                        } else {
3852                            let (client_conn, server_conn, kill_conn) =
3853                                Connection::in_memory(cx.background());
3854                            connection_killers.lock().insert(user_id, kill_conn);
3855                            cx.background()
3856                                .spawn(server.handle_connection(
3857                                    server_conn,
3858                                    client_name,
3859                                    user_id,
3860                                    Some(connection_id_tx),
3861                                    cx.background(),
3862                                ))
3863                                .detach();
3864                            Ok(client_conn)
3865                        }
3866                    })
3867                });
3868
3869            client
3870                .authenticate_and_connect(&cx.to_async())
3871                .await
3872                .unwrap();
3873
3874            Channel::init(&client);
3875            Project::init(&client);
3876
3877            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3878            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3879            let mut authed_user =
3880                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3881            while authed_user.next().await.unwrap().is_none() {}
3882
3883            TestClient {
3884                client,
3885                peer_id,
3886                user_store,
3887                project: Default::default(),
3888                buffers: Default::default(),
3889            }
3890        }
3891
3892        fn disconnect_client(&self, user_id: UserId) {
3893            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3894                let _ = kill_conn.try_send(Some(()));
3895            }
3896        }
3897
3898        fn forbid_connections(&self) {
3899            self.forbid_connections.store(true, SeqCst);
3900        }
3901
3902        fn allow_connections(&self) {
3903            self.forbid_connections.store(false, SeqCst);
3904        }
3905
3906        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3907            let mut config = Config::default();
3908            config.session_secret = "a".repeat(32);
3909            config.database_url = test_db.url.clone();
3910            let github_client = github::AppClient::test();
3911            Arc::new(AppState {
3912                db: test_db.db().clone(),
3913                handlebars: Default::default(),
3914                auth_client: auth::build_client("", ""),
3915                repo_client: github::RepoClient::test(&github_client),
3916                github_client,
3917                config,
3918            })
3919        }
3920
3921        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3922            self.server.store.read()
3923        }
3924
3925        async fn condition<F>(&mut self, mut predicate: F)
3926        where
3927            F: FnMut(&Store) -> bool,
3928        {
3929            async_std::future::timeout(Duration::from_millis(500), async {
3930                while !(predicate)(&*self.server.store.read()) {
3931                    self.foreground.start_waiting();
3932                    self.notifications.next().await;
3933                    self.foreground.finish_waiting();
3934                }
3935            })
3936            .await
3937            .expect("condition timed out");
3938        }
3939    }
3940
3941    impl Drop for TestServer {
3942        fn drop(&mut self) {
3943            self.peer.reset();
3944        }
3945    }
3946
3947    struct TestClient {
3948        client: Arc<Client>,
3949        pub peer_id: PeerId,
3950        pub user_store: ModelHandle<UserStore>,
3951        project: Option<ModelHandle<Project>>,
3952        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3953    }
3954
3955    impl Deref for TestClient {
3956        type Target = Arc<Client>;
3957
3958        fn deref(&self) -> &Self::Target {
3959            &self.client
3960        }
3961    }
3962
3963    impl TestClient {
3964        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3965            UserId::from_proto(
3966                self.user_store
3967                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3968            )
3969        }
3970
3971        async fn simulate_host(
3972            mut self,
3973            project: ModelHandle<Project>,
3974            operations: Rc<Cell<usize>>,
3975            max_operations: usize,
3976            rng: Rc<RefCell<StdRng>>,
3977            mut cx: TestAppContext,
3978        ) -> (Self, TestAppContext) {
3979            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3980            let mut files: Vec<PathBuf> = Default::default();
3981            while operations.get() < max_operations {
3982                operations.set(operations.get() + 1);
3983
3984                let distribution = rng.borrow_mut().gen_range(0..100);
3985                match distribution {
3986                    0..=20 if !files.is_empty() => {
3987                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3988                        while let Some(parent_path) = path.parent() {
3989                            path = parent_path;
3990                            if rng.borrow_mut().gen() {
3991                                break;
3992                            }
3993                        }
3994
3995                        log::info!("Host: find/create local worktree {:?}", path);
3996                        project
3997                            .update(&mut cx, |project, cx| {
3998                                project.find_or_create_local_worktree(path, false, cx)
3999                            })
4000                            .await
4001                            .unwrap();
4002                    }
4003                    10..=80 if !files.is_empty() => {
4004                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4005                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4006                            let (worktree, path) = project
4007                                .update(&mut cx, |project, cx| {
4008                                    project.find_or_create_local_worktree(file, false, cx)
4009                                })
4010                                .await
4011                                .unwrap();
4012                            let project_path =
4013                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4014                            log::info!("Host: opening path {:?}", project_path);
4015                            let buffer = project
4016                                .update(&mut cx, |project, cx| {
4017                                    project.open_buffer(project_path, cx)
4018                                })
4019                                .await
4020                                .unwrap();
4021                            self.buffers.insert(buffer.clone());
4022                            buffer
4023                        } else {
4024                            self.buffers
4025                                .iter()
4026                                .choose(&mut *rng.borrow_mut())
4027                                .unwrap()
4028                                .clone()
4029                        };
4030
4031                        if rng.borrow_mut().gen_bool(0.1) {
4032                            cx.update(|cx| {
4033                                log::info!(
4034                                    "Host: dropping buffer {:?}",
4035                                    buffer.read(cx).file().unwrap().full_path(cx)
4036                                );
4037                                self.buffers.remove(&buffer);
4038                                drop(buffer);
4039                            });
4040                        } else {
4041                            buffer.update(&mut cx, |buffer, cx| {
4042                                log::info!(
4043                                    "Host: updating buffer {:?}",
4044                                    buffer.file().unwrap().full_path(cx)
4045                                );
4046                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4047                            });
4048                        }
4049                    }
4050                    _ => loop {
4051                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4052                        let mut path = PathBuf::new();
4053                        path.push("/");
4054                        for _ in 0..path_component_count {
4055                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4056                            path.push(std::str::from_utf8(&[letter]).unwrap());
4057                        }
4058                        let parent_path = path.parent().unwrap();
4059
4060                        log::info!("Host: creating file {:?}", path);
4061                        if fs.create_dir(&parent_path).await.is_ok()
4062                            && fs.create_file(&path, Default::default()).await.is_ok()
4063                        {
4064                            files.push(path);
4065                            break;
4066                        } else {
4067                            log::info!("Host: cannot create file");
4068                        }
4069                    },
4070                }
4071
4072                cx.background().simulate_random_delay().await;
4073            }
4074
4075            self.project = Some(project);
4076            (self, cx)
4077        }
4078
4079        pub async fn simulate_guest(
4080            mut self,
4081            guest_id: usize,
4082            project: ModelHandle<Project>,
4083            operations: Rc<Cell<usize>>,
4084            max_operations: usize,
4085            rng: Rc<RefCell<StdRng>>,
4086            mut cx: TestAppContext,
4087        ) -> (Self, TestAppContext) {
4088            while operations.get() < max_operations {
4089                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4090                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4091                        project
4092                            .worktrees(&cx)
4093                            .filter(|worktree| {
4094                                worktree.read(cx).entries(false).any(|e| e.is_file())
4095                            })
4096                            .choose(&mut *rng.borrow_mut())
4097                    }) {
4098                        worktree
4099                    } else {
4100                        cx.background().simulate_random_delay().await;
4101                        continue;
4102                    };
4103
4104                    operations.set(operations.get() + 1);
4105                    let project_path = worktree.read_with(&cx, |worktree, _| {
4106                        let entry = worktree
4107                            .entries(false)
4108                            .filter(|e| e.is_file())
4109                            .choose(&mut *rng.borrow_mut())
4110                            .unwrap();
4111                        (worktree.id(), entry.path.clone())
4112                    });
4113                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4114                    let buffer = project
4115                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4116                        .await
4117                        .unwrap();
4118                    self.buffers.insert(buffer.clone());
4119                    buffer
4120                } else {
4121                    self.buffers
4122                        .iter()
4123                        .choose(&mut *rng.borrow_mut())
4124                        .unwrap()
4125                        .clone()
4126                };
4127
4128                if rng.borrow_mut().gen_bool(0.1) {
4129                    cx.update(|cx| {
4130                        log::info!(
4131                            "Guest {}: dropping buffer {:?}",
4132                            guest_id,
4133                            buffer.read(cx).file().unwrap().full_path(cx)
4134                        );
4135                        self.buffers.remove(&buffer);
4136                        drop(buffer);
4137                    });
4138                } else {
4139                    buffer.update(&mut cx, |buffer, cx| {
4140                        log::info!(
4141                            "Guest {}: updating buffer {:?}",
4142                            guest_id,
4143                            buffer.file().unwrap().full_path(cx)
4144                        );
4145                        buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4146                    });
4147                }
4148
4149                cx.background().simulate_random_delay().await;
4150            }
4151
4152            self.project = Some(project);
4153            (self, cx)
4154        }
4155    }
4156
4157    impl Executor for Arc<gpui::executor::Background> {
4158        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4159            self.spawn(future).detach();
4160        }
4161    }
4162
4163    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4164        channel
4165            .messages()
4166            .cursor::<()>()
4167            .map(|m| {
4168                (
4169                    m.sender.github_login.clone(),
4170                    m.body.clone(),
4171                    m.is_pending(),
4172                )
4173            })
4174            .collect()
4175    }
4176
4177    struct EmptyView;
4178
4179    impl gpui::Entity for EmptyView {
4180        type Event = ();
4181    }
4182
4183    impl gpui::View for EmptyView {
4184        fn ui_name() -> &'static str {
4185            "empty view"
4186        }
4187
4188        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4189            gpui::Element::boxed(gpui::elements::Empty)
4190        }
4191    }
4192}