rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_request_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let notifications = this.notifications.clone();
 194                                let is_background = message.is_background();
 195                                let handle_message = (handler)(this.clone(), message);
 196                                let handle_message = async move {
 197                                    if let Err(err) = handle_message.await {
 198                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 199                                    } else {
 200                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 201                                    }
 202                                    if let Some(mut notifications) = notifications {
 203                                        let _ = notifications.send(()).await;
 204                                    }
 205                                };
 206                                if is_background {
 207                                    executor.spawn_detached(handle_message);
 208                                } else {
 209                                    handle_message.await;
 210                                }
 211                            } else {
 212                                log::warn!("unhandled message: {}", type_name);
 213                            }
 214                        } else {
 215                            log::info!("rpc connection closed {:?}", addr);
 216                            break;
 217                        }
 218                    }
 219                }
 220            }
 221
 222            if let Err(err) = this.sign_out(connection_id).await {
 223                log::error!("error signing out connection {:?} - {:?}", addr, err);
 224            }
 225        }
 226    }
 227
 228    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 229        self.peer.disconnect(connection_id);
 230        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 231
 232        for (project_id, project) in removed_connection.hosted_projects {
 233            if let Some(share) = project.share {
 234                broadcast(
 235                    connection_id,
 236                    share.guests.keys().copied().collect(),
 237                    |conn_id| {
 238                        self.peer
 239                            .send(conn_id, proto::UnshareProject { project_id })
 240                    },
 241                )?;
 242            }
 243        }
 244
 245        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 246            broadcast(connection_id, peer_ids, |conn_id| {
 247                self.peer.send(
 248                    conn_id,
 249                    proto::RemoveProjectCollaborator {
 250                        project_id,
 251                        peer_id: connection_id.0,
 252                    },
 253                )
 254            })?;
 255        }
 256
 257        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 258        Ok(())
 259    }
 260
 261    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 262        Ok(proto::Ack {})
 263    }
 264
 265    async fn register_project(
 266        mut self: Arc<Server>,
 267        request: TypedEnvelope<proto::RegisterProject>,
 268    ) -> tide::Result<proto::RegisterProjectResponse> {
 269        let project_id = {
 270            let mut state = self.state_mut();
 271            let user_id = state.user_id_for_connection(request.sender_id)?;
 272            state.register_project(request.sender_id, user_id)
 273        };
 274        Ok(proto::RegisterProjectResponse { project_id })
 275    }
 276
 277    async fn unregister_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::UnregisterProject>,
 280    ) -> tide::Result<()> {
 281        let project = self
 282            .state_mut()
 283            .unregister_project(request.payload.project_id, request.sender_id)?;
 284        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 285        Ok(())
 286    }
 287
 288    async fn share_project(
 289        mut self: Arc<Server>,
 290        request: TypedEnvelope<proto::ShareProject>,
 291    ) -> tide::Result<proto::Ack> {
 292        self.state_mut()
 293            .share_project(request.payload.project_id, request.sender_id);
 294        Ok(proto::Ack {})
 295    }
 296
 297    async fn unshare_project(
 298        mut self: Arc<Server>,
 299        request: TypedEnvelope<proto::UnshareProject>,
 300    ) -> tide::Result<()> {
 301        let project_id = request.payload.project_id;
 302        let project = self
 303            .state_mut()
 304            .unshare_project(project_id, request.sender_id)?;
 305
 306        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 307            self.peer
 308                .send(conn_id, proto::UnshareProject { project_id })
 309        })?;
 310        self.update_contacts_for_users(&project.authorized_user_ids)?;
 311        Ok(())
 312    }
 313
 314    async fn join_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::JoinProject>,
 317    ) -> tide::Result<proto::JoinProjectResponse> {
 318        let project_id = request.payload.project_id;
 319
 320        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 321        let (response, connection_ids, contact_user_ids) = self
 322            .state_mut()
 323            .join_project(request.sender_id, user_id, project_id)
 324            .and_then(|joined| {
 325                let share = joined.project.share()?;
 326                let peer_count = share.guests.len();
 327                let mut collaborators = Vec::with_capacity(peer_count);
 328                collaborators.push(proto::Collaborator {
 329                    peer_id: joined.project.host_connection_id.0,
 330                    replica_id: 0,
 331                    user_id: joined.project.host_user_id.to_proto(),
 332                });
 333                let worktrees = joined
 334                    .project
 335                    .worktrees
 336                    .iter()
 337                    .filter_map(|(id, worktree)| {
 338                        worktree.share.as_ref().map(|share| proto::Worktree {
 339                            id: *id,
 340                            root_name: worktree.root_name.clone(),
 341                            entries: share.entries.values().cloned().collect(),
 342                            diagnostic_summaries: share
 343                                .diagnostic_summaries
 344                                .values()
 345                                .cloned()
 346                                .collect(),
 347                            weak: worktree.weak,
 348                            next_update_id: share.next_update_id as u64,
 349                        })
 350                    })
 351                    .collect();
 352                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 353                    if *peer_conn_id != request.sender_id {
 354                        collaborators.push(proto::Collaborator {
 355                            peer_id: peer_conn_id.0,
 356                            replica_id: *peer_replica_id as u32,
 357                            user_id: peer_user_id.to_proto(),
 358                        });
 359                    }
 360                }
 361                let response = proto::JoinProjectResponse {
 362                    worktrees,
 363                    replica_id: joined.replica_id as u32,
 364                    collaborators,
 365                };
 366                let connection_ids = joined.project.connection_ids();
 367                let contact_user_ids = joined.project.authorized_user_ids();
 368                Ok((response, connection_ids, contact_user_ids))
 369            })?;
 370
 371        broadcast(request.sender_id, connection_ids, |conn_id| {
 372            self.peer.send(
 373                conn_id,
 374                proto::AddProjectCollaborator {
 375                    project_id,
 376                    collaborator: Some(proto::Collaborator {
 377                        peer_id: request.sender_id.0,
 378                        replica_id: response.replica_id,
 379                        user_id: user_id.to_proto(),
 380                    }),
 381                },
 382            )
 383        })?;
 384        self.update_contacts_for_users(&contact_user_ids)?;
 385        Ok(response)
 386    }
 387
 388    async fn leave_project(
 389        mut self: Arc<Server>,
 390        request: TypedEnvelope<proto::LeaveProject>,
 391    ) -> tide::Result<()> {
 392        let sender_id = request.sender_id;
 393        let project_id = request.payload.project_id;
 394        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 395
 396        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::RemoveProjectCollaborator {
 400                    project_id,
 401                    peer_id: sender_id.0,
 402                },
 403            )
 404        })?;
 405        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 406
 407        Ok(())
 408    }
 409
 410    async fn register_worktree(
 411        mut self: Arc<Server>,
 412        request: TypedEnvelope<proto::RegisterWorktree>,
 413    ) -> tide::Result<proto::Ack> {
 414        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 415
 416        let mut contact_user_ids = HashSet::default();
 417        contact_user_ids.insert(host_user_id);
 418        for github_login in request.payload.authorized_logins {
 419            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 420            contact_user_ids.insert(contact_user_id);
 421        }
 422
 423        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 424        self.state_mut().register_worktree(
 425            request.payload.project_id,
 426            request.payload.worktree_id,
 427            request.sender_id,
 428            Worktree {
 429                authorized_user_ids: contact_user_ids.clone(),
 430                root_name: request.payload.root_name,
 431                share: None,
 432                weak: false,
 433            },
 434        )?;
 435        self.update_contacts_for_users(&contact_user_ids)?;
 436        Ok(proto::Ack {})
 437    }
 438
 439    async fn unregister_worktree(
 440        mut self: Arc<Server>,
 441        request: TypedEnvelope<proto::UnregisterWorktree>,
 442    ) -> tide::Result<()> {
 443        let project_id = request.payload.project_id;
 444        let worktree_id = request.payload.worktree_id;
 445        let (worktree, guest_connection_ids) =
 446            self.state_mut()
 447                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 448        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 449            self.peer.send(
 450                conn_id,
 451                proto::UnregisterWorktree {
 452                    project_id,
 453                    worktree_id,
 454                },
 455            )
 456        })?;
 457        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 458        Ok(())
 459    }
 460
 461    async fn share_worktree(
 462        mut self: Arc<Server>,
 463        mut request: TypedEnvelope<proto::ShareWorktree>,
 464    ) -> tide::Result<proto::Ack> {
 465        let worktree = request
 466            .payload
 467            .worktree
 468            .as_mut()
 469            .ok_or_else(|| anyhow!("missing worktree"))?;
 470        let entries = worktree
 471            .entries
 472            .iter()
 473            .map(|entry| (entry.id, entry.clone()))
 474            .collect();
 475        let diagnostic_summaries = worktree
 476            .diagnostic_summaries
 477            .iter()
 478            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 479            .collect();
 480
 481        let shared_worktree = self.state_mut().share_worktree(
 482            request.payload.project_id,
 483            worktree.id,
 484            request.sender_id,
 485            entries,
 486            diagnostic_summaries,
 487            worktree.next_update_id,
 488        )?;
 489
 490        broadcast(
 491            request.sender_id,
 492            shared_worktree.connection_ids,
 493            |connection_id| {
 494                self.peer
 495                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 496            },
 497        )?;
 498        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499
 500        Ok(proto::Ack {})
 501    }
 502
 503    async fn update_worktree(
 504        mut self: Arc<Server>,
 505        request: TypedEnvelope<proto::UpdateWorktree>,
 506    ) -> tide::Result<proto::Ack> {
 507        let connection_ids = self.state_mut().update_worktree(
 508            request.sender_id,
 509            request.payload.project_id,
 510            request.payload.worktree_id,
 511            request.payload.id,
 512            &request.payload.removed_entries,
 513            &request.payload.updated_entries,
 514        )?;
 515
 516        broadcast(request.sender_id, connection_ids, |connection_id| {
 517            self.peer
 518                .forward_send(request.sender_id, connection_id, request.payload.clone())
 519        })?;
 520
 521        Ok(proto::Ack {})
 522    }
 523
 524    async fn update_diagnostic_summary(
 525        mut self: Arc<Server>,
 526        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 527    ) -> tide::Result<()> {
 528        let summary = request
 529            .payload
 530            .summary
 531            .clone()
 532            .ok_or_else(|| anyhow!("invalid summary"))?;
 533        let receiver_ids = self.state_mut().update_diagnostic_summary(
 534            request.payload.project_id,
 535            request.payload.worktree_id,
 536            request.sender_id,
 537            summary,
 538        )?;
 539
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })?;
 544        Ok(())
 545    }
 546
 547    async fn disk_based_diagnostics_updating(
 548        self: Arc<Server>,
 549        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 550    ) -> tide::Result<()> {
 551        let receiver_ids = self
 552            .state()
 553            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 554        broadcast(request.sender_id, receiver_ids, |connection_id| {
 555            self.peer
 556                .forward_send(request.sender_id, connection_id, request.payload.clone())
 557        })?;
 558        Ok(())
 559    }
 560
 561    async fn disk_based_diagnostics_updated(
 562        self: Arc<Server>,
 563        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 564    ) -> tide::Result<()> {
 565        let receiver_ids = self
 566            .state()
 567            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 568        broadcast(request.sender_id, receiver_ids, |connection_id| {
 569            self.peer
 570                .forward_send(request.sender_id, connection_id, request.payload.clone())
 571        })?;
 572        Ok(())
 573    }
 574
 575    async fn get_definition(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<proto::GetDefinition>,
 578    ) -> tide::Result<proto::GetDefinitionResponse> {
 579        let host_connection_id = self
 580            .state()
 581            .read_project(request.payload.project_id, request.sender_id)?
 582            .host_connection_id;
 583        Ok(self
 584            .peer
 585            .forward_request(request.sender_id, host_connection_id, request.payload)
 586            .await?)
 587    }
 588
 589    async fn open_buffer(
 590        self: Arc<Server>,
 591        request: TypedEnvelope<proto::OpenBuffer>,
 592    ) -> tide::Result<proto::OpenBufferResponse> {
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)?
 596            .host_connection_id;
 597        Ok(self
 598            .peer
 599            .forward_request(request.sender_id, host_connection_id, request.payload)
 600            .await?)
 601    }
 602
 603    async fn close_buffer(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::CloseBuffer>,
 606    ) -> tide::Result<()> {
 607        let host_connection_id = self
 608            .state()
 609            .read_project(request.payload.project_id, request.sender_id)?
 610            .host_connection_id;
 611        self.peer
 612            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 613        Ok(())
 614    }
 615
 616    async fn save_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::SaveBuffer>,
 619    ) -> tide::Result<proto::BufferSaved> {
 620        let host;
 621        let mut guests;
 622        {
 623            let state = self.state();
 624            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 625            host = project.host_connection_id;
 626            guests = project.guest_connection_ids()
 627        }
 628
 629        let response = self
 630            .peer
 631            .forward_request(request.sender_id, host, request.payload.clone())
 632            .await?;
 633
 634        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 635        broadcast(host, guests, |conn_id| {
 636            self.peer.forward_send(host, conn_id, response.clone())
 637        })?;
 638
 639        Ok(response)
 640    }
 641
 642    async fn format_buffers(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::FormatBuffers>,
 645    ) -> tide::Result<proto::FormatBuffersResponse> {
 646        let host = self
 647            .state()
 648            .read_project(request.payload.project_id, request.sender_id)?
 649            .host_connection_id;
 650        Ok(self
 651            .peer
 652            .forward_request(request.sender_id, host, request.payload.clone())
 653            .await?)
 654    }
 655
 656    async fn get_completions(
 657        self: Arc<Server>,
 658        request: TypedEnvelope<proto::GetCompletions>,
 659    ) -> tide::Result<proto::GetCompletionsResponse> {
 660        let host = self
 661            .state()
 662            .read_project(request.payload.project_id, request.sender_id)?
 663            .host_connection_id;
 664        Ok(self
 665            .peer
 666            .forward_request(request.sender_id, host, request.payload.clone())
 667            .await?)
 668    }
 669
 670    async fn apply_additional_edits_for_completion(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 673    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 674        let host = self
 675            .state()
 676            .read_project(request.payload.project_id, request.sender_id)?
 677            .host_connection_id;
 678        Ok(self
 679            .peer
 680            .forward_request(request.sender_id, host, request.payload.clone())
 681            .await?)
 682    }
 683
 684    async fn get_code_actions(
 685        self: Arc<Server>,
 686        request: TypedEnvelope<proto::GetCodeActions>,
 687    ) -> tide::Result<proto::GetCodeActionsResponse> {
 688        let host = self
 689            .state()
 690            .read_project(request.payload.project_id, request.sender_id)?
 691            .host_connection_id;
 692        Ok(self
 693            .peer
 694            .forward_request(request.sender_id, host, request.payload.clone())
 695            .await?)
 696    }
 697
 698    async fn apply_code_action(
 699        self: Arc<Server>,
 700        request: TypedEnvelope<proto::ApplyCodeAction>,
 701    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 702        let host = self
 703            .state()
 704            .read_project(request.payload.project_id, request.sender_id)?
 705            .host_connection_id;
 706        Ok(self
 707            .peer
 708            .forward_request(request.sender_id, host, request.payload.clone())
 709            .await?)
 710    }
 711
 712    async fn update_buffer(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBuffer>,
 715    ) -> tide::Result<proto::Ack> {
 716        let receiver_ids = self
 717            .state()
 718            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 719        broadcast(request.sender_id, receiver_ids, |connection_id| {
 720            self.peer
 721                .forward_send(request.sender_id, connection_id, request.payload.clone())
 722        })?;
 723        Ok(proto::Ack {})
 724    }
 725
 726    async fn update_buffer_file(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::UpdateBufferFile>,
 729    ) -> tide::Result<()> {
 730        let receiver_ids = self
 731            .state()
 732            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 733        broadcast(request.sender_id, receiver_ids, |connection_id| {
 734            self.peer
 735                .forward_send(request.sender_id, connection_id, request.payload.clone())
 736        })?;
 737        Ok(())
 738    }
 739
 740    async fn buffer_reloaded(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::BufferReloaded>,
 743    ) -> tide::Result<()> {
 744        let receiver_ids = self
 745            .state()
 746            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 747        broadcast(request.sender_id, receiver_ids, |connection_id| {
 748            self.peer
 749                .forward_send(request.sender_id, connection_id, request.payload.clone())
 750        })?;
 751        Ok(())
 752    }
 753
 754    async fn buffer_saved(
 755        self: Arc<Server>,
 756        request: TypedEnvelope<proto::BufferSaved>,
 757    ) -> tide::Result<()> {
 758        let receiver_ids = self
 759            .state()
 760            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 761        broadcast(request.sender_id, receiver_ids, |connection_id| {
 762            self.peer
 763                .forward_send(request.sender_id, connection_id, request.payload.clone())
 764        })?;
 765        Ok(())
 766    }
 767
 768    async fn get_channels(
 769        self: Arc<Server>,
 770        request: TypedEnvelope<proto::GetChannels>,
 771    ) -> tide::Result<proto::GetChannelsResponse> {
 772        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 773        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 774        Ok(proto::GetChannelsResponse {
 775            channels: channels
 776                .into_iter()
 777                .map(|chan| proto::Channel {
 778                    id: chan.id.to_proto(),
 779                    name: chan.name,
 780                })
 781                .collect(),
 782        })
 783    }
 784
 785    async fn get_users(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::GetUsers>,
 788    ) -> tide::Result<proto::GetUsersResponse> {
 789        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 790        let users = self
 791            .app_state
 792            .db
 793            .get_users_by_ids(user_ids)
 794            .await?
 795            .into_iter()
 796            .map(|user| proto::User {
 797                id: user.id.to_proto(),
 798                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 799                github_login: user.github_login,
 800            })
 801            .collect();
 802        Ok(proto::GetUsersResponse { users })
 803    }
 804
 805    fn update_contacts_for_users<'a>(
 806        self: &Arc<Server>,
 807        user_ids: impl IntoIterator<Item = &'a UserId>,
 808    ) -> anyhow::Result<()> {
 809        let mut result = Ok(());
 810        let state = self.state();
 811        for user_id in user_ids {
 812            let contacts = state.contacts_for_user(*user_id);
 813            for connection_id in state.connection_ids_for_user(*user_id) {
 814                if let Err(error) = self.peer.send(
 815                    connection_id,
 816                    proto::UpdateContacts {
 817                        contacts: contacts.clone(),
 818                    },
 819                ) {
 820                    result = Err(error);
 821                }
 822            }
 823        }
 824        result
 825    }
 826
 827    async fn join_channel(
 828        mut self: Arc<Self>,
 829        request: TypedEnvelope<proto::JoinChannel>,
 830    ) -> tide::Result<proto::JoinChannelResponse> {
 831        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 832        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 833        if !self
 834            .app_state
 835            .db
 836            .can_user_access_channel(user_id, channel_id)
 837            .await?
 838        {
 839            Err(anyhow!("access denied"))?;
 840        }
 841
 842        self.state_mut().join_channel(request.sender_id, channel_id);
 843        let messages = self
 844            .app_state
 845            .db
 846            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 847            .await?
 848            .into_iter()
 849            .map(|msg| proto::ChannelMessage {
 850                id: msg.id.to_proto(),
 851                body: msg.body,
 852                timestamp: msg.sent_at.unix_timestamp() as u64,
 853                sender_id: msg.sender_id.to_proto(),
 854                nonce: Some(msg.nonce.as_u128().into()),
 855            })
 856            .collect::<Vec<_>>();
 857        Ok(proto::JoinChannelResponse {
 858            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 859            messages,
 860        })
 861    }
 862
 863    async fn leave_channel(
 864        mut self: Arc<Self>,
 865        request: TypedEnvelope<proto::LeaveChannel>,
 866    ) -> tide::Result<()> {
 867        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 868        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 869        if !self
 870            .app_state
 871            .db
 872            .can_user_access_channel(user_id, channel_id)
 873            .await?
 874        {
 875            Err(anyhow!("access denied"))?;
 876        }
 877
 878        self.state_mut()
 879            .leave_channel(request.sender_id, channel_id);
 880
 881        Ok(())
 882    }
 883
 884    async fn send_channel_message(
 885        self: Arc<Self>,
 886        request: TypedEnvelope<proto::SendChannelMessage>,
 887    ) -> tide::Result<proto::SendChannelMessageResponse> {
 888        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 889        let user_id;
 890        let connection_ids;
 891        {
 892            let state = self.state();
 893            user_id = state.user_id_for_connection(request.sender_id)?;
 894            connection_ids = state.channel_connection_ids(channel_id)?;
 895        }
 896
 897        // Validate the message body.
 898        let body = request.payload.body.trim().to_string();
 899        if body.len() > MAX_MESSAGE_LEN {
 900            return Err(anyhow!("message is too long"))?;
 901        }
 902        if body.is_empty() {
 903            return Err(anyhow!("message can't be blank"))?;
 904        }
 905
 906        let timestamp = OffsetDateTime::now_utc();
 907        let nonce = request
 908            .payload
 909            .nonce
 910            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 911
 912        let message_id = self
 913            .app_state
 914            .db
 915            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 916            .await?
 917            .to_proto();
 918        let message = proto::ChannelMessage {
 919            sender_id: user_id.to_proto(),
 920            id: message_id,
 921            body,
 922            timestamp: timestamp.unix_timestamp() as u64,
 923            nonce: Some(nonce),
 924        };
 925        broadcast(request.sender_id, connection_ids, |conn_id| {
 926            self.peer.send(
 927                conn_id,
 928                proto::ChannelMessageSent {
 929                    channel_id: channel_id.to_proto(),
 930                    message: Some(message.clone()),
 931                },
 932            )
 933        })?;
 934        Ok(proto::SendChannelMessageResponse {
 935            message: Some(message),
 936        })
 937    }
 938
 939    async fn get_channel_messages(
 940        self: Arc<Self>,
 941        request: TypedEnvelope<proto::GetChannelMessages>,
 942    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 943        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 944        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 945        if !self
 946            .app_state
 947            .db
 948            .can_user_access_channel(user_id, channel_id)
 949            .await?
 950        {
 951            Err(anyhow!("access denied"))?;
 952        }
 953
 954        let messages = self
 955            .app_state
 956            .db
 957            .get_channel_messages(
 958                channel_id,
 959                MESSAGE_COUNT_PER_PAGE,
 960                Some(MessageId::from_proto(request.payload.before_message_id)),
 961            )
 962            .await?
 963            .into_iter()
 964            .map(|msg| proto::ChannelMessage {
 965                id: msg.id.to_proto(),
 966                body: msg.body,
 967                timestamp: msg.sent_at.unix_timestamp() as u64,
 968                sender_id: msg.sender_id.to_proto(),
 969                nonce: Some(msg.nonce.as_u128().into()),
 970            })
 971            .collect::<Vec<_>>();
 972
 973        Ok(proto::GetChannelMessagesResponse {
 974            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 975            messages,
 976        })
 977    }
 978
 979    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 980        self.store.read()
 981    }
 982
 983    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 984        self.store.write()
 985    }
 986}
 987
 988impl Executor for RealExecutor {
 989    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 990        task::spawn(future);
 991    }
 992}
 993
 994fn broadcast<F>(
 995    sender_id: ConnectionId,
 996    receiver_ids: Vec<ConnectionId>,
 997    mut f: F,
 998) -> anyhow::Result<()>
 999where
1000    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1001{
1002    let mut result = Ok(());
1003    for receiver_id in receiver_ids {
1004        if receiver_id != sender_id {
1005            if let Err(error) = f(receiver_id) {
1006                if result.is_ok() {
1007                    result = Err(error);
1008                }
1009            }
1010        }
1011    }
1012    result
1013}
1014
1015pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1016    let server = Server::new(app.state().clone(), rpc.clone(), None);
1017    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1018        let server = server.clone();
1019        async move {
1020            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1021
1022            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1023            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1024            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1025            let client_protocol_version: Option<u32> = request
1026                .header("X-Zed-Protocol-Version")
1027                .and_then(|v| v.as_str().parse().ok());
1028
1029            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1030                return Ok(Response::new(StatusCode::UpgradeRequired));
1031            }
1032
1033            let header = match request.header("Sec-Websocket-Key") {
1034                Some(h) => h.as_str(),
1035                None => return Err(anyhow!("expected sec-websocket-key"))?,
1036            };
1037
1038            let user_id = process_auth_header(&request).await?;
1039
1040            let mut response = Response::new(StatusCode::SwitchingProtocols);
1041            response.insert_header(UPGRADE, "websocket");
1042            response.insert_header(CONNECTION, "Upgrade");
1043            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1044            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1045            response.insert_header("Sec-Websocket-Version", "13");
1046
1047            let http_res: &mut tide::http::Response = response.as_mut();
1048            let upgrade_receiver = http_res.recv_upgrade().await;
1049            let addr = request.remote().unwrap_or("unknown").to_string();
1050            task::spawn(async move {
1051                if let Some(stream) = upgrade_receiver.await {
1052                    server
1053                        .handle_connection(
1054                            Connection::new(
1055                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1056                            ),
1057                            addr,
1058                            user_id,
1059                            None,
1060                            RealExecutor,
1061                        )
1062                        .await;
1063                }
1064            });
1065
1066            Ok(response)
1067        }
1068    });
1069}
1070
1071fn header_contains_ignore_case<T>(
1072    request: &tide::Request<T>,
1073    header_name: HeaderName,
1074    value: &str,
1075) -> bool {
1076    request
1077        .header(header_name)
1078        .map(|h| {
1079            h.as_str()
1080                .split(',')
1081                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1082        })
1083        .unwrap_or(false)
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088    use super::*;
1089    use crate::{
1090        auth,
1091        db::{tests::TestDb, UserId},
1092        github, AppState, Config,
1093    };
1094    use ::rpc::Peer;
1095    use collections::BTreeMap;
1096    use gpui::{executor, ModelHandle, TestAppContext};
1097    use parking_lot::Mutex;
1098    use postage::{mpsc, watch};
1099    use rand::prelude::*;
1100    use rpc::PeerId;
1101    use serde_json::json;
1102    use sqlx::types::time::OffsetDateTime;
1103    use std::{
1104        cell::{Cell, RefCell},
1105        env,
1106        ops::Deref,
1107        path::Path,
1108        rc::Rc,
1109        sync::{
1110            atomic::{AtomicBool, Ordering::SeqCst},
1111            Arc,
1112        },
1113        time::Duration,
1114    };
1115    use zed::{
1116        client::{
1117            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1118            EstablishConnectionError, UserStore,
1119        },
1120        editor::{
1121            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1122            Redo, ToggleCodeActions, Undo,
1123        },
1124        fs::{FakeFs, Fs as _},
1125        language::{
1126            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1127            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1128        },
1129        lsp,
1130        project::{DiagnosticSummary, Project, ProjectPath},
1131        workspace::{Workspace, WorkspaceParams},
1132    };
1133
1134    #[cfg(test)]
1135    #[ctor::ctor]
1136    fn init_logger() {
1137        if std::env::var("RUST_LOG").is_ok() {
1138            env_logger::init();
1139        }
1140    }
1141
    // End-to-end sharing test with two clients: client A shares a local project, client B
    // joins it, both open the same file, an edit typed by B shows up in A's buffer, and
    // dropping B's handles closes the buffer on A and removes B from A's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A (keyed by A's peer id) as a collaborator with login "user_a".
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with B's replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // A's project already reports the buffer as open, before A opens it itself below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1284
    // Checks the share -> unshare -> share-again cycle: after client A unshares, client B's
    // project becomes read-only and A's worktree is no longer shared; after A shares again,
    // client B can join the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project becomes read-only, and the host's worktree stops being shared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Despite the name, `project_c` is a second remote project created by client B.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1385
    // Three-client test (host A, guests B and C). It checks that concurrent edits converge
    // to the same text on all three buffers, that a save started by guest B writes the
    // host's file and clears the dirty flag everywhere, and that renames and new files on
    // the host's file system show up in the guests' worktrees and buffer paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
        last_iteration: bool,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Both guests inserted at offset 0; wait until the host has both insertions in
        // this order before appending its own text at the end.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started before the host's edit but awaited after it; the assertion
        // below expects the written file to contain the host's "hi-a, " prefix.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B must be clean right away; C is only waited on until it becomes clean.
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Every worktree (host and both guests) must end up listing the same four paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1568
    // Checks that a guest's buffer never reports a conflict across an edit -> save -> edit
    // sequence: it is dirty after each edit, clean after the save, and `has_conflict()`
    // stays false throughout.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the collaborator list names "user_c", but this test only creates
        // clients for user_a and user_b.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: the buffer is dirty but not in conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as the guest, then wait for the buffer to become clean.
        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Second edit after the save: dirty again, still no conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1660
1661    #[gpui::test(iterations = 10)]
1662    async fn test_buffer_reloading(
1663        mut cx_a: TestAppContext,
1664        mut cx_b: TestAppContext,
1665        last_iteration: bool,
1666    ) {
1667        cx_a.foreground().forbid_parking();
1668        let lang_registry = Arc::new(LanguageRegistry::new());
1669        let fs = Arc::new(FakeFs::new(cx_a.background()));
1670
1671        // Connect to a server as 2 clients.
1672        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1673        let client_a = server.create_client(&mut cx_a, "user_a").await;
1674        let client_b = server.create_client(&mut cx_b, "user_b").await;
1675
1676        // Share a project as client A
1677        fs.insert_tree(
1678            "/dir",
1679            json!({
1680                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1681                "a.txt": "a-contents",
1682            }),
1683        )
1684        .await;
1685
1686        let project_a = cx_a.update(|cx| {
1687            Project::local(
1688                client_a.clone(),
1689                client_a.user_store.clone(),
1690                lang_registry.clone(),
1691                fs.clone(),
1692                cx,
1693            )
1694        });
1695        let (worktree_a, _) = project_a
1696            .update(&mut cx_a, |p, cx| {
1697                p.find_or_create_local_worktree("/dir", false, cx)
1698            })
1699            .await
1700            .unwrap();
1701        worktree_a
1702            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1703            .await;
1704        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1705        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1706        project_a
1707            .update(&mut cx_a, |p, cx| p.share(cx))
1708            .await
1709            .unwrap();
1710
1711        // Join that project as client B
1712        let project_b = Project::remote(
1713            project_id,
1714            client_b.clone(),
1715            client_b.user_store.clone(),
1716            lang_registry.clone(),
1717            fs.clone(),
1718            &mut cx_b.to_async(),
1719        )
1720        .await
1721        .unwrap();
1722        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1723
1724        // Open a buffer as client B
1725        let buffer_b = project_b
1726            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1727            .await
1728            .unwrap();
1729        buffer_b.read_with(&cx_b, |buf, _| {
1730            assert!(!buf.is_dirty());
1731            assert!(!buf.has_conflict());
1732        });
1733
1734        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1735            .await
1736            .unwrap();
1737        buffer_b
1738            .condition(&cx_b, |buf, _| {
1739                buf.text() == "new contents" && !buf.is_dirty()
1740            })
1741            .await;
1742        buffer_b.read_with(&cx_b, |buf, _| {
1743            assert!(!buf.has_conflict());
1744        });
1745    }
1746
1747    #[gpui::test(iterations = 10)]
1748    async fn test_editing_while_guest_opens_buffer(
1749        mut cx_a: TestAppContext,
1750        mut cx_b: TestAppContext,
1751        last_iteration: bool,
1752    ) {
1753        cx_a.foreground().forbid_parking();
1754        let lang_registry = Arc::new(LanguageRegistry::new());
1755        let fs = Arc::new(FakeFs::new(cx_a.background()));
1756
1757        // Connect to a server as 2 clients.
1758        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1759        let client_a = server.create_client(&mut cx_a, "user_a").await;
1760        let client_b = server.create_client(&mut cx_b, "user_b").await;
1761
1762        // Share a project as client A
1763        fs.insert_tree(
1764            "/dir",
1765            json!({
1766                ".zed.toml": r#"collaborators = ["user_b"]"#,
1767                "a.txt": "a-contents",
1768            }),
1769        )
1770        .await;
1771        let project_a = cx_a.update(|cx| {
1772            Project::local(
1773                client_a.clone(),
1774                client_a.user_store.clone(),
1775                lang_registry.clone(),
1776                fs.clone(),
1777                cx,
1778            )
1779        });
1780        let (worktree_a, _) = project_a
1781            .update(&mut cx_a, |p, cx| {
1782                p.find_or_create_local_worktree("/dir", false, cx)
1783            })
1784            .await
1785            .unwrap();
1786        worktree_a
1787            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1788            .await;
1789        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1790        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1791        project_a
1792            .update(&mut cx_a, |p, cx| p.share(cx))
1793            .await
1794            .unwrap();
1795
1796        // Join that project as client B
1797        let project_b = Project::remote(
1798            project_id,
1799            client_b.clone(),
1800            client_b.user_store.clone(),
1801            lang_registry.clone(),
1802            fs.clone(),
1803            &mut cx_b.to_async(),
1804        )
1805        .await
1806        .unwrap();
1807
1808        // Open a buffer as client A
1809        let buffer_a = project_a
1810            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1811            .await
1812            .unwrap();
1813
1814        // Start opening the same buffer as client B
1815        let buffer_b = cx_b
1816            .background()
1817            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1818
1819        // Edit the buffer as client A while client B is still opening it.
1820        cx_b.background().simulate_random_delay().await;
1821        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1822        cx_b.background().simulate_random_delay().await;
1823        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1824
1825        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1826        let buffer_b = buffer_b.await.unwrap();
1827        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1828    }
1829
1830    #[gpui::test(iterations = 10)]
1831    async fn test_leaving_worktree_while_opening_buffer(
1832        mut cx_a: TestAppContext,
1833        mut cx_b: TestAppContext,
1834        last_iteration: bool,
1835    ) {
1836        cx_a.foreground().forbid_parking();
1837        let lang_registry = Arc::new(LanguageRegistry::new());
1838        let fs = Arc::new(FakeFs::new(cx_a.background()));
1839
1840        // Connect to a server as 2 clients.
1841        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1842        let client_a = server.create_client(&mut cx_a, "user_a").await;
1843        let client_b = server.create_client(&mut cx_b, "user_b").await;
1844
1845        // Share a project as client A
1846        fs.insert_tree(
1847            "/dir",
1848            json!({
1849                ".zed.toml": r#"collaborators = ["user_b"]"#,
1850                "a.txt": "a-contents",
1851            }),
1852        )
1853        .await;
1854        let project_a = cx_a.update(|cx| {
1855            Project::local(
1856                client_a.clone(),
1857                client_a.user_store.clone(),
1858                lang_registry.clone(),
1859                fs.clone(),
1860                cx,
1861            )
1862        });
1863        let (worktree_a, _) = project_a
1864            .update(&mut cx_a, |p, cx| {
1865                p.find_or_create_local_worktree("/dir", false, cx)
1866            })
1867            .await
1868            .unwrap();
1869        worktree_a
1870            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1871            .await;
1872        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1873        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1874        project_a
1875            .update(&mut cx_a, |p, cx| p.share(cx))
1876            .await
1877            .unwrap();
1878
1879        // Join that project as client B
1880        let project_b = Project::remote(
1881            project_id,
1882            client_b.clone(),
1883            client_b.user_store.clone(),
1884            lang_registry.clone(),
1885            fs.clone(),
1886            &mut cx_b.to_async(),
1887        )
1888        .await
1889        .unwrap();
1890
1891        // See that a guest has joined as client A.
1892        project_a
1893            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1894            .await;
1895
1896        // Begin opening a buffer as client B, but leave the project before the open completes.
1897        let buffer_b = cx_b
1898            .background()
1899            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1900        cx_b.update(|_| drop(project_b));
1901        drop(buffer_b);
1902
1903        // See that the guest has left.
1904        project_a
1905            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1906            .await;
1907    }
1908
1909    #[gpui::test(iterations = 10)]
1910    async fn test_peer_disconnection(
1911        mut cx_a: TestAppContext,
1912        mut cx_b: TestAppContext,
1913        last_iteration: bool,
1914    ) {
1915        cx_a.foreground().forbid_parking();
1916        let lang_registry = Arc::new(LanguageRegistry::new());
1917        let fs = Arc::new(FakeFs::new(cx_a.background()));
1918
1919        // Connect to a server as 2 clients.
1920        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1921        let client_a = server.create_client(&mut cx_a, "user_a").await;
1922        let client_b = server.create_client(&mut cx_b, "user_b").await;
1923
1924        // Share a project as client A
1925        fs.insert_tree(
1926            "/a",
1927            json!({
1928                ".zed.toml": r#"collaborators = ["user_b"]"#,
1929                "a.txt": "a-contents",
1930                "b.txt": "b-contents",
1931            }),
1932        )
1933        .await;
1934        let project_a = cx_a.update(|cx| {
1935            Project::local(
1936                client_a.clone(),
1937                client_a.user_store.clone(),
1938                lang_registry.clone(),
1939                fs.clone(),
1940                cx,
1941            )
1942        });
1943        let (worktree_a, _) = project_a
1944            .update(&mut cx_a, |p, cx| {
1945                p.find_or_create_local_worktree("/a", false, cx)
1946            })
1947            .await
1948            .unwrap();
1949        worktree_a
1950            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1951            .await;
1952        let project_id = project_a
1953            .update(&mut cx_a, |project, _| project.next_remote_id())
1954            .await;
1955        project_a
1956            .update(&mut cx_a, |project, cx| project.share(cx))
1957            .await
1958            .unwrap();
1959
1960        // Join that project as client B
1961        let _project_b = Project::remote(
1962            project_id,
1963            client_b.clone(),
1964            client_b.user_store.clone(),
1965            lang_registry.clone(),
1966            fs.clone(),
1967            &mut cx_b.to_async(),
1968        )
1969        .await
1970        .unwrap();
1971
1972        // See that a guest has joined as client A.
1973        project_a
1974            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1975            .await;
1976
1977        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1978        client_b.disconnect(&cx_b.to_async()).unwrap();
1979        project_a
1980            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1981            .await;
1982    }
1983
1984    #[gpui::test(iterations = 10)]
1985    async fn test_collaborating_with_diagnostics(
1986        mut cx_a: TestAppContext,
1987        mut cx_b: TestAppContext,
1988        last_iteration: bool,
1989    ) {
1990        cx_a.foreground().forbid_parking();
1991        let mut lang_registry = Arc::new(LanguageRegistry::new());
1992        let fs = Arc::new(FakeFs::new(cx_a.background()));
1993
1994        // Set up a fake language server.
1995        let (language_server_config, mut fake_language_server) =
1996            LanguageServerConfig::fake(&cx_a).await;
1997        Arc::get_mut(&mut lang_registry)
1998            .unwrap()
1999            .add(Arc::new(Language::new(
2000                LanguageConfig {
2001                    name: "Rust".to_string(),
2002                    path_suffixes: vec!["rs".to_string()],
2003                    language_server: Some(language_server_config),
2004                    ..Default::default()
2005                },
2006                Some(tree_sitter_rust::language()),
2007            )));
2008
2009        // Connect to a server as 2 clients.
2010        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2011        let client_a = server.create_client(&mut cx_a, "user_a").await;
2012        let client_b = server.create_client(&mut cx_b, "user_b").await;
2013
2014        // Share a project as client A
2015        fs.insert_tree(
2016            "/a",
2017            json!({
2018                ".zed.toml": r#"collaborators = ["user_b"]"#,
2019                "a.rs": "let one = two",
2020                "other.rs": "",
2021            }),
2022        )
2023        .await;
2024        let project_a = cx_a.update(|cx| {
2025            Project::local(
2026                client_a.clone(),
2027                client_a.user_store.clone(),
2028                lang_registry.clone(),
2029                fs.clone(),
2030                cx,
2031            )
2032        });
2033        let (worktree_a, _) = project_a
2034            .update(&mut cx_a, |p, cx| {
2035                p.find_or_create_local_worktree("/a", false, cx)
2036            })
2037            .await
2038            .unwrap();
2039        worktree_a
2040            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2041            .await;
2042        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2043        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2044        project_a
2045            .update(&mut cx_a, |p, cx| p.share(cx))
2046            .await
2047            .unwrap();
2048
2049        // Cause the language server to start.
2050        let _ = cx_a
2051            .background()
2052            .spawn(project_a.update(&mut cx_a, |project, cx| {
2053                project.open_buffer(
2054                    ProjectPath {
2055                        worktree_id,
2056                        path: Path::new("other.rs").into(),
2057                    },
2058                    cx,
2059                )
2060            }))
2061            .await
2062            .unwrap();
2063
2064        // Simulate a language server reporting errors for a file.
2065        fake_language_server
2066            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2067                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2068                version: None,
2069                diagnostics: vec![lsp::Diagnostic {
2070                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2071                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2072                    message: "message 1".to_string(),
2073                    ..Default::default()
2074                }],
2075            })
2076            .await;
2077
2078        // Wait for server to see the diagnostics update.
2079        server
2080            .condition(|store| {
2081                let worktree = store
2082                    .project(project_id)
2083                    .unwrap()
2084                    .worktrees
2085                    .get(&worktree_id.to_proto())
2086                    .unwrap();
2087
2088                !worktree
2089                    .share
2090                    .as_ref()
2091                    .unwrap()
2092                    .diagnostic_summaries
2093                    .is_empty()
2094            })
2095            .await;
2096
2097        // Join the worktree as client B.
2098        let project_b = Project::remote(
2099            project_id,
2100            client_b.clone(),
2101            client_b.user_store.clone(),
2102            lang_registry.clone(),
2103            fs.clone(),
2104            &mut cx_b.to_async(),
2105        )
2106        .await
2107        .unwrap();
2108
2109        project_b.read_with(&cx_b, |project, cx| {
2110            assert_eq!(
2111                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2112                &[(
2113                    ProjectPath {
2114                        worktree_id,
2115                        path: Arc::from(Path::new("a.rs")),
2116                    },
2117                    DiagnosticSummary {
2118                        error_count: 1,
2119                        warning_count: 0,
2120                        ..Default::default()
2121                    },
2122                )]
2123            )
2124        });
2125
2126        // Simulate a language server reporting more errors for a file.
2127        fake_language_server
2128            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2129                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2130                version: None,
2131                diagnostics: vec![
2132                    lsp::Diagnostic {
2133                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2134                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2135                        message: "message 1".to_string(),
2136                        ..Default::default()
2137                    },
2138                    lsp::Diagnostic {
2139                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2140                        range: lsp::Range::new(
2141                            lsp::Position::new(0, 10),
2142                            lsp::Position::new(0, 13),
2143                        ),
2144                        message: "message 2".to_string(),
2145                        ..Default::default()
2146                    },
2147                ],
2148            })
2149            .await;
2150
2151        // Client b gets the updated summaries
2152        project_b
2153            .condition(&cx_b, |project, cx| {
2154                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2155                    == &[(
2156                        ProjectPath {
2157                            worktree_id,
2158                            path: Arc::from(Path::new("a.rs")),
2159                        },
2160                        DiagnosticSummary {
2161                            error_count: 1,
2162                            warning_count: 1,
2163                            ..Default::default()
2164                        },
2165                    )]
2166            })
2167            .await;
2168
2169        // Open the file with the errors on client B. They should be present.
2170        let buffer_b = cx_b
2171            .background()
2172            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2173            .await
2174            .unwrap();
2175
2176        buffer_b.read_with(&cx_b, |buffer, _| {
2177            assert_eq!(
2178                buffer
2179                    .snapshot()
2180                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2181                    .map(|entry| entry)
2182                    .collect::<Vec<_>>(),
2183                &[
2184                    DiagnosticEntry {
2185                        range: Point::new(0, 4)..Point::new(0, 7),
2186                        diagnostic: Diagnostic {
2187                            group_id: 0,
2188                            message: "message 1".to_string(),
2189                            severity: lsp::DiagnosticSeverity::ERROR,
2190                            is_primary: true,
2191                            ..Default::default()
2192                        }
2193                    },
2194                    DiagnosticEntry {
2195                        range: Point::new(0, 10)..Point::new(0, 13),
2196                        diagnostic: Diagnostic {
2197                            group_id: 1,
2198                            severity: lsp::DiagnosticSeverity::WARNING,
2199                            message: "message 2".to_string(),
2200                            is_primary: true,
2201                            ..Default::default()
2202                        }
2203                    }
2204                ]
2205            );
2206        });
2207    }
2208
2209    #[gpui::test(iterations = 10)]
2210    async fn test_collaborating_with_completion(
2211        mut cx_a: TestAppContext,
2212        mut cx_b: TestAppContext,
2213        last_iteration: bool,
2214    ) {
2215        cx_a.foreground().forbid_parking();
2216        let mut lang_registry = Arc::new(LanguageRegistry::new());
2217        let fs = Arc::new(FakeFs::new(cx_a.background()));
2218
2219        // Set up a fake language server.
2220        let (language_server_config, mut fake_language_server) =
2221            LanguageServerConfig::fake_with_capabilities(
2222                lsp::ServerCapabilities {
2223                    completion_provider: Some(lsp::CompletionOptions {
2224                        trigger_characters: Some(vec![".".to_string()]),
2225                        ..Default::default()
2226                    }),
2227                    ..Default::default()
2228                },
2229                &cx_a,
2230            )
2231            .await;
2232        Arc::get_mut(&mut lang_registry)
2233            .unwrap()
2234            .add(Arc::new(Language::new(
2235                LanguageConfig {
2236                    name: "Rust".to_string(),
2237                    path_suffixes: vec!["rs".to_string()],
2238                    language_server: Some(language_server_config),
2239                    ..Default::default()
2240                },
2241                Some(tree_sitter_rust::language()),
2242            )));
2243
2244        // Connect to a server as 2 clients.
2245        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2246        let client_a = server.create_client(&mut cx_a, "user_a").await;
2247        let client_b = server.create_client(&mut cx_b, "user_b").await;
2248
2249        // Share a project as client A
2250        fs.insert_tree(
2251            "/a",
2252            json!({
2253                ".zed.toml": r#"collaborators = ["user_b"]"#,
2254                "main.rs": "fn main() { a }",
2255                "other.rs": "",
2256            }),
2257        )
2258        .await;
2259        let project_a = cx_a.update(|cx| {
2260            Project::local(
2261                client_a.clone(),
2262                client_a.user_store.clone(),
2263                lang_registry.clone(),
2264                fs.clone(),
2265                cx,
2266            )
2267        });
2268        let (worktree_a, _) = project_a
2269            .update(&mut cx_a, |p, cx| {
2270                p.find_or_create_local_worktree("/a", false, cx)
2271            })
2272            .await
2273            .unwrap();
2274        worktree_a
2275            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2276            .await;
2277        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2278        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2279        project_a
2280            .update(&mut cx_a, |p, cx| p.share(cx))
2281            .await
2282            .unwrap();
2283
2284        // Join the worktree as client B.
2285        let project_b = Project::remote(
2286            project_id,
2287            client_b.clone(),
2288            client_b.user_store.clone(),
2289            lang_registry.clone(),
2290            fs.clone(),
2291            &mut cx_b.to_async(),
2292        )
2293        .await
2294        .unwrap();
2295
2296        // Open a file in an editor as the guest.
2297        let buffer_b = project_b
2298            .update(&mut cx_b, |p, cx| {
2299                p.open_buffer((worktree_id, "main.rs"), cx)
2300            })
2301            .await
2302            .unwrap();
2303        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2304        let editor_b = cx_b.add_view(window_b, |cx| {
2305            Editor::for_buffer(
2306                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2307                Arc::new(|cx| EditorSettings::test(cx)),
2308                Some(project_b.clone()),
2309                cx,
2310            )
2311        });
2312
2313        // Type a completion trigger character as the guest.
2314        editor_b.update(&mut cx_b, |editor, cx| {
2315            editor.select_ranges([13..13], None, cx);
2316            editor.handle_input(&Input(".".into()), cx);
2317            cx.focus(&editor_b);
2318        });
2319
2320        // Receive a completion request as the host's language server.
2321        // Return some completions from the host's language server.
2322        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2323            assert_eq!(
2324                params.text_document_position.text_document.uri,
2325                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2326            );
2327            assert_eq!(
2328                params.text_document_position.position,
2329                lsp::Position::new(0, 14),
2330            );
2331
2332            Some(lsp::CompletionResponse::Array(vec![
2333                lsp::CompletionItem {
2334                    label: "first_method(…)".into(),
2335                    detail: Some("fn(&mut self, B) -> C".into()),
2336                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2337                        new_text: "first_method($1)".to_string(),
2338                        range: lsp::Range::new(
2339                            lsp::Position::new(0, 14),
2340                            lsp::Position::new(0, 14),
2341                        ),
2342                    })),
2343                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2344                    ..Default::default()
2345                },
2346                lsp::CompletionItem {
2347                    label: "second_method(…)".into(),
2348                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2349                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2350                        new_text: "second_method()".to_string(),
2351                        range: lsp::Range::new(
2352                            lsp::Position::new(0, 14),
2353                            lsp::Position::new(0, 14),
2354                        ),
2355                    })),
2356                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2357                    ..Default::default()
2358                },
2359            ]))
2360        });
2361
2362        // Open the buffer on the host.
2363        let buffer_a = project_a
2364            .update(&mut cx_a, |p, cx| {
2365                p.open_buffer((worktree_id, "main.rs"), cx)
2366            })
2367            .await
2368            .unwrap();
2369        buffer_a
2370            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2371            .await;
2372
2373        // Confirm a completion on the guest.
2374        editor_b
2375            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2376            .await;
2377        editor_b.update(&mut cx_b, |editor, cx| {
2378            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2379            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2380        });
2381
2382        // Return a resolved completion from the host's language server.
2383        // The resolved completion has an additional text edit.
2384        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2385            assert_eq!(params.label, "first_method(…)");
2386            lsp::CompletionItem {
2387                label: "first_method(…)".into(),
2388                detail: Some("fn(&mut self, B) -> C".into()),
2389                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390                    new_text: "first_method($1)".to_string(),
2391                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2392                })),
2393                additional_text_edits: Some(vec![lsp::TextEdit {
2394                    new_text: "use d::SomeTrait;\n".to_string(),
2395                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2396                }]),
2397                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2398                ..Default::default()
2399            }
2400        });
2401
2402        // The additional edit is applied.
2403        buffer_a
2404            .condition(&cx_a, |buffer, _| {
2405                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2406            })
2407            .await;
2408        buffer_b
2409            .condition(&cx_b, |buffer, _| {
2410                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2411            })
2412            .await;
2413    }
2414
2415    #[gpui::test(iterations = 10)]
2416    async fn test_formatting_buffer(
2417        mut cx_a: TestAppContext,
2418        mut cx_b: TestAppContext,
2419        last_iteration: bool,
2420    ) {
2421        cx_a.foreground().forbid_parking();
2422        let mut lang_registry = Arc::new(LanguageRegistry::new());
2423        let fs = Arc::new(FakeFs::new(cx_a.background()));
2424
2425        // Set up a fake language server.
2426        let (language_server_config, mut fake_language_server) =
2427            LanguageServerConfig::fake(&cx_a).await;
2428        Arc::get_mut(&mut lang_registry)
2429            .unwrap()
2430            .add(Arc::new(Language::new(
2431                LanguageConfig {
2432                    name: "Rust".to_string(),
2433                    path_suffixes: vec!["rs".to_string()],
2434                    language_server: Some(language_server_config),
2435                    ..Default::default()
2436                },
2437                Some(tree_sitter_rust::language()),
2438            )));
2439
2440        // Connect to a server as 2 clients.
2441        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2442        let client_a = server.create_client(&mut cx_a, "user_a").await;
2443        let client_b = server.create_client(&mut cx_b, "user_b").await;
2444
2445        // Share a project as client A
2446        fs.insert_tree(
2447            "/a",
2448            json!({
2449                ".zed.toml": r#"collaborators = ["user_b"]"#,
2450                "a.rs": "let one = two",
2451            }),
2452        )
2453        .await;
2454        let project_a = cx_a.update(|cx| {
2455            Project::local(
2456                client_a.clone(),
2457                client_a.user_store.clone(),
2458                lang_registry.clone(),
2459                fs.clone(),
2460                cx,
2461            )
2462        });
2463        let (worktree_a, _) = project_a
2464            .update(&mut cx_a, |p, cx| {
2465                p.find_or_create_local_worktree("/a", false, cx)
2466            })
2467            .await
2468            .unwrap();
2469        worktree_a
2470            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2471            .await;
2472        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2473        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2474        project_a
2475            .update(&mut cx_a, |p, cx| p.share(cx))
2476            .await
2477            .unwrap();
2478
2479        // Join the worktree as client B.
2480        let project_b = Project::remote(
2481            project_id,
2482            client_b.clone(),
2483            client_b.user_store.clone(),
2484            lang_registry.clone(),
2485            fs.clone(),
2486            &mut cx_b.to_async(),
2487        )
2488        .await
2489        .unwrap();
2490
2491        let buffer_b = cx_b
2492            .background()
2493            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2494            .await
2495            .unwrap();
2496
2497        let format = project_b.update(&mut cx_b, |project, cx| {
2498            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2499        });
2500
2501        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2502            Some(vec![
2503                lsp::TextEdit {
2504                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2505                    new_text: "h".to_string(),
2506                },
2507                lsp::TextEdit {
2508                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2509                    new_text: "y".to_string(),
2510                },
2511            ])
2512        });
2513
2514        format.await.unwrap();
2515        assert_eq!(
2516            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2517            "let honey = two"
2518        );
2519    }
2520
2521    #[gpui::test(iterations = 10)]
2522    async fn test_definition(
2523        mut cx_a: TestAppContext,
2524        mut cx_b: TestAppContext,
2525        last_iteration: bool,
2526    ) {
2527        cx_a.foreground().forbid_parking();
2528        let mut lang_registry = Arc::new(LanguageRegistry::new());
2529        let fs = Arc::new(FakeFs::new(cx_a.background()));
2530        fs.insert_tree(
2531            "/root-1",
2532            json!({
2533                ".zed.toml": r#"collaborators = ["user_b"]"#,
2534                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2535            }),
2536        )
2537        .await;
2538        fs.insert_tree(
2539            "/root-2",
2540            json!({
2541                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2542            }),
2543        )
2544        .await;
2545
2546        // Set up a fake language server.
2547        let (language_server_config, mut fake_language_server) =
2548            LanguageServerConfig::fake(&cx_a).await;
2549        Arc::get_mut(&mut lang_registry)
2550            .unwrap()
2551            .add(Arc::new(Language::new(
2552                LanguageConfig {
2553                    name: "Rust".to_string(),
2554                    path_suffixes: vec!["rs".to_string()],
2555                    language_server: Some(language_server_config),
2556                    ..Default::default()
2557                },
2558                Some(tree_sitter_rust::language()),
2559            )));
2560
2561        // Connect to a server as 2 clients.
2562        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2563        let client_a = server.create_client(&mut cx_a, "user_a").await;
2564        let client_b = server.create_client(&mut cx_b, "user_b").await;
2565
2566        // Share a project as client A
2567        let project_a = cx_a.update(|cx| {
2568            Project::local(
2569                client_a.clone(),
2570                client_a.user_store.clone(),
2571                lang_registry.clone(),
2572                fs.clone(),
2573                cx,
2574            )
2575        });
2576        let (worktree_a, _) = project_a
2577            .update(&mut cx_a, |p, cx| {
2578                p.find_or_create_local_worktree("/root-1", false, cx)
2579            })
2580            .await
2581            .unwrap();
2582        worktree_a
2583            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2584            .await;
2585        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2586        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2587        project_a
2588            .update(&mut cx_a, |p, cx| p.share(cx))
2589            .await
2590            .unwrap();
2591
2592        // Join the worktree as client B.
2593        let project_b = Project::remote(
2594            project_id,
2595            client_b.clone(),
2596            client_b.user_store.clone(),
2597            lang_registry.clone(),
2598            fs.clone(),
2599            &mut cx_b.to_async(),
2600        )
2601        .await
2602        .unwrap();
2603
2604        // Open the file on client B.
2605        let buffer_b = cx_b
2606            .background()
2607            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2608            .await
2609            .unwrap();
2610
2611        // Request the definition of a symbol as the guest.
2612        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2613        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2614            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2615                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2616                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2617            )))
2618        });
2619
2620        let definitions_1 = definitions_1.await.unwrap();
2621        cx_b.read(|cx| {
2622            assert_eq!(definitions_1.len(), 1);
2623            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2624            let target_buffer = definitions_1[0].target_buffer.read(cx);
2625            assert_eq!(
2626                target_buffer.text(),
2627                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2628            );
2629            assert_eq!(
2630                definitions_1[0].target_range.to_point(target_buffer),
2631                Point::new(0, 6)..Point::new(0, 9)
2632            );
2633        });
2634
2635        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2636        // the previous call to `definition`.
2637        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2638        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2639            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2640                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2641                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2642            )))
2643        });
2644
2645        let definitions_2 = definitions_2.await.unwrap();
2646        cx_b.read(|cx| {
2647            assert_eq!(definitions_2.len(), 1);
2648            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2649            let target_buffer = definitions_2[0].target_buffer.read(cx);
2650            assert_eq!(
2651                target_buffer.text(),
2652                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2653            );
2654            assert_eq!(
2655                definitions_2[0].target_range.to_point(target_buffer),
2656                Point::new(1, 6)..Point::new(1, 11)
2657            );
2658        });
2659        assert_eq!(
2660            definitions_1[0].target_buffer,
2661            definitions_2[0].target_buffer
2662        );
2663
2664        cx_b.update(|_| {
2665            drop(definitions_1);
2666            drop(definitions_2);
2667        });
2668        project_b
2669            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2670            .await;
2671    }
2672
2673    #[gpui::test(iterations = 10)]
2674    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2675        mut cx_a: TestAppContext,
2676        mut cx_b: TestAppContext,
2677        mut rng: StdRng,
2678        last_iteration: bool,
2679    ) {
2680        cx_a.foreground().forbid_parking();
2681        let mut lang_registry = Arc::new(LanguageRegistry::new());
2682        let fs = Arc::new(FakeFs::new(cx_a.background()));
2683        fs.insert_tree(
2684            "/root",
2685            json!({
2686                ".zed.toml": r#"collaborators = ["user_b"]"#,
2687                "a.rs": "const ONE: usize = b::TWO;",
2688                "b.rs": "const TWO: usize = 2",
2689            }),
2690        )
2691        .await;
2692
2693        // Set up a fake language server.
2694        let (language_server_config, mut fake_language_server) =
2695            LanguageServerConfig::fake(&cx_a).await;
2696
2697        Arc::get_mut(&mut lang_registry)
2698            .unwrap()
2699            .add(Arc::new(Language::new(
2700                LanguageConfig {
2701                    name: "Rust".to_string(),
2702                    path_suffixes: vec!["rs".to_string()],
2703                    language_server: Some(language_server_config),
2704                    ..Default::default()
2705                },
2706                Some(tree_sitter_rust::language()),
2707            )));
2708
2709        // Connect to a server as 2 clients.
2710        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2711        let client_a = server.create_client(&mut cx_a, "user_a").await;
2712        let client_b = server.create_client(&mut cx_b, "user_b").await;
2713
2714        // Share a project as client A
2715        let project_a = cx_a.update(|cx| {
2716            Project::local(
2717                client_a.clone(),
2718                client_a.user_store.clone(),
2719                lang_registry.clone(),
2720                fs.clone(),
2721                cx,
2722            )
2723        });
2724
2725        let (worktree_a, _) = project_a
2726            .update(&mut cx_a, |p, cx| {
2727                p.find_or_create_local_worktree("/root", false, cx)
2728            })
2729            .await
2730            .unwrap();
2731        worktree_a
2732            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2733            .await;
2734        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2735        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2736        project_a
2737            .update(&mut cx_a, |p, cx| p.share(cx))
2738            .await
2739            .unwrap();
2740
2741        // Join the worktree as client B.
2742        let project_b = Project::remote(
2743            project_id,
2744            client_b.clone(),
2745            client_b.user_store.clone(),
2746            lang_registry.clone(),
2747            fs.clone(),
2748            &mut cx_b.to_async(),
2749        )
2750        .await
2751        .unwrap();
2752
2753        let buffer_b1 = cx_b
2754            .background()
2755            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2756            .await
2757            .unwrap();
2758
2759        let definitions;
2760        let buffer_b2;
2761        if rng.gen() {
2762            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2763            buffer_b2 =
2764                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2765        } else {
2766            buffer_b2 =
2767                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2768            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2769        }
2770
2771        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2772            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2773                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2774                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2775            )))
2776        });
2777
2778        let buffer_b2 = buffer_b2.await.unwrap();
2779        let definitions = definitions.await.unwrap();
2780        assert_eq!(definitions.len(), 1);
2781        assert_eq!(definitions[0].target_buffer, buffer_b2);
2782    }
2783
2784    #[gpui::test(iterations = 10)]
2785    async fn test_collaborating_with_code_actions(
2786        mut cx_a: TestAppContext,
2787        mut cx_b: TestAppContext,
2788        last_iteration: bool,
2789    ) {
2790        cx_a.foreground().forbid_parking();
2791        let mut lang_registry = Arc::new(LanguageRegistry::new());
2792        let fs = Arc::new(FakeFs::new(cx_a.background()));
2793        let mut path_openers_b = Vec::new();
2794        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2795
2796        // Set up a fake language server.
2797        let (language_server_config, mut fake_language_server) =
2798            LanguageServerConfig::fake_with_capabilities(
2799                lsp::ServerCapabilities {
2800                    ..Default::default()
2801                },
2802                &cx_a,
2803            )
2804            .await;
2805        Arc::get_mut(&mut lang_registry)
2806            .unwrap()
2807            .add(Arc::new(Language::new(
2808                LanguageConfig {
2809                    name: "Rust".to_string(),
2810                    path_suffixes: vec!["rs".to_string()],
2811                    language_server: Some(language_server_config),
2812                    ..Default::default()
2813                },
2814                Some(tree_sitter_rust::language()),
2815            )));
2816
2817        // Connect to a server as 2 clients.
2818        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2819        let client_a = server.create_client(&mut cx_a, "user_a").await;
2820        let client_b = server.create_client(&mut cx_b, "user_b").await;
2821
2822        // Share a project as client A
2823        fs.insert_tree(
2824            "/a",
2825            json!({
2826                ".zed.toml": r#"collaborators = ["user_b"]"#,
2827                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2828                "other.rs": "pub fn foo() -> usize { 4 }",
2829            }),
2830        )
2831        .await;
2832        let project_a = cx_a.update(|cx| {
2833            Project::local(
2834                client_a.clone(),
2835                client_a.user_store.clone(),
2836                lang_registry.clone(),
2837                fs.clone(),
2838                cx,
2839            )
2840        });
2841        let (worktree_a, _) = project_a
2842            .update(&mut cx_a, |p, cx| {
2843                p.find_or_create_local_worktree("/a", false, cx)
2844            })
2845            .await
2846            .unwrap();
2847        worktree_a
2848            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2849            .await;
2850        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2851        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2852        project_a
2853            .update(&mut cx_a, |p, cx| p.share(cx))
2854            .await
2855            .unwrap();
2856
2857        // Join the worktree as client B.
2858        let project_b = Project::remote(
2859            project_id,
2860            client_b.clone(),
2861            client_b.user_store.clone(),
2862            lang_registry.clone(),
2863            fs.clone(),
2864            &mut cx_b.to_async(),
2865        )
2866        .await
2867        .unwrap();
2868        let mut params = cx_b.update(WorkspaceParams::test);
2869        params.languages = lang_registry.clone();
2870        params.client = client_b.client.clone();
2871        params.user_store = client_b.user_store.clone();
2872        params.project = project_b;
2873        params.path_openers = path_openers_b.into();
2874
2875        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2876        let editor_b = workspace_b
2877            .update(&mut cx_b, |workspace, cx| {
2878                workspace.open_path((worktree_id, "main.rs").into(), cx)
2879            })
2880            .await
2881            .unwrap()
2882            .downcast::<Editor>()
2883            .unwrap();
2884        fake_language_server
2885            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2886                assert_eq!(
2887                    params.text_document.uri,
2888                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2889                );
2890                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2891                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2892                None
2893            })
2894            .next()
2895            .await;
2896
2897        // Move cursor to a location that contains code actions.
2898        editor_b.update(&mut cx_b, |editor, cx| {
2899            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2900            cx.focus(&editor_b);
2901        });
2902        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2903            assert_eq!(
2904                params.text_document.uri,
2905                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2906            );
2907            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2908            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2909
2910            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2911                lsp::CodeAction {
2912                    title: "Inline into all callers".to_string(),
2913                    edit: Some(lsp::WorkspaceEdit {
2914                        changes: Some(
2915                            [
2916                                (
2917                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2918                                    vec![lsp::TextEdit::new(
2919                                        lsp::Range::new(
2920                                            lsp::Position::new(1, 22),
2921                                            lsp::Position::new(1, 34),
2922                                        ),
2923                                        "4".to_string(),
2924                                    )],
2925                                ),
2926                                (
2927                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2928                                    vec![lsp::TextEdit::new(
2929                                        lsp::Range::new(
2930                                            lsp::Position::new(0, 0),
2931                                            lsp::Position::new(0, 27),
2932                                        ),
2933                                        "".to_string(),
2934                                    )],
2935                                ),
2936                            ]
2937                            .into_iter()
2938                            .collect(),
2939                        ),
2940                        ..Default::default()
2941                    }),
2942                    data: Some(json!({
2943                        "codeActionParams": {
2944                            "range": {
2945                                "start": {"line": 1, "column": 31},
2946                                "end": {"line": 1, "column": 31},
2947                            }
2948                        }
2949                    })),
2950                    ..Default::default()
2951                },
2952            )])
2953        });
2954
2955        // Toggle code actions and wait for them to display.
2956        editor_b.update(&mut cx_b, |editor, cx| {
2957            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2958        });
2959        editor_b
2960            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2961            .await;
2962
2963        // Confirming the code action will trigger a resolve request.
2964        let confirm_action = workspace_b
2965            .update(&mut cx_b, |workspace, cx| {
2966                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2967            })
2968            .unwrap();
2969        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2970            lsp::CodeAction {
2971                title: "Inline into all callers".to_string(),
2972                edit: Some(lsp::WorkspaceEdit {
2973                    changes: Some(
2974                        [
2975                            (
2976                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2977                                vec![lsp::TextEdit::new(
2978                                    lsp::Range::new(
2979                                        lsp::Position::new(1, 22),
2980                                        lsp::Position::new(1, 34),
2981                                    ),
2982                                    "4".to_string(),
2983                                )],
2984                            ),
2985                            (
2986                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2987                                vec![lsp::TextEdit::new(
2988                                    lsp::Range::new(
2989                                        lsp::Position::new(0, 0),
2990                                        lsp::Position::new(0, 27),
2991                                    ),
2992                                    "".to_string(),
2993                                )],
2994                            ),
2995                        ]
2996                        .into_iter()
2997                        .collect(),
2998                    ),
2999                    ..Default::default()
3000                }),
3001                ..Default::default()
3002            }
3003        });
3004
3005        // After the action is confirmed, an editor containing both modified files is opened.
3006        confirm_action.await.unwrap();
3007        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3008            workspace
3009                .active_item(cx)
3010                .unwrap()
3011                .downcast::<Editor>()
3012                .unwrap()
3013        });
3014        code_action_editor.update(&mut cx_b, |editor, cx| {
3015            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3016            editor.undo(&Undo, cx);
3017            assert_eq!(
3018                editor.text(cx),
3019                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3020            );
3021            editor.redo(&Redo, cx);
3022            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3023        });
3024    }
3025
3026    #[gpui::test(iterations = 10)]
3027    async fn test_basic_chat(
3028        mut cx_a: TestAppContext,
3029        mut cx_b: TestAppContext,
3030        last_iteration: bool,
3031    ) {
3032        cx_a.foreground().forbid_parking();
3033
3034        // Connect to a server as 2 clients.
3035        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3036        let client_a = server.create_client(&mut cx_a, "user_a").await;
3037        let client_b = server.create_client(&mut cx_b, "user_b").await;
3038
3039        // Create an org that includes these 2 users.
3040        let db = &server.app_state.db;
3041        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3042        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3043            .await
3044            .unwrap();
3045        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3046            .await
3047            .unwrap();
3048
3049        // Create a channel that includes all the users.
3050        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3051        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3052            .await
3053            .unwrap();
3054        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3055            .await
3056            .unwrap();
3057        db.create_channel_message(
3058            channel_id,
3059            client_b.current_user_id(&cx_b),
3060            "hello A, it's B.",
3061            OffsetDateTime::now_utc(),
3062            1,
3063        )
3064        .await
3065        .unwrap();
3066
3067        let channels_a = cx_a
3068            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3069        channels_a
3070            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3071            .await;
3072        channels_a.read_with(&cx_a, |list, _| {
3073            assert_eq!(
3074                list.available_channels().unwrap(),
3075                &[ChannelDetails {
3076                    id: channel_id.to_proto(),
3077                    name: "test-channel".to_string()
3078                }]
3079            )
3080        });
3081        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3082            this.get_channel(channel_id.to_proto(), cx).unwrap()
3083        });
3084        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3085        channel_a
3086            .condition(&cx_a, |channel, _| {
3087                channel_messages(channel)
3088                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3089            })
3090            .await;
3091
3092        let channels_b = cx_b
3093            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3094        channels_b
3095            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3096            .await;
3097        channels_b.read_with(&cx_b, |list, _| {
3098            assert_eq!(
3099                list.available_channels().unwrap(),
3100                &[ChannelDetails {
3101                    id: channel_id.to_proto(),
3102                    name: "test-channel".to_string()
3103                }]
3104            )
3105        });
3106
3107        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3108            this.get_channel(channel_id.to_proto(), cx).unwrap()
3109        });
3110        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3111        channel_b
3112            .condition(&cx_b, |channel, _| {
3113                channel_messages(channel)
3114                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3115            })
3116            .await;
3117
3118        channel_a
3119            .update(&mut cx_a, |channel, cx| {
3120                channel
3121                    .send_message("oh, hi B.".to_string(), cx)
3122                    .unwrap()
3123                    .detach();
3124                let task = channel.send_message("sup".to_string(), cx).unwrap();
3125                assert_eq!(
3126                    channel_messages(channel),
3127                    &[
3128                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3129                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3130                        ("user_a".to_string(), "sup".to_string(), true)
3131                    ]
3132                );
3133                task
3134            })
3135            .await
3136            .unwrap();
3137
3138        channel_b
3139            .condition(&cx_b, |channel, _| {
3140                channel_messages(channel)
3141                    == [
3142                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3143                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3144                        ("user_a".to_string(), "sup".to_string(), false),
3145                    ]
3146            })
3147            .await;
3148
3149        assert_eq!(
3150            server
3151                .state()
3152                .await
3153                .channel(channel_id)
3154                .unwrap()
3155                .connection_ids
3156                .len(),
3157            2
3158        );
3159        cx_b.update(|_| drop(channel_b));
3160        server
3161            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3162            .await;
3163
3164        cx_a.update(|_| drop(channel_a));
3165        server
3166            .condition(|state| state.channel(channel_id).is_none())
3167            .await;
3168    }
3169
3170    #[gpui::test(iterations = 10)]
3171    async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3172        cx_a.foreground().forbid_parking();
3173
3174        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3175        let client_a = server.create_client(&mut cx_a, "user_a").await;
3176
3177        let db = &server.app_state.db;
3178        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3179        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3180        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3181            .await
3182            .unwrap();
3183        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3184            .await
3185            .unwrap();
3186
3187        let channels_a = cx_a
3188            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3189        channels_a
3190            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3191            .await;
3192        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3193            this.get_channel(channel_id.to_proto(), cx).unwrap()
3194        });
3195
3196        // Messages aren't allowed to be too long.
3197        channel_a
3198            .update(&mut cx_a, |channel, cx| {
3199                let long_body = "this is long.\n".repeat(1024);
3200                channel.send_message(long_body, cx).unwrap()
3201            })
3202            .await
3203            .unwrap_err();
3204
3205        // Messages aren't allowed to be blank.
3206        channel_a.update(&mut cx_a, |channel, cx| {
3207            channel.send_message(String::new(), cx).unwrap_err()
3208        });
3209
3210        // Leading and trailing whitespace are trimmed.
3211        channel_a
3212            .update(&mut cx_a, |channel, cx| {
3213                channel
3214                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3215                    .unwrap()
3216            })
3217            .await
3218            .unwrap();
3219        assert_eq!(
3220            db.get_channel_messages(channel_id, 10, None)
3221                .await
3222                .unwrap()
3223                .iter()
3224                .map(|m| &m.body)
3225                .collect::<Vec<_>>(),
3226            &["surrounded by whitespace"]
3227        );
3228    }
3229
3230    #[gpui::test(iterations = 10)]
3231    async fn test_chat_reconnection(
3232        mut cx_a: TestAppContext,
3233        mut cx_b: TestAppContext,
3234        last_iteration: bool,
3235    ) {
3236        cx_a.foreground().forbid_parking();
3237
3238        // Connect to a server as 2 clients.
3239        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3240        let client_a = server.create_client(&mut cx_a, "user_a").await;
3241        let client_b = server.create_client(&mut cx_b, "user_b").await;
3242        let mut status_b = client_b.status();
3243
3244        // Create an org that includes these 2 users.
3245        let db = &server.app_state.db;
3246        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3247        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3248            .await
3249            .unwrap();
3250        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3251            .await
3252            .unwrap();
3253
3254        // Create a channel that includes all the users.
3255        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3256        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3257            .await
3258            .unwrap();
3259        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3260            .await
3261            .unwrap();
3262        db.create_channel_message(
3263            channel_id,
3264            client_b.current_user_id(&cx_b),
3265            "hello A, it's B.",
3266            OffsetDateTime::now_utc(),
3267            2,
3268        )
3269        .await
3270        .unwrap();
3271
3272        let channels_a = cx_a
3273            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3274        channels_a
3275            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3276            .await;
3277
3278        channels_a.read_with(&cx_a, |list, _| {
3279            assert_eq!(
3280                list.available_channels().unwrap(),
3281                &[ChannelDetails {
3282                    id: channel_id.to_proto(),
3283                    name: "test-channel".to_string()
3284                }]
3285            )
3286        });
3287        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3288            this.get_channel(channel_id.to_proto(), cx).unwrap()
3289        });
3290        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3291        channel_a
3292            .condition(&cx_a, |channel, _| {
3293                channel_messages(channel)
3294                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3295            })
3296            .await;
3297
3298        let channels_b = cx_b
3299            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3300        channels_b
3301            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3302            .await;
3303        channels_b.read_with(&cx_b, |list, _| {
3304            assert_eq!(
3305                list.available_channels().unwrap(),
3306                &[ChannelDetails {
3307                    id: channel_id.to_proto(),
3308                    name: "test-channel".to_string()
3309                }]
3310            )
3311        });
3312
3313        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3314            this.get_channel(channel_id.to_proto(), cx).unwrap()
3315        });
3316        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3317        channel_b
3318            .condition(&cx_b, |channel, _| {
3319                channel_messages(channel)
3320                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3321            })
3322            .await;
3323
3324        // Disconnect client B, ensuring we can still access its cached channel data.
3325        server.forbid_connections();
3326        server.disconnect_client(client_b.current_user_id(&cx_b));
3327        while !matches!(
3328            status_b.next().await,
3329            Some(client::Status::ReconnectionError { .. })
3330        ) {}
3331
3332        channels_b.read_with(&cx_b, |channels, _| {
3333            assert_eq!(
3334                channels.available_channels().unwrap(),
3335                [ChannelDetails {
3336                    id: channel_id.to_proto(),
3337                    name: "test-channel".to_string()
3338                }]
3339            )
3340        });
3341        channel_b.read_with(&cx_b, |channel, _| {
3342            assert_eq!(
3343                channel_messages(channel),
3344                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3345            )
3346        });
3347
3348        // Send a message from client B while it is disconnected.
3349        channel_b
3350            .update(&mut cx_b, |channel, cx| {
3351                let task = channel
3352                    .send_message("can you see this?".to_string(), cx)
3353                    .unwrap();
3354                assert_eq!(
3355                    channel_messages(channel),
3356                    &[
3357                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3358                        ("user_b".to_string(), "can you see this?".to_string(), true)
3359                    ]
3360                );
3361                task
3362            })
3363            .await
3364            .unwrap_err();
3365
3366        // Send a message from client A while B is disconnected.
3367        channel_a
3368            .update(&mut cx_a, |channel, cx| {
3369                channel
3370                    .send_message("oh, hi B.".to_string(), cx)
3371                    .unwrap()
3372                    .detach();
3373                let task = channel.send_message("sup".to_string(), cx).unwrap();
3374                assert_eq!(
3375                    channel_messages(channel),
3376                    &[
3377                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3378                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3379                        ("user_a".to_string(), "sup".to_string(), true)
3380                    ]
3381                );
3382                task
3383            })
3384            .await
3385            .unwrap();
3386
3387        // Give client B a chance to reconnect.
3388        server.allow_connections();
3389        cx_b.foreground().advance_clock(Duration::from_secs(10));
3390
3391        // Verify that B sees the new messages upon reconnection, as well as the message client B
3392        // sent while offline.
3393        channel_b
3394            .condition(&cx_b, |channel, _| {
3395                channel_messages(channel)
3396                    == [
3397                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3398                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3399                        ("user_a".to_string(), "sup".to_string(), false),
3400                        ("user_b".to_string(), "can you see this?".to_string(), false),
3401                    ]
3402            })
3403            .await;
3404
3405        // Ensure client A and B can communicate normally after reconnection.
3406        channel_a
3407            .update(&mut cx_a, |channel, cx| {
3408                channel.send_message("you online?".to_string(), cx).unwrap()
3409            })
3410            .await
3411            .unwrap();
3412        channel_b
3413            .condition(&cx_b, |channel, _| {
3414                channel_messages(channel)
3415                    == [
3416                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3417                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3418                        ("user_a".to_string(), "sup".to_string(), false),
3419                        ("user_b".to_string(), "can you see this?".to_string(), false),
3420                        ("user_a".to_string(), "you online?".to_string(), false),
3421                    ]
3422            })
3423            .await;
3424
3425        channel_b
3426            .update(&mut cx_b, |channel, cx| {
3427                channel.send_message("yep".to_string(), cx).unwrap()
3428            })
3429            .await
3430            .unwrap();
3431        channel_a
3432            .condition(&cx_a, |channel, _| {
3433                channel_messages(channel)
3434                    == [
3435                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3436                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3437                        ("user_a".to_string(), "sup".to_string(), false),
3438                        ("user_b".to_string(), "can you see this?".to_string(), false),
3439                        ("user_a".to_string(), "you online?".to_string(), false),
3440                        ("user_b".to_string(), "yep".to_string(), false),
3441                    ]
3442            })
3443            .await;
3444    }
3445
3446    #[gpui::test(iterations = 10)]
3447    async fn test_contacts(
3448        mut cx_a: TestAppContext,
3449        mut cx_b: TestAppContext,
3450        mut cx_c: TestAppContext,
3451        last_iteration: bool,
3452    ) {
3453        cx_a.foreground().forbid_parking();
3454        let lang_registry = Arc::new(LanguageRegistry::new());
3455        let fs = Arc::new(FakeFs::new(cx_a.background()));
3456
3457        // Connect to a server as 3 clients.
3458        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3459        let client_a = server.create_client(&mut cx_a, "user_a").await;
3460        let client_b = server.create_client(&mut cx_b, "user_b").await;
3461        let client_c = server.create_client(&mut cx_c, "user_c").await;
3462
3463        // Share a worktree as client A.
3464        fs.insert_tree(
3465            "/a",
3466            json!({
3467                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3468            }),
3469        )
3470        .await;
3471
3472        let project_a = cx_a.update(|cx| {
3473            Project::local(
3474                client_a.clone(),
3475                client_a.user_store.clone(),
3476                lang_registry.clone(),
3477                fs.clone(),
3478                cx,
3479            )
3480        });
3481        let (worktree_a, _) = project_a
3482            .update(&mut cx_a, |p, cx| {
3483                p.find_or_create_local_worktree("/a", false, cx)
3484            })
3485            .await
3486            .unwrap();
3487        worktree_a
3488            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3489            .await;
3490
3491        client_a
3492            .user_store
3493            .condition(&cx_a, |user_store, _| {
3494                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3495            })
3496            .await;
3497        client_b
3498            .user_store
3499            .condition(&cx_b, |user_store, _| {
3500                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3501            })
3502            .await;
3503        client_c
3504            .user_store
3505            .condition(&cx_c, |user_store, _| {
3506                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3507            })
3508            .await;
3509
3510        let project_id = project_a
3511            .update(&mut cx_a, |project, _| project.next_remote_id())
3512            .await;
3513        project_a
3514            .update(&mut cx_a, |project, cx| project.share(cx))
3515            .await
3516            .unwrap();
3517
3518        let _project_b = Project::remote(
3519            project_id,
3520            client_b.clone(),
3521            client_b.user_store.clone(),
3522            lang_registry.clone(),
3523            fs.clone(),
3524            &mut cx_b.to_async(),
3525        )
3526        .await
3527        .unwrap();
3528
3529        client_a
3530            .user_store
3531            .condition(&cx_a, |user_store, _| {
3532                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3533            })
3534            .await;
3535        client_b
3536            .user_store
3537            .condition(&cx_b, |user_store, _| {
3538                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3539            })
3540            .await;
3541        client_c
3542            .user_store
3543            .condition(&cx_c, |user_store, _| {
3544                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3545            })
3546            .await;
3547
3548        project_a
3549            .condition(&cx_a, |project, _| {
3550                project.collaborators().contains_key(&client_b.peer_id)
3551            })
3552            .await;
3553
3554        cx_a.update(move |_| drop(project_a));
3555        client_a
3556            .user_store
3557            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3558            .await;
3559        client_b
3560            .user_store
3561            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3562            .await;
3563        client_c
3564            .user_store
3565            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3566            .await;
3567
3568        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3569            user_store
3570                .contacts()
3571                .iter()
3572                .map(|contact| {
3573                    let worktrees = contact
3574                        .projects
3575                        .iter()
3576                        .map(|p| {
3577                            (
3578                                p.worktree_root_names[0].as_str(),
3579                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3580                            )
3581                        })
3582                        .collect();
3583                    (contact.user.github_login.as_str(), worktrees)
3584                })
3585                .collect()
3586        }
3587    }
3588
3589    #[gpui::test(iterations = 100)]
        // Randomized collaboration test. A host shares a project rooted at
        // "/_collab" on a fake filesystem, guests join at random while the host
        // and guest simulations run, and once every simulation has finished each
        // client's worktree snapshots and shared buffer contents are compared.
        //
        // `MAX_PEERS` (default 5) and `OPERATIONS` (default 10) can be set in the
        // environment to change the size of a run; a value that fails to parse
        // panics.
3590    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3591        cx.foreground().forbid_parking();
3592        let max_peers = env::var("MAX_PEERS")
3593            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3594            .unwrap_or(5);
3595        let max_operations = env::var("OPERATIONS")
3596            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3597            .unwrap_or(10);
3598
            // One generator is shared by this loop and by every simulation it
            // spawns, so all of them draw from the same sequence.
3599        let rng = Rc::new(RefCell::new(rng));
3600        let lang_registry = Arc::new(LanguageRegistry::new());
3601        let fs = Arc::new(FakeFs::new(cx.background()));
3602        fs.insert_tree(
3603            "/_collab",
3604            json!({
3605                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3606            }),
3607        )
3608        .await;
3609
            // Operation counter shared with the simulations; this function and
            // `simulate_host` both increment it.
3610        let operations = Rc::new(Cell::new(0));
3611        let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3612        let mut clients = Vec::new();
3613
            // Every client gets its own `TestAppContext` with a distinct starting
            // entity id (bumped by 100000 per guest) — presumably so ids from
            // different contexts never collide; confirm against `TestAppContext::new`.
3614        let mut next_entity_id = 100000;
3615        let mut host_cx = TestAppContext::new(
3616            cx.foreground_platform(),
3617            cx.platform(),
3618            cx.foreground(),
3619            cx.background(),
3620            cx.font_cache(),
3621            next_entity_id,
3622        );
3623        let host = server.create_client(&mut host_cx, "host").await;
3624        let host_project = host_cx.update(|cx| {
3625            Project::local(
3626                host.client.clone(),
3627                host.user_store.clone(),
3628                lang_registry.clone(),
3629                fs.clone(),
3630                cx,
3631            )
3632        });
3633        let host_project_id = host_project
3634            .update(&mut host_cx, |p, _| p.next_remote_id())
3635            .await;
3636
            // Add "/_collab" as a worktree, wait for its scan, then share the project.
3637        let (collab_worktree, _) = host_project
3638            .update(&mut host_cx, |project, cx| {
3639                project.find_or_create_local_worktree("/_collab", false, cx)
3640            })
3641            .await
3642            .unwrap();
3643        collab_worktree
3644            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3645            .await;
3646        host_project
3647            .update(&mut host_cx, |project, cx| project.share(cx))
3648            .await
3649            .unwrap();
3650
            // The host simulation is always `clients[0]`, so guest ids (taken from
            // `clients.len()` below) start at 1.
3651        clients.push(cx.foreground().spawn(host.simulate_host(
3652            host_project.clone(),
3653            operations.clone(),
3654            max_operations,
3655            rng.clone(),
3656            host_cx.clone(),
3657        )));
3658
3659        while operations.get() < max_operations {
3660            cx.background().simulate_random_delay().await;
                // While below `max_peers`, add a guest with 5% probability per
                // iteration; adding one counts as an operation. The rng is not
                // consulted once the peer limit has been reached.
3661            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3662                operations.set(operations.get() + 1);
3663
3664                let guest_id = clients.len();
3665                log::info!("Adding guest {}", guest_id);
3666                next_entity_id += 100000;
3667                let mut guest_cx = TestAppContext::new(
3668                    cx.foreground_platform(),
3669                    cx.platform(),
3670                    cx.foreground(),
3671                    cx.background(),
3672                    cx.font_cache(),
3673                    next_entity_id,
3674                );
                    // The login is `guest-{guest_id}`; the `.zed.toml` written above
                    // lists `guest-1` through `guest-5` as collaborators.
3675                let guest = server
3676                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3677                    .await;
3678                let guest_project = Project::remote(
3679                    host_project_id,
3680                    guest.client.clone(),
3681                    guest.user_store.clone(),
3682                    lang_registry.clone(),
3683                    fs.clone(),
3684                    &mut guest_cx.to_async(),
3685                )
3686                .await
3687                .unwrap();
3688                clients.push(cx.foreground().spawn(guest.simulate_guest(
3689                    guest_id,
3690                    guest_project,
3691                    operations.clone(),
3692                    max_operations,
3693                    rng.clone(),
3694                    guest_cx,
3695                )));
3696
3697                log::info!("Guest {} added", guest_id);
3698            }
3699        }
3700
            // Wait for every simulation to finish. Each task yields a
            // `(TestClient, TestAppContext)` pair, which is what the loops below
            // destructure.
3701        let clients = futures::future::join_all(clients).await;
3702        cx.foreground().run_until_parked();
3703
            // The host's worktree snapshots, keyed by worktree id, are the reference
            // that each client is compared against.
3704        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3705            project
3706                .worktrees(cx)
3707                .map(|worktree| {
3708                    let snapshot = worktree.read(cx).snapshot();
3709                    (snapshot.id(), snapshot)
3710                })
3711                .collect::<BTreeMap<_, _>>()
3712        });
3713
            // Index 0 is the host's own simulation, so the "guest {}" messages below
            // also cover the host. The `unwrap` on `project` relies on each
            // simulation having stored its project on the client — not visible here.
3714        for (ix, (client_a, cx_a)) in clients.iter().enumerate() {
3715            let worktree_snapshots =
3716                client_a
3717                    .project
3718                    .as_ref()
3719                    .unwrap()
3720                    .read_with(cx_a, |project, cx| {
3721                        project
3722                            .worktrees(cx)
3723                            .map(|worktree| {
3724                                let snapshot = worktree.read(cx).snapshot();
3725                                (snapshot.id(), snapshot)
3726                            })
3727                            .collect::<BTreeMap<_, _>>()
3728                    });
3729
3730            assert_eq!(
3731                worktree_snapshots.keys().collect::<Vec<_>>(),
3732                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3733                "guest {} has different worktrees than the host",
3734                ix
3735            );
3736            for (id, host_snapshot) in &host_worktree_snapshots {
3737                let guest_snapshot = &worktree_snapshots[id];
3738                assert_eq!(
3739                    guest_snapshot.root_name(),
3740                    host_snapshot.root_name(),
3741                    "guest {} has different root name than the host for worktree {}",
3742                    ix,
3743                    id
3744                );
3745                assert_eq!(
3746                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3747                    host_snapshot.entries(false).collect::<Vec<_>>(),
3748                    "guest {} has different snapshot than the host for worktree {}",
3749                    ix,
3750                    id
3751                );
3752            }
3753
                // Compare against every later client: any buffer that both clients
                // hold (matched by remote id) must contain the same text.
3754            for (client_b, cx_b) in &clients[ix + 1..] {
3755                for buffer_a in &client_a.buffers {
3756                    let buffer_id = buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id());
3757                    if let Some(buffer_b) = client_b.buffers.iter().find(|buffer| {
3758                        buffer.read_with(cx_b, |buffer, _| buffer.remote_id() == buffer_id)
3759                    }) {
3760                        assert_eq!(
3761                            buffer_a.read_with(cx_a, |buffer, _| buffer.text()),
3762                            buffer_b.read_with(cx_b, |buffer, _| buffer.text())
3763                        );
3764                    }
3765                }
3766            }
3767        }
3768    }
3769
3770    struct TestServer {
            // Test harness that owns an in-process `Server` together with the
            // pieces the tests use to drive and observe it.

            // Peer handed to `Server::new` in `start`; the `Drop` impl calls
            // `reset` on it.
3771        peer: Arc<Peer>,
            // Application state produced by `build_app_state`; tests reach the
            // database through `app_state.db`.
3772        app_state: Arc<AppState>,
            // The server under test.
3773        server: Arc<Server>,
            // Foreground executor; `condition` brackets each wait with
            // `start_waiting` / `finish_waiting` on it.
3774        foreground: Rc<executor::Foreground>,
            // Receiving half of the channel whose sender is given to `Server::new`;
            // `condition` awaits it before re-checking its predicate.
3775        notifications: mpsc::Receiver<()>,
            // One entry per user, inserted when that user's in-memory connection is
            // established; `disconnect_client` removes the entry and sends
            // `Some(())` through it.
3776        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
            // While true, clients' connection attempts fail with
            // "server is forbidding connections".
3777        forbid_connections: Arc<AtomicBool>,
            // Held but never read by the code here — presumably to keep the test
            // database alive for the harness's lifetime.
3778        _test_db: TestDb,
3779    }
3780
3781    impl TestServer {
3782        async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3783            let mut test_db = TestDb::new();
3784            test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3785            let app_state = Self::build_app_state(&test_db).await;
3786            let peer = Peer::new();
3787            let notifications = mpsc::channel(128);
3788            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3789            Self {
3790                peer,
3791                app_state,
3792                server,
3793                foreground,
3794                notifications: notifications.1,
3795                connection_killers: Default::default(),
3796                forbid_connections: Default::default(),
3797                _test_db: test_db,
3798            }
3799        }
3800
3801        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3802            let http = FakeHttpClient::with_404_response();
3803            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3804            let client_name = name.to_string();
3805            let mut client = Client::new(http.clone());
3806            let server = self.server.clone();
3807            let connection_killers = self.connection_killers.clone();
3808            let forbid_connections = self.forbid_connections.clone();
3809            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3810
3811            Arc::get_mut(&mut client)
3812                .unwrap()
3813                .override_authenticate(move |cx| {
3814                    cx.spawn(|_| async move {
3815                        let access_token = "the-token".to_string();
3816                        Ok(Credentials {
3817                            user_id: user_id.0 as u64,
3818                            access_token,
3819                        })
3820                    })
3821                })
3822                .override_establish_connection(move |credentials, cx| {
3823                    assert_eq!(credentials.user_id, user_id.0 as u64);
3824                    assert_eq!(credentials.access_token, "the-token");
3825
3826                    let server = server.clone();
3827                    let connection_killers = connection_killers.clone();
3828                    let forbid_connections = forbid_connections.clone();
3829                    let client_name = client_name.clone();
3830                    let connection_id_tx = connection_id_tx.clone();
3831                    cx.spawn(move |cx| async move {
3832                        if forbid_connections.load(SeqCst) {
3833                            Err(EstablishConnectionError::other(anyhow!(
3834                                "server is forbidding connections"
3835                            )))
3836                        } else {
3837                            let (client_conn, server_conn, kill_conn) =
3838                                Connection::in_memory(cx.background());
3839                            connection_killers.lock().insert(user_id, kill_conn);
3840                            cx.background()
3841                                .spawn(server.handle_connection(
3842                                    server_conn,
3843                                    client_name,
3844                                    user_id,
3845                                    Some(connection_id_tx),
3846                                    cx.background(),
3847                                ))
3848                                .detach();
3849                            Ok(client_conn)
3850                        }
3851                    })
3852                });
3853
3854            client
3855                .authenticate_and_connect(&cx.to_async())
3856                .await
3857                .unwrap();
3858
3859            Channel::init(&client);
3860            Project::init(&client);
3861
3862            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3863            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3864            let mut authed_user =
3865                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3866            while authed_user.next().await.unwrap().is_none() {}
3867
3868            TestClient {
3869                client,
3870                peer_id,
3871                user_store,
3872                project: Default::default(),
3873                buffers: Default::default(),
3874            }
3875        }
3876
3877        fn disconnect_client(&self, user_id: UserId) {
3878            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3879                let _ = kill_conn.try_send(Some(()));
3880            }
3881        }
3882
3883        fn forbid_connections(&self) {
3884            self.forbid_connections.store(true, SeqCst);
3885        }
3886
3887        fn allow_connections(&self) {
3888            self.forbid_connections.store(false, SeqCst);
3889        }
3890
3891        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3892            let mut config = Config::default();
3893            config.session_secret = "a".repeat(32);
3894            config.database_url = test_db.url.clone();
3895            let github_client = github::AppClient::test();
3896            Arc::new(AppState {
3897                db: test_db.db().clone(),
3898                handlebars: Default::default(),
3899                auth_client: auth::build_client("", ""),
3900                repo_client: github::RepoClient::test(&github_client),
3901                github_client,
3902                config,
3903            })
3904        }
3905
3906        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3907            self.server.store.read()
3908        }
3909
3910        async fn condition<F>(&mut self, mut predicate: F)
3911        where
3912            F: FnMut(&Store) -> bool,
3913        {
3914            async_std::future::timeout(Duration::from_millis(500), async {
3915                while !(predicate)(&*self.server.store.read()) {
3916                    self.foreground.start_waiting();
3917                    self.notifications.next().await;
3918                    self.foreground.finish_waiting();
3919                }
3920            })
3921            .await
3922            .expect("condition timed out");
3923        }
3924    }
3925
3926    impl Drop for TestServer {
3927        fn drop(&mut self) {
3928            self.peer.reset();
3929        }
3930    }
3931
3932    struct TestClient {
            // A connected client plus the per-client state that tests and the
            // simulations inspect.

            // The underlying client; `TestClient` derefs to it.
3933        client: Arc<Client>,
            // Built in `create_client` from the connection id received over the
            // channel that is passed to `handle_connection`.
3934        pub peer_id: PeerId,
            // User store model created for this client in `create_client`.
3935        pub user_store: ModelHandle<UserStore>,
            // Starts as `None`; `test_random_collaboration` unwraps it after the
            // simulations finish.
3936        project: Option<ModelHandle<Project>>,
            // Starts empty; `test_random_collaboration` compares these across
            // clients by remote id.
3937        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3938    }
3939
3940    impl Deref for TestClient {
3941        type Target = Arc<Client>;
3942
3943        fn deref(&self) -> &Self::Target {
3944            &self.client
3945        }
3946    }
3947
3948    impl TestClient {
3949        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3950            UserId::from_proto(
3951                self.user_store
3952                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3953            )
3954        }
3955
3956        async fn simulate_host(
3957            mut self,
3958            project: ModelHandle<Project>,
3959            operations: Rc<Cell<usize>>,
3960            max_operations: usize,
3961            rng: Rc<RefCell<StdRng>>,
3962            mut cx: TestAppContext,
3963        ) -> (Self, TestAppContext) {
3964            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3965            let mut files: Vec<PathBuf> = Default::default();
3966            while operations.get() < max_operations {
3967                operations.set(operations.get() + 1);
3968
3969                let distribution = rng.borrow_mut().gen_range(0..100);
3970                match distribution {
3971                    0..=20 if !files.is_empty() => {
3972                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3973                        while let Some(parent_path) = path.parent() {
3974                            path = parent_path;
3975                            if rng.borrow_mut().gen() {
3976                                break;
3977                            }
3978                        }
3979
3980                        log::info!("Host: find/create local worktree {:?}", path);
3981                        project
3982                            .update(&mut cx, |project, cx| {
3983                                project.find_or_create_local_worktree(path, false, cx)
3984                            })
3985                            .await
3986                            .unwrap();
3987                    }
3988                    10..=80 if !files.is_empty() => {
3989                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3990                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
3991                            let (worktree, path) = project
3992                                .update(&mut cx, |project, cx| {
3993                                    project.find_or_create_local_worktree(file, false, cx)
3994                                })
3995                                .await
3996                                .unwrap();
3997                            let project_path =
3998                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
3999                            log::info!("Host: opening path {:?}", project_path);
4000                            let buffer = project
4001                                .update(&mut cx, |project, cx| {
4002                                    project.open_buffer(project_path, cx)
4003                                })
4004                                .await
4005                                .unwrap();
4006                            self.buffers.insert(buffer.clone());
4007                            buffer
4008                        } else {
4009                            self.buffers
4010                                .iter()
4011                                .choose(&mut *rng.borrow_mut())
4012                                .unwrap()
4013                                .clone()
4014                        };
4015
4016                        if rng.borrow_mut().gen_bool(0.1) {
4017                            cx.update(|cx| {
4018                                log::info!(
4019                                    "Host: dropping buffer {:?}",
4020                                    buffer.read(cx).file().unwrap().full_path(cx)
4021                                );
4022                                self.buffers.remove(&buffer);
4023                                drop(buffer);
4024                            });
4025                        } else {
4026                            buffer.update(&mut cx, |buffer, cx| {
4027                                log::info!(
4028                                    "Host: updating buffer {:?}",
4029                                    buffer.file().unwrap().full_path(cx)
4030                                );
4031                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4032                            });
4033                        }
4034                    }
4035                    _ => loop {
4036                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4037                        let mut path = PathBuf::new();
4038                        path.push("/");
4039                        for _ in 0..path_component_count {
4040                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4041                            path.push(std::str::from_utf8(&[letter]).unwrap());
4042                        }
4043                        let parent_path = path.parent().unwrap();
4044
4045                        log::info!("Host: creating file {:?}", path);
4046                        if fs.create_dir(&parent_path).await.is_ok()
4047                            && fs.create_file(&path, Default::default()).await.is_ok()
4048                        {
4049                            files.push(path);
4050                            break;
4051                        } else {
4052                            log::info!("Host: cannot create file");
4053                        }
4054                    },
4055                }
4056
4057                cx.background().simulate_random_delay().await;
4058            }
4059
4060            self.project = Some(project);
4061            (self, cx)
4062        }
4063
4064        pub async fn simulate_guest(
4065            mut self,
4066            guest_id: usize,
4067            project: ModelHandle<Project>,
4068            operations: Rc<Cell<usize>>,
4069            max_operations: usize,
4070            rng: Rc<RefCell<StdRng>>,
4071            mut cx: TestAppContext,
4072        ) -> (Self, TestAppContext) {
4073            while operations.get() < max_operations {
4074                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4075                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4076                        project
4077                            .worktrees(&cx)
4078                            .filter(|worktree| {
4079                                worktree.read(cx).entries(false).any(|e| e.is_file())
4080                            })
4081                            .choose(&mut *rng.borrow_mut())
4082                    }) {
4083                        worktree
4084                    } else {
4085                        cx.background().simulate_random_delay().await;
4086                        continue;
4087                    };
4088
4089                    operations.set(operations.get() + 1);
4090                    let project_path = worktree.read_with(&cx, |worktree, _| {
4091                        let entry = worktree
4092                            .entries(false)
4093                            .filter(|e| e.is_file())
4094                            .choose(&mut *rng.borrow_mut())
4095                            .unwrap();
4096                        (worktree.id(), entry.path.clone())
4097                    });
4098                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4099                    let buffer = project
4100                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4101                        .await
4102                        .unwrap();
4103                    self.buffers.insert(buffer.clone());
4104                    buffer
4105                } else {
4106                    self.buffers
4107                        .iter()
4108                        .choose(&mut *rng.borrow_mut())
4109                        .unwrap()
4110                        .clone()
4111                };
4112
4113                if rng.borrow_mut().gen_bool(0.1) {
4114                    cx.update(|cx| {
4115                        log::info!(
4116                            "Guest {}: dropping buffer {:?}",
4117                            guest_id,
4118                            buffer.read(cx).file().unwrap().full_path(cx)
4119                        );
4120                        self.buffers.remove(&buffer);
4121                        drop(buffer);
4122                    });
4123                } else {
4124                    buffer.update(&mut cx, |buffer, cx| {
4125                        log::info!(
4126                            "Guest {}: updating buffer {:?}",
4127                            guest_id,
4128                            buffer.file().unwrap().full_path(cx)
4129                        );
4130                        buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4131                    });
4132                }
4133
4134                cx.background().simulate_random_delay().await;
4135            }
4136
4137            self.project = Some(project);
4138            (self, cx)
4139        }
4140    }
4141
4142    impl Executor for Arc<gpui::executor::Background> {
4143        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4144            self.spawn(future).detach();
4145        }
4146    }
4147
4148    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4149        channel
4150            .messages()
4151            .cursor::<()>()
4152            .map(|m| {
4153                (
4154                    m.sender.github_login.clone(),
4155                    m.body.clone(),
4156                    m.is_pending(),
4157                )
4158            })
4159            .collect()
4160    }
4161
4162    struct EmptyView;
4163
4164    impl gpui::Entity for EmptyView {
4165        type Event = ();
4166    }
4167
4168    impl gpui::View for EmptyView {
4169        fn ui_name() -> &'static str {
4170            "empty view"
4171        }
4172
4173        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4174            gpui::Element::boxed(gpui::elements::Empty)
4175        }
4176    }
4177}