rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_request_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let notifications = this.notifications.clone();
 194                                let is_background = message.is_background();
 195                                let handle_message = (handler)(this.clone(), message);
 196                                let handle_message = async move {
 197                                    if let Err(err) = handle_message.await {
 198                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 199                                    } else {
 200                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 201                                    }
 202                                    if let Some(mut notifications) = notifications {
 203                                        let _ = notifications.send(()).await;
 204                                    }
 205                                };
 206                                if is_background {
 207                                    executor.spawn_detached(handle_message);
 208                                } else {
 209                                    handle_message.await;
 210                                }
 211                            } else {
 212                                log::warn!("unhandled message: {}", type_name);
 213                            }
 214                        } else {
 215                            log::info!("rpc connection closed {:?}", addr);
 216                            break;
 217                        }
 218                    }
 219                }
 220            }
 221
 222            if let Err(err) = this.sign_out(connection_id).await {
 223                log::error!("error signing out connection {:?} - {:?}", addr, err);
 224            }
 225        }
 226    }
 227
 228    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 229        self.peer.disconnect(connection_id);
 230        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 231
 232        for (project_id, project) in removed_connection.hosted_projects {
 233            if let Some(share) = project.share {
 234                broadcast(
 235                    connection_id,
 236                    share.guests.keys().copied().collect(),
 237                    |conn_id| {
 238                        self.peer
 239                            .send(conn_id, proto::UnshareProject { project_id })
 240                    },
 241                )?;
 242            }
 243        }
 244
 245        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 246            broadcast(connection_id, peer_ids, |conn_id| {
 247                self.peer.send(
 248                    conn_id,
 249                    proto::RemoveProjectCollaborator {
 250                        project_id,
 251                        peer_id: connection_id.0,
 252                    },
 253                )
 254            })?;
 255        }
 256
 257        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 258        Ok(())
 259    }
 260
 261    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 262        Ok(proto::Ack {})
 263    }
 264
 265    async fn register_project(
 266        mut self: Arc<Server>,
 267        request: TypedEnvelope<proto::RegisterProject>,
 268    ) -> tide::Result<proto::RegisterProjectResponse> {
 269        let project_id = {
 270            let mut state = self.state_mut();
 271            let user_id = state.user_id_for_connection(request.sender_id)?;
 272            state.register_project(request.sender_id, user_id)
 273        };
 274        Ok(proto::RegisterProjectResponse { project_id })
 275    }
 276
 277    async fn unregister_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::UnregisterProject>,
 280    ) -> tide::Result<()> {
 281        let project = self
 282            .state_mut()
 283            .unregister_project(request.payload.project_id, request.sender_id)?;
 284        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 285        Ok(())
 286    }
 287
 288    async fn share_project(
 289        mut self: Arc<Server>,
 290        request: TypedEnvelope<proto::ShareProject>,
 291    ) -> tide::Result<proto::Ack> {
 292        self.state_mut()
 293            .share_project(request.payload.project_id, request.sender_id);
 294        Ok(proto::Ack {})
 295    }
 296
 297    async fn unshare_project(
 298        mut self: Arc<Server>,
 299        request: TypedEnvelope<proto::UnshareProject>,
 300    ) -> tide::Result<()> {
 301        let project_id = request.payload.project_id;
 302        let project = self
 303            .state_mut()
 304            .unshare_project(project_id, request.sender_id)?;
 305
 306        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 307            self.peer
 308                .send(conn_id, proto::UnshareProject { project_id })
 309        })?;
 310        self.update_contacts_for_users(&project.authorized_user_ids)?;
 311        Ok(())
 312    }
 313
 314    async fn join_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::JoinProject>,
 317    ) -> tide::Result<proto::JoinProjectResponse> {
 318        let project_id = request.payload.project_id;
 319
 320        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 321        let (response, connection_ids, contact_user_ids) = self
 322            .state_mut()
 323            .join_project(request.sender_id, user_id, project_id)
 324            .and_then(|joined| {
 325                let share = joined.project.share()?;
 326                let peer_count = share.guests.len();
 327                let mut collaborators = Vec::with_capacity(peer_count);
 328                collaborators.push(proto::Collaborator {
 329                    peer_id: joined.project.host_connection_id.0,
 330                    replica_id: 0,
 331                    user_id: joined.project.host_user_id.to_proto(),
 332                });
 333                let worktrees = joined
 334                    .project
 335                    .worktrees
 336                    .iter()
 337                    .filter_map(|(id, worktree)| {
 338                        worktree.share.as_ref().map(|share| proto::Worktree {
 339                            id: *id,
 340                            root_name: worktree.root_name.clone(),
 341                            entries: share.entries.values().cloned().collect(),
 342                            diagnostic_summaries: share
 343                                .diagnostic_summaries
 344                                .values()
 345                                .cloned()
 346                                .collect(),
 347                            weak: worktree.weak,
 348                            next_update_id: share.next_update_id as u64,
 349                        })
 350                    })
 351                    .collect();
 352                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 353                    if *peer_conn_id != request.sender_id {
 354                        collaborators.push(proto::Collaborator {
 355                            peer_id: peer_conn_id.0,
 356                            replica_id: *peer_replica_id as u32,
 357                            user_id: peer_user_id.to_proto(),
 358                        });
 359                    }
 360                }
 361                let response = proto::JoinProjectResponse {
 362                    worktrees,
 363                    replica_id: joined.replica_id as u32,
 364                    collaborators,
 365                };
 366                let connection_ids = joined.project.connection_ids();
 367                let contact_user_ids = joined.project.authorized_user_ids();
 368                Ok((response, connection_ids, contact_user_ids))
 369            })?;
 370
 371        broadcast(request.sender_id, connection_ids, |conn_id| {
 372            self.peer.send(
 373                conn_id,
 374                proto::AddProjectCollaborator {
 375                    project_id,
 376                    collaborator: Some(proto::Collaborator {
 377                        peer_id: request.sender_id.0,
 378                        replica_id: response.replica_id,
 379                        user_id: user_id.to_proto(),
 380                    }),
 381                },
 382            )
 383        })?;
 384        self.update_contacts_for_users(&contact_user_ids)?;
 385        Ok(response)
 386    }
 387
 388    async fn leave_project(
 389        mut self: Arc<Server>,
 390        request: TypedEnvelope<proto::LeaveProject>,
 391    ) -> tide::Result<()> {
 392        let sender_id = request.sender_id;
 393        let project_id = request.payload.project_id;
 394        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 395
 396        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::RemoveProjectCollaborator {
 400                    project_id,
 401                    peer_id: sender_id.0,
 402                },
 403            )
 404        })?;
 405        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 406
 407        Ok(())
 408    }
 409
 410    async fn register_worktree(
 411        mut self: Arc<Server>,
 412        request: TypedEnvelope<proto::RegisterWorktree>,
 413    ) -> tide::Result<proto::Ack> {
 414        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 415
 416        let mut contact_user_ids = HashSet::default();
 417        contact_user_ids.insert(host_user_id);
 418        for github_login in request.payload.authorized_logins {
 419            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 420            contact_user_ids.insert(contact_user_id);
 421        }
 422
 423        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 424        self.state_mut().register_worktree(
 425            request.payload.project_id,
 426            request.payload.worktree_id,
 427            request.sender_id,
 428            Worktree {
 429                authorized_user_ids: contact_user_ids.clone(),
 430                root_name: request.payload.root_name,
 431                share: None,
 432                weak: false,
 433            },
 434        )?;
 435        self.update_contacts_for_users(&contact_user_ids)?;
 436        Ok(proto::Ack {})
 437    }
 438
 439    async fn unregister_worktree(
 440        mut self: Arc<Server>,
 441        request: TypedEnvelope<proto::UnregisterWorktree>,
 442    ) -> tide::Result<()> {
 443        let project_id = request.payload.project_id;
 444        let worktree_id = request.payload.worktree_id;
 445        let (worktree, guest_connection_ids) =
 446            self.state_mut()
 447                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 448        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 449            self.peer.send(
 450                conn_id,
 451                proto::UnregisterWorktree {
 452                    project_id,
 453                    worktree_id,
 454                },
 455            )
 456        })?;
 457        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 458        Ok(())
 459    }
 460
 461    async fn share_worktree(
 462        mut self: Arc<Server>,
 463        mut request: TypedEnvelope<proto::ShareWorktree>,
 464    ) -> tide::Result<proto::Ack> {
 465        let worktree = request
 466            .payload
 467            .worktree
 468            .as_mut()
 469            .ok_or_else(|| anyhow!("missing worktree"))?;
 470        let entries = worktree
 471            .entries
 472            .iter()
 473            .map(|entry| (entry.id, entry.clone()))
 474            .collect();
 475        let diagnostic_summaries = worktree
 476            .diagnostic_summaries
 477            .iter()
 478            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 479            .collect();
 480
 481        let shared_worktree = self.state_mut().share_worktree(
 482            request.payload.project_id,
 483            worktree.id,
 484            request.sender_id,
 485            entries,
 486            diagnostic_summaries,
 487            worktree.next_update_id,
 488        )?;
 489
 490        broadcast(
 491            request.sender_id,
 492            shared_worktree.connection_ids,
 493            |connection_id| {
 494                self.peer
 495                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 496            },
 497        )?;
 498        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499
 500        Ok(proto::Ack {})
 501    }
 502
 503    async fn update_worktree(
 504        mut self: Arc<Server>,
 505        request: TypedEnvelope<proto::UpdateWorktree>,
 506    ) -> tide::Result<proto::Ack> {
 507        let connection_ids = self.state_mut().update_worktree(
 508            request.sender_id,
 509            request.payload.project_id,
 510            request.payload.worktree_id,
 511            request.payload.id,
 512            &request.payload.removed_entries,
 513            &request.payload.updated_entries,
 514        )?;
 515
 516        broadcast(request.sender_id, connection_ids, |connection_id| {
 517            self.peer
 518                .forward_send(request.sender_id, connection_id, request.payload.clone())
 519        })?;
 520
 521        Ok(proto::Ack {})
 522    }
 523
 524    async fn update_diagnostic_summary(
 525        mut self: Arc<Server>,
 526        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 527    ) -> tide::Result<()> {
 528        let summary = request
 529            .payload
 530            .summary
 531            .clone()
 532            .ok_or_else(|| anyhow!("invalid summary"))?;
 533        let receiver_ids = self.state_mut().update_diagnostic_summary(
 534            request.payload.project_id,
 535            request.payload.worktree_id,
 536            request.sender_id,
 537            summary,
 538        )?;
 539
 540        broadcast(request.sender_id, receiver_ids, |connection_id| {
 541            self.peer
 542                .forward_send(request.sender_id, connection_id, request.payload.clone())
 543        })?;
 544        Ok(())
 545    }
 546
 547    async fn disk_based_diagnostics_updating(
 548        self: Arc<Server>,
 549        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 550    ) -> tide::Result<()> {
 551        let receiver_ids = self
 552            .state()
 553            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 554        broadcast(request.sender_id, receiver_ids, |connection_id| {
 555            self.peer
 556                .forward_send(request.sender_id, connection_id, request.payload.clone())
 557        })?;
 558        Ok(())
 559    }
 560
 561    async fn disk_based_diagnostics_updated(
 562        self: Arc<Server>,
 563        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 564    ) -> tide::Result<()> {
 565        let receiver_ids = self
 566            .state()
 567            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 568        broadcast(request.sender_id, receiver_ids, |connection_id| {
 569            self.peer
 570                .forward_send(request.sender_id, connection_id, request.payload.clone())
 571        })?;
 572        Ok(())
 573    }
 574
 575    async fn get_definition(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<proto::GetDefinition>,
 578    ) -> tide::Result<proto::GetDefinitionResponse> {
 579        let host_connection_id = self
 580            .state()
 581            .read_project(request.payload.project_id, request.sender_id)?
 582            .host_connection_id;
 583        Ok(self
 584            .peer
 585            .forward_request(request.sender_id, host_connection_id, request.payload)
 586            .await?)
 587    }
 588
 589    async fn open_buffer(
 590        self: Arc<Server>,
 591        request: TypedEnvelope<proto::OpenBuffer>,
 592    ) -> tide::Result<proto::OpenBufferResponse> {
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)?
 596            .host_connection_id;
 597        Ok(self
 598            .peer
 599            .forward_request(request.sender_id, host_connection_id, request.payload)
 600            .await?)
 601    }
 602
 603    async fn close_buffer(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::CloseBuffer>,
 606    ) -> tide::Result<()> {
 607        let host_connection_id = self
 608            .state()
 609            .read_project(request.payload.project_id, request.sender_id)?
 610            .host_connection_id;
 611        self.peer
 612            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 613        Ok(())
 614    }
 615
 616    async fn save_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::SaveBuffer>,
 619    ) -> tide::Result<proto::BufferSaved> {
 620        let host;
 621        let mut guests;
 622        {
 623            let state = self.state();
 624            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 625            host = project.host_connection_id;
 626            guests = project.guest_connection_ids()
 627        }
 628
 629        let response = self
 630            .peer
 631            .forward_request(request.sender_id, host, request.payload.clone())
 632            .await?;
 633
 634        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 635        broadcast(host, guests, |conn_id| {
 636            self.peer.forward_send(host, conn_id, response.clone())
 637        })?;
 638
 639        Ok(response)
 640    }
 641
 642    async fn format_buffers(
 643        self: Arc<Server>,
 644        request: TypedEnvelope<proto::FormatBuffers>,
 645    ) -> tide::Result<proto::FormatBuffersResponse> {
 646        let host = self
 647            .state()
 648            .read_project(request.payload.project_id, request.sender_id)?
 649            .host_connection_id;
 650        Ok(self
 651            .peer
 652            .forward_request(request.sender_id, host, request.payload.clone())
 653            .await?)
 654    }
 655
 656    async fn get_completions(
 657        self: Arc<Server>,
 658        request: TypedEnvelope<proto::GetCompletions>,
 659    ) -> tide::Result<proto::GetCompletionsResponse> {
 660        let host = self
 661            .state()
 662            .read_project(request.payload.project_id, request.sender_id)?
 663            .host_connection_id;
 664        Ok(self
 665            .peer
 666            .forward_request(request.sender_id, host, request.payload.clone())
 667            .await?)
 668    }
 669
 670    async fn apply_additional_edits_for_completion(
 671        self: Arc<Server>,
 672        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 673    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 674        let host = self
 675            .state()
 676            .read_project(request.payload.project_id, request.sender_id)?
 677            .host_connection_id;
 678        Ok(self
 679            .peer
 680            .forward_request(request.sender_id, host, request.payload.clone())
 681            .await?)
 682    }
 683
 684    async fn get_code_actions(
 685        self: Arc<Server>,
 686        request: TypedEnvelope<proto::GetCodeActions>,
 687    ) -> tide::Result<proto::GetCodeActionsResponse> {
 688        let host = self
 689            .state()
 690            .read_project(request.payload.project_id, request.sender_id)?
 691            .host_connection_id;
 692        Ok(self
 693            .peer
 694            .forward_request(request.sender_id, host, request.payload.clone())
 695            .await?)
 696    }
 697
 698    async fn apply_code_action(
 699        self: Arc<Server>,
 700        request: TypedEnvelope<proto::ApplyCodeAction>,
 701    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 702        let host = self
 703            .state()
 704            .read_project(request.payload.project_id, request.sender_id)?
 705            .host_connection_id;
 706        Ok(self
 707            .peer
 708            .forward_request(request.sender_id, host, request.payload.clone())
 709            .await?)
 710    }
 711
 712    async fn update_buffer(
 713        self: Arc<Server>,
 714        request: TypedEnvelope<proto::UpdateBuffer>,
 715    ) -> tide::Result<proto::Ack> {
 716        let receiver_ids = self
 717            .state()
 718            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 719        broadcast(request.sender_id, receiver_ids, |connection_id| {
 720            self.peer
 721                .forward_send(request.sender_id, connection_id, request.payload.clone())
 722        })?;
 723        Ok(proto::Ack {})
 724    }
 725
 726    async fn update_buffer_file(
 727        self: Arc<Server>,
 728        request: TypedEnvelope<proto::UpdateBufferFile>,
 729    ) -> tide::Result<()> {
 730        let receiver_ids = self
 731            .state()
 732            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 733        broadcast(request.sender_id, receiver_ids, |connection_id| {
 734            self.peer
 735                .forward_send(request.sender_id, connection_id, request.payload.clone())
 736        })?;
 737        Ok(())
 738    }
 739
 740    async fn buffer_reloaded(
 741        self: Arc<Server>,
 742        request: TypedEnvelope<proto::BufferReloaded>,
 743    ) -> tide::Result<()> {
 744        let receiver_ids = self
 745            .state()
 746            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 747        broadcast(request.sender_id, receiver_ids, |connection_id| {
 748            self.peer
 749                .forward_send(request.sender_id, connection_id, request.payload.clone())
 750        })?;
 751        Ok(())
 752    }
 753
 754    async fn buffer_saved(
 755        self: Arc<Server>,
 756        request: TypedEnvelope<proto::BufferSaved>,
 757    ) -> tide::Result<()> {
 758        let receiver_ids = self
 759            .state()
 760            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 761        broadcast(request.sender_id, receiver_ids, |connection_id| {
 762            self.peer
 763                .forward_send(request.sender_id, connection_id, request.payload.clone())
 764        })?;
 765        Ok(())
 766    }
 767
 768    async fn get_channels(
 769        self: Arc<Server>,
 770        request: TypedEnvelope<proto::GetChannels>,
 771    ) -> tide::Result<proto::GetChannelsResponse> {
 772        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 773        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 774        Ok(proto::GetChannelsResponse {
 775            channels: channels
 776                .into_iter()
 777                .map(|chan| proto::Channel {
 778                    id: chan.id.to_proto(),
 779                    name: chan.name,
 780                })
 781                .collect(),
 782        })
 783    }
 784
 785    async fn get_users(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::GetUsers>,
 788    ) -> tide::Result<proto::GetUsersResponse> {
 789        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 790        let users = self
 791            .app_state
 792            .db
 793            .get_users_by_ids(user_ids)
 794            .await?
 795            .into_iter()
 796            .map(|user| proto::User {
 797                id: user.id.to_proto(),
 798                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 799                github_login: user.github_login,
 800            })
 801            .collect();
 802        Ok(proto::GetUsersResponse { users })
 803    }
 804
 805    fn update_contacts_for_users<'a>(
 806        self: &Arc<Server>,
 807        user_ids: impl IntoIterator<Item = &'a UserId>,
 808    ) -> anyhow::Result<()> {
 809        let mut result = Ok(());
 810        let state = self.state();
 811        for user_id in user_ids {
 812            let contacts = state.contacts_for_user(*user_id);
 813            for connection_id in state.connection_ids_for_user(*user_id) {
 814                if let Err(error) = self.peer.send(
 815                    connection_id,
 816                    proto::UpdateContacts {
 817                        contacts: contacts.clone(),
 818                    },
 819                ) {
 820                    result = Err(error);
 821                }
 822            }
 823        }
 824        result
 825    }
 826
 827    async fn join_channel(
 828        mut self: Arc<Self>,
 829        request: TypedEnvelope<proto::JoinChannel>,
 830    ) -> tide::Result<proto::JoinChannelResponse> {
 831        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 832        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 833        if !self
 834            .app_state
 835            .db
 836            .can_user_access_channel(user_id, channel_id)
 837            .await?
 838        {
 839            Err(anyhow!("access denied"))?;
 840        }
 841
 842        self.state_mut().join_channel(request.sender_id, channel_id);
 843        let messages = self
 844            .app_state
 845            .db
 846            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 847            .await?
 848            .into_iter()
 849            .map(|msg| proto::ChannelMessage {
 850                id: msg.id.to_proto(),
 851                body: msg.body,
 852                timestamp: msg.sent_at.unix_timestamp() as u64,
 853                sender_id: msg.sender_id.to_proto(),
 854                nonce: Some(msg.nonce.as_u128().into()),
 855            })
 856            .collect::<Vec<_>>();
 857        Ok(proto::JoinChannelResponse {
 858            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 859            messages,
 860        })
 861    }
 862
 863    async fn leave_channel(
 864        mut self: Arc<Self>,
 865        request: TypedEnvelope<proto::LeaveChannel>,
 866    ) -> tide::Result<()> {
 867        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 868        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 869        if !self
 870            .app_state
 871            .db
 872            .can_user_access_channel(user_id, channel_id)
 873            .await?
 874        {
 875            Err(anyhow!("access denied"))?;
 876        }
 877
 878        self.state_mut()
 879            .leave_channel(request.sender_id, channel_id);
 880
 881        Ok(())
 882    }
 883
 884    async fn send_channel_message(
 885        self: Arc<Self>,
 886        request: TypedEnvelope<proto::SendChannelMessage>,
 887    ) -> tide::Result<proto::SendChannelMessageResponse> {
 888        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 889        let user_id;
 890        let connection_ids;
 891        {
 892            let state = self.state();
 893            user_id = state.user_id_for_connection(request.sender_id)?;
 894            connection_ids = state.channel_connection_ids(channel_id)?;
 895        }
 896
 897        // Validate the message body.
 898        let body = request.payload.body.trim().to_string();
 899        if body.len() > MAX_MESSAGE_LEN {
 900            return Err(anyhow!("message is too long"))?;
 901        }
 902        if body.is_empty() {
 903            return Err(anyhow!("message can't be blank"))?;
 904        }
 905
 906        let timestamp = OffsetDateTime::now_utc();
 907        let nonce = request
 908            .payload
 909            .nonce
 910            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 911
 912        let message_id = self
 913            .app_state
 914            .db
 915            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 916            .await?
 917            .to_proto();
 918        let message = proto::ChannelMessage {
 919            sender_id: user_id.to_proto(),
 920            id: message_id,
 921            body,
 922            timestamp: timestamp.unix_timestamp() as u64,
 923            nonce: Some(nonce),
 924        };
 925        broadcast(request.sender_id, connection_ids, |conn_id| {
 926            self.peer.send(
 927                conn_id,
 928                proto::ChannelMessageSent {
 929                    channel_id: channel_id.to_proto(),
 930                    message: Some(message.clone()),
 931                },
 932            )
 933        })?;
 934        Ok(proto::SendChannelMessageResponse {
 935            message: Some(message),
 936        })
 937    }
 938
 939    async fn get_channel_messages(
 940        self: Arc<Self>,
 941        request: TypedEnvelope<proto::GetChannelMessages>,
 942    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 943        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 944        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 945        if !self
 946            .app_state
 947            .db
 948            .can_user_access_channel(user_id, channel_id)
 949            .await?
 950        {
 951            Err(anyhow!("access denied"))?;
 952        }
 953
 954        let messages = self
 955            .app_state
 956            .db
 957            .get_channel_messages(
 958                channel_id,
 959                MESSAGE_COUNT_PER_PAGE,
 960                Some(MessageId::from_proto(request.payload.before_message_id)),
 961            )
 962            .await?
 963            .into_iter()
 964            .map(|msg| proto::ChannelMessage {
 965                id: msg.id.to_proto(),
 966                body: msg.body,
 967                timestamp: msg.sent_at.unix_timestamp() as u64,
 968                sender_id: msg.sender_id.to_proto(),
 969                nonce: Some(msg.nonce.as_u128().into()),
 970            })
 971            .collect::<Vec<_>>();
 972
 973        Ok(proto::GetChannelMessagesResponse {
 974            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 975            messages,
 976        })
 977    }
 978
 979    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 980        self.store.read()
 981    }
 982
 983    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 984        self.store.write()
 985    }
 986}
 987
 988impl Executor for RealExecutor {
 989    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 990        task::spawn(future);
 991    }
 992}
 993
 994fn broadcast<F>(
 995    sender_id: ConnectionId,
 996    receiver_ids: Vec<ConnectionId>,
 997    mut f: F,
 998) -> anyhow::Result<()>
 999where
1000    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1001{
1002    let mut result = Ok(());
1003    for receiver_id in receiver_ids {
1004        if receiver_id != sender_id {
1005            if let Err(error) = f(receiver_id) {
1006                if result.is_ok() {
1007                    result = Err(error);
1008                }
1009            }
1010        }
1011    }
1012    result
1013}
1014
1015pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1016    let server = Server::new(app.state().clone(), rpc.clone(), None);
1017    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1018        let server = server.clone();
1019        async move {
1020            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1021
1022            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1023            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1024            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1025            let client_protocol_version: Option<u32> = request
1026                .header("X-Zed-Protocol-Version")
1027                .and_then(|v| v.as_str().parse().ok());
1028
1029            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1030                return Ok(Response::new(StatusCode::UpgradeRequired));
1031            }
1032
1033            let header = match request.header("Sec-Websocket-Key") {
1034                Some(h) => h.as_str(),
1035                None => return Err(anyhow!("expected sec-websocket-key"))?,
1036            };
1037
1038            let user_id = process_auth_header(&request).await?;
1039
1040            let mut response = Response::new(StatusCode::SwitchingProtocols);
1041            response.insert_header(UPGRADE, "websocket");
1042            response.insert_header(CONNECTION, "Upgrade");
1043            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1044            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1045            response.insert_header("Sec-Websocket-Version", "13");
1046
1047            let http_res: &mut tide::http::Response = response.as_mut();
1048            let upgrade_receiver = http_res.recv_upgrade().await;
1049            let addr = request.remote().unwrap_or("unknown").to_string();
1050            task::spawn(async move {
1051                if let Some(stream) = upgrade_receiver.await {
1052                    server
1053                        .handle_connection(
1054                            Connection::new(
1055                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1056                            ),
1057                            addr,
1058                            user_id,
1059                            None,
1060                            RealExecutor,
1061                        )
1062                        .await;
1063                }
1064            });
1065
1066            Ok(response)
1067        }
1068    });
1069}
1070
1071fn header_contains_ignore_case<T>(
1072    request: &tide::Request<T>,
1073    header_name: HeaderName,
1074    value: &str,
1075) -> bool {
1076    request
1077        .header(header_name)
1078        .map(|h| {
1079            h.as_str()
1080                .split(',')
1081                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1082        })
1083        .unwrap_or(false)
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088    use super::*;
1089    use crate::{
1090        auth,
1091        db::{tests::TestDb, UserId},
1092        github, AppState, Config,
1093    };
1094    use ::rpc::Peer;
1095    use collections::BTreeMap;
1096    use gpui::{executor, ModelHandle, TestAppContext};
1097    use parking_lot::Mutex;
1098    use postage::{mpsc, watch};
1099    use rand::prelude::*;
1100    use rpc::PeerId;
1101    use serde_json::json;
1102    use sqlx::types::time::OffsetDateTime;
1103    use std::{
1104        cell::{Cell, RefCell},
1105        env,
1106        ops::Deref,
1107        path::Path,
1108        rc::Rc,
1109        sync::{
1110            atomic::{AtomicBool, Ordering::SeqCst},
1111            Arc,
1112        },
1113        time::Duration,
1114    };
1115    use zed::{
1116        client::{
1117            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1118            EstablishConnectionError, UserStore,
1119        },
1120        editor::{
1121            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1122            Redo, ToggleCodeActions, Undo,
1123        },
1124        fs::{FakeFs, Fs as _},
1125        language::{
1126            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1127            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1128        },
1129        lsp,
1130        project::{DiagnosticSummary, Project, ProjectPath},
1131        workspace::{Workspace, WorkspaceParams},
1132    };
1133
1134    #[cfg(test)]
1135    #[ctor::ctor]
1136    fn init_logger() {
1137        if std::env::var("RUST_LOG").is_ok() {
1138            env_logger::init();
1139        }
1140    }
1141
1142    #[gpui::test(iterations = 10)]
1143    async fn test_share_project(
1144        mut cx_a: TestAppContext,
1145        mut cx_b: TestAppContext,
1146        last_iteration: bool,
1147    ) {
1148        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1149        let lang_registry = Arc::new(LanguageRegistry::new());
1150        let fs = Arc::new(FakeFs::new(cx_a.background()));
1151        cx_a.foreground().forbid_parking();
1152
1153        // Connect to a server as 2 clients.
1154        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1155        let client_a = server.create_client(&mut cx_a, "user_a").await;
1156        let client_b = server.create_client(&mut cx_b, "user_b").await;
1157
1158        // Share a project as client A
1159        fs.insert_tree(
1160            "/a",
1161            json!({
1162                ".zed.toml": r#"collaborators = ["user_b"]"#,
1163                "a.txt": "a-contents",
1164                "b.txt": "b-contents",
1165            }),
1166        )
1167        .await;
1168        let project_a = cx_a.update(|cx| {
1169            Project::local(
1170                client_a.clone(),
1171                client_a.user_store.clone(),
1172                lang_registry.clone(),
1173                fs.clone(),
1174                cx,
1175            )
1176        });
1177        let (worktree_a, _) = project_a
1178            .update(&mut cx_a, |p, cx| {
1179                p.find_or_create_local_worktree("/a", false, cx)
1180            })
1181            .await
1182            .unwrap();
1183        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1184        worktree_a
1185            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1186            .await;
1187        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1188        project_a
1189            .update(&mut cx_a, |p, cx| p.share(cx))
1190            .await
1191            .unwrap();
1192
1193        // Join that project as client B
1194        let project_b = Project::remote(
1195            project_id,
1196            client_b.clone(),
1197            client_b.user_store.clone(),
1198            lang_registry.clone(),
1199            fs.clone(),
1200            &mut cx_b.to_async(),
1201        )
1202        .await
1203        .unwrap();
1204
1205        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1206            assert_eq!(
1207                project
1208                    .collaborators()
1209                    .get(&client_a.peer_id)
1210                    .unwrap()
1211                    .user
1212                    .github_login,
1213                "user_a"
1214            );
1215            project.replica_id()
1216        });
1217        project_a
1218            .condition(&cx_a, |tree, _| {
1219                tree.collaborators()
1220                    .get(&client_b.peer_id)
1221                    .map_or(false, |collaborator| {
1222                        collaborator.replica_id == replica_id_b
1223                            && collaborator.user.github_login == "user_b"
1224                    })
1225            })
1226            .await;
1227
1228        // Open the same file as client B and client A.
1229        let buffer_b = project_b
1230            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1231            .await
1232            .unwrap();
1233        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1234        buffer_b.read_with(&cx_b, |buf, cx| {
1235            assert_eq!(buf.read(cx).text(), "b-contents")
1236        });
1237        project_a.read_with(&cx_a, |project, cx| {
1238            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1239        });
1240        let buffer_a = project_a
1241            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1242            .await
1243            .unwrap();
1244
1245        let editor_b = cx_b.add_view(window_b, |cx| {
1246            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1247        });
1248
1249        // TODO
1250        // // Create a selection set as client B and see that selection set as client A.
1251        // buffer_a
1252        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1253        //     .await;
1254
1255        // Edit the buffer as client B and see that edit as client A.
1256        editor_b.update(&mut cx_b, |editor, cx| {
1257            editor.handle_input(&Input("ok, ".into()), cx)
1258        });
1259        buffer_a
1260            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1261            .await;
1262
1263        // TODO
1264        // // Remove the selection set as client B, see those selections disappear as client A.
1265        cx_b.update(move |_| drop(editor_b));
1266        // buffer_a
1267        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1268        //     .await;
1269
1270        // Close the buffer as client A, see that the buffer is closed.
1271        cx_a.update(move |_| drop(buffer_a));
1272        project_a
1273            .condition(&cx_a, |project, cx| {
1274                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1275            })
1276            .await;
1277
1278        // Dropping the client B's project removes client B from client A's collaborators.
1279        cx_b.update(move |_| drop(project_b));
1280        project_a
1281            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1282            .await;
1283    }
1284
1285    #[gpui::test(iterations = 10)]
1286    async fn test_unshare_project(
1287        mut cx_a: TestAppContext,
1288        mut cx_b: TestAppContext,
1289        last_iteration: bool,
1290    ) {
1291        let lang_registry = Arc::new(LanguageRegistry::new());
1292        let fs = Arc::new(FakeFs::new(cx_a.background()));
1293        cx_a.foreground().forbid_parking();
1294
1295        // Connect to a server as 2 clients.
1296        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1297        let client_a = server.create_client(&mut cx_a, "user_a").await;
1298        let client_b = server.create_client(&mut cx_b, "user_b").await;
1299
1300        // Share a project as client A
1301        fs.insert_tree(
1302            "/a",
1303            json!({
1304                ".zed.toml": r#"collaborators = ["user_b"]"#,
1305                "a.txt": "a-contents",
1306                "b.txt": "b-contents",
1307            }),
1308        )
1309        .await;
1310        let project_a = cx_a.update(|cx| {
1311            Project::local(
1312                client_a.clone(),
1313                client_a.user_store.clone(),
1314                lang_registry.clone(),
1315                fs.clone(),
1316                cx,
1317            )
1318        });
1319        let (worktree_a, _) = project_a
1320            .update(&mut cx_a, |p, cx| {
1321                p.find_or_create_local_worktree("/a", false, cx)
1322            })
1323            .await
1324            .unwrap();
1325        worktree_a
1326            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1327            .await;
1328        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1329        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1330        project_a
1331            .update(&mut cx_a, |p, cx| p.share(cx))
1332            .await
1333            .unwrap();
1334        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1335
1336        // Join that project as client B
1337        let project_b = Project::remote(
1338            project_id,
1339            client_b.clone(),
1340            client_b.user_store.clone(),
1341            lang_registry.clone(),
1342            fs.clone(),
1343            &mut cx_b.to_async(),
1344        )
1345        .await
1346        .unwrap();
1347        project_b
1348            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1349            .await
1350            .unwrap();
1351
1352        // Unshare the project as client A
1353        project_a
1354            .update(&mut cx_a, |project, cx| project.unshare(cx))
1355            .await
1356            .unwrap();
1357        project_b
1358            .condition(&mut cx_b, |project, _| project.is_read_only())
1359            .await;
1360        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1361        drop(project_b);
1362
1363        // Share the project again and ensure guests can still join.
1364        project_a
1365            .update(&mut cx_a, |project, cx| project.share(cx))
1366            .await
1367            .unwrap();
1368        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1369
1370        let project_c = Project::remote(
1371            project_id,
1372            client_b.clone(),
1373            client_b.user_store.clone(),
1374            lang_registry.clone(),
1375            fs.clone(),
1376            &mut cx_b.to_async(),
1377        )
1378        .await
1379        .unwrap();
1380        project_c
1381            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1382            .await
1383            .unwrap();
1384    }
1385
1386    #[gpui::test(iterations = 10)]
1387    async fn test_propagate_saves_and_fs_changes(
1388        mut cx_a: TestAppContext,
1389        mut cx_b: TestAppContext,
1390        mut cx_c: TestAppContext,
1391        last_iteration: bool,
1392    ) {
1393        let lang_registry = Arc::new(LanguageRegistry::new());
1394        let fs = Arc::new(FakeFs::new(cx_a.background()));
1395        cx_a.foreground().forbid_parking();
1396
1397        // Connect to a server as 3 clients.
1398        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1399        let client_a = server.create_client(&mut cx_a, "user_a").await;
1400        let client_b = server.create_client(&mut cx_b, "user_b").await;
1401        let client_c = server.create_client(&mut cx_c, "user_c").await;
1402
1403        // Share a worktree as client A.
1404        fs.insert_tree(
1405            "/a",
1406            json!({
1407                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1408                "file1": "",
1409                "file2": ""
1410            }),
1411        )
1412        .await;
1413        let project_a = cx_a.update(|cx| {
1414            Project::local(
1415                client_a.clone(),
1416                client_a.user_store.clone(),
1417                lang_registry.clone(),
1418                fs.clone(),
1419                cx,
1420            )
1421        });
1422        let (worktree_a, _) = project_a
1423            .update(&mut cx_a, |p, cx| {
1424                p.find_or_create_local_worktree("/a", false, cx)
1425            })
1426            .await
1427            .unwrap();
1428        worktree_a
1429            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1430            .await;
1431        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1432        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1433        project_a
1434            .update(&mut cx_a, |p, cx| p.share(cx))
1435            .await
1436            .unwrap();
1437
1438        // Join that worktree as clients B and C.
1439        let project_b = Project::remote(
1440            project_id,
1441            client_b.clone(),
1442            client_b.user_store.clone(),
1443            lang_registry.clone(),
1444            fs.clone(),
1445            &mut cx_b.to_async(),
1446        )
1447        .await
1448        .unwrap();
1449        let project_c = Project::remote(
1450            project_id,
1451            client_c.clone(),
1452            client_c.user_store.clone(),
1453            lang_registry.clone(),
1454            fs.clone(),
1455            &mut cx_c.to_async(),
1456        )
1457        .await
1458        .unwrap();
1459        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1460        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1461
1462        // Open and edit a buffer as both guests B and C.
1463        let buffer_b = project_b
1464            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1465            .await
1466            .unwrap();
1467        let buffer_c = project_c
1468            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1469            .await
1470            .unwrap();
1471        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1472        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1473
1474        // Open and edit that buffer as the host.
1475        let buffer_a = project_a
1476            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1477            .await
1478            .unwrap();
1479
1480        buffer_a
1481            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1482            .await;
1483        buffer_a.update(&mut cx_a, |buf, cx| {
1484            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1485        });
1486
1487        // Wait for edits to propagate
1488        buffer_a
1489            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1490            .await;
1491        buffer_b
1492            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1493            .await;
1494        buffer_c
1495            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1496            .await;
1497
1498        // Edit the buffer as the host and concurrently save as guest B.
1499        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1500        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1501        save_b.await.unwrap();
1502        assert_eq!(
1503            fs.load("/a/file1".as_ref()).await.unwrap(),
1504            "hi-a, i-am-c, i-am-b, i-am-a"
1505        );
1506        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1507        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1508        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1509
1510        // Make changes on host's file system, see those changes on guest worktrees.
1511        fs.rename(
1512            "/a/file1".as_ref(),
1513            "/a/file1-renamed".as_ref(),
1514            Default::default(),
1515        )
1516        .await
1517        .unwrap();
1518
1519        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1520            .await
1521            .unwrap();
1522        fs.insert_file(Path::new("/a/file4"), "4".into())
1523            .await
1524            .unwrap();
1525
1526        worktree_a
1527            .condition(&cx_a, |tree, _| {
1528                tree.paths()
1529                    .map(|p| p.to_string_lossy())
1530                    .collect::<Vec<_>>()
1531                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1532            })
1533            .await;
1534        worktree_b
1535            .condition(&cx_b, |tree, _| {
1536                tree.paths()
1537                    .map(|p| p.to_string_lossy())
1538                    .collect::<Vec<_>>()
1539                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1540            })
1541            .await;
1542        worktree_c
1543            .condition(&cx_c, |tree, _| {
1544                tree.paths()
1545                    .map(|p| p.to_string_lossy())
1546                    .collect::<Vec<_>>()
1547                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1548            })
1549            .await;
1550
1551        // Ensure buffer files are updated as well.
1552        buffer_a
1553            .condition(&cx_a, |buf, _| {
1554                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1555            })
1556            .await;
1557        buffer_b
1558            .condition(&cx_b, |buf, _| {
1559                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1560            })
1561            .await;
1562        buffer_c
1563            .condition(&cx_c, |buf, _| {
1564                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1565            })
1566            .await;
1567    }
1568
1569    #[gpui::test(iterations = 10)]
1570    async fn test_buffer_conflict_after_save(
1571        mut cx_a: TestAppContext,
1572        mut cx_b: TestAppContext,
1573        last_iteration: bool,
1574    ) {
1575        cx_a.foreground().forbid_parking();
1576        let lang_registry = Arc::new(LanguageRegistry::new());
1577        let fs = Arc::new(FakeFs::new(cx_a.background()));
1578
1579        // Connect to a server as 2 clients.
1580        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1581        let client_a = server.create_client(&mut cx_a, "user_a").await;
1582        let client_b = server.create_client(&mut cx_b, "user_b").await;
1583
1584        // Share a project as client A
1585        fs.insert_tree(
1586            "/dir",
1587            json!({
1588                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1589                "a.txt": "a-contents",
1590            }),
1591        )
1592        .await;
1593
1594        let project_a = cx_a.update(|cx| {
1595            Project::local(
1596                client_a.clone(),
1597                client_a.user_store.clone(),
1598                lang_registry.clone(),
1599                fs.clone(),
1600                cx,
1601            )
1602        });
1603        let (worktree_a, _) = project_a
1604            .update(&mut cx_a, |p, cx| {
1605                p.find_or_create_local_worktree("/dir", false, cx)
1606            })
1607            .await
1608            .unwrap();
1609        worktree_a
1610            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1611            .await;
1612        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1613        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1614        project_a
1615            .update(&mut cx_a, |p, cx| p.share(cx))
1616            .await
1617            .unwrap();
1618
1619        // Join that project as client B
1620        let project_b = Project::remote(
1621            project_id,
1622            client_b.clone(),
1623            client_b.user_store.clone(),
1624            lang_registry.clone(),
1625            fs.clone(),
1626            &mut cx_b.to_async(),
1627        )
1628        .await
1629        .unwrap();
1630
1631        // Open a buffer as client B
1632        let buffer_b = project_b
1633            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1634            .await
1635            .unwrap();
1636
1637        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1638        buffer_b.read_with(&cx_b, |buf, _| {
1639            assert!(buf.is_dirty());
1640            assert!(!buf.has_conflict());
1641        });
1642
1643        buffer_b
1644            .update(&mut cx_b, |buf, cx| buf.save(cx))
1645            .await
1646            .unwrap();
1647        buffer_b
1648            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1649            .await;
1650        buffer_b.read_with(&cx_b, |buf, _| {
1651            assert!(!buf.has_conflict());
1652        });
1653
1654        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1655        buffer_b.read_with(&cx_b, |buf, _| {
1656            assert!(buf.is_dirty());
1657            assert!(!buf.has_conflict());
1658        });
1659    }
1660
1661    #[gpui::test(iterations = 10)]
1662    async fn test_buffer_reloading(
1663        mut cx_a: TestAppContext,
1664        mut cx_b: TestAppContext,
1665        last_iteration: bool,
1666    ) {
1667        cx_a.foreground().forbid_parking();
1668        let lang_registry = Arc::new(LanguageRegistry::new());
1669        let fs = Arc::new(FakeFs::new(cx_a.background()));
1670
1671        // Connect to a server as 2 clients.
1672        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1673        let client_a = server.create_client(&mut cx_a, "user_a").await;
1674        let client_b = server.create_client(&mut cx_b, "user_b").await;
1675
1676        // Share a project as client A
1677        fs.insert_tree(
1678            "/dir",
1679            json!({
1680                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1681                "a.txt": "a-contents",
1682            }),
1683        )
1684        .await;
1685
1686        let project_a = cx_a.update(|cx| {
1687            Project::local(
1688                client_a.clone(),
1689                client_a.user_store.clone(),
1690                lang_registry.clone(),
1691                fs.clone(),
1692                cx,
1693            )
1694        });
1695        let (worktree_a, _) = project_a
1696            .update(&mut cx_a, |p, cx| {
1697                p.find_or_create_local_worktree("/dir", false, cx)
1698            })
1699            .await
1700            .unwrap();
1701        worktree_a
1702            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1703            .await;
1704        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1705        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1706        project_a
1707            .update(&mut cx_a, |p, cx| p.share(cx))
1708            .await
1709            .unwrap();
1710
1711        // Join that project as client B
1712        let project_b = Project::remote(
1713            project_id,
1714            client_b.clone(),
1715            client_b.user_store.clone(),
1716            lang_registry.clone(),
1717            fs.clone(),
1718            &mut cx_b.to_async(),
1719        )
1720        .await
1721        .unwrap();
1722        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1723
1724        // Open a buffer as client B
1725        let buffer_b = project_b
1726            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1727            .await
1728            .unwrap();
1729        buffer_b.read_with(&cx_b, |buf, _| {
1730            assert!(!buf.is_dirty());
1731            assert!(!buf.has_conflict());
1732        });
1733
1734        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1735            .await
1736            .unwrap();
1737        buffer_b
1738            .condition(&cx_b, |buf, _| {
1739                buf.text() == "new contents" && !buf.is_dirty()
1740            })
1741            .await;
1742        buffer_b.read_with(&cx_b, |buf, _| {
1743            assert!(!buf.has_conflict());
1744        });
1745    }
1746
1747    #[gpui::test(iterations = 10)]
1748    async fn test_editing_while_guest_opens_buffer(
1749        mut cx_a: TestAppContext,
1750        mut cx_b: TestAppContext,
1751        last_iteration: bool,
1752    ) {
1753        cx_a.foreground().forbid_parking();
1754        let lang_registry = Arc::new(LanguageRegistry::new());
1755        let fs = Arc::new(FakeFs::new(cx_a.background()));
1756
1757        // Connect to a server as 2 clients.
1758        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1759        let client_a = server.create_client(&mut cx_a, "user_a").await;
1760        let client_b = server.create_client(&mut cx_b, "user_b").await;
1761
1762        // Share a project as client A
1763        fs.insert_tree(
1764            "/dir",
1765            json!({
1766                ".zed.toml": r#"collaborators = ["user_b"]"#,
1767                "a.txt": "a-contents",
1768            }),
1769        )
1770        .await;
1771        let project_a = cx_a.update(|cx| {
1772            Project::local(
1773                client_a.clone(),
1774                client_a.user_store.clone(),
1775                lang_registry.clone(),
1776                fs.clone(),
1777                cx,
1778            )
1779        });
1780        let (worktree_a, _) = project_a
1781            .update(&mut cx_a, |p, cx| {
1782                p.find_or_create_local_worktree("/dir", false, cx)
1783            })
1784            .await
1785            .unwrap();
1786        worktree_a
1787            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1788            .await;
1789        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1790        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1791        project_a
1792            .update(&mut cx_a, |p, cx| p.share(cx))
1793            .await
1794            .unwrap();
1795
1796        // Join that project as client B
1797        let project_b = Project::remote(
1798            project_id,
1799            client_b.clone(),
1800            client_b.user_store.clone(),
1801            lang_registry.clone(),
1802            fs.clone(),
1803            &mut cx_b.to_async(),
1804        )
1805        .await
1806        .unwrap();
1807
1808        // Open a buffer as client A
1809        let buffer_a = project_a
1810            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1811            .await
1812            .unwrap();
1813
1814        // Start opening the same buffer as client B
1815        let buffer_b = cx_b
1816            .background()
1817            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1818
1819        // Edit the buffer as client A while client B is still opening it.
1820        cx_b.background().simulate_random_delay().await;
1821        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1822        cx_b.background().simulate_random_delay().await;
1823        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1824
1825        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1826        let buffer_b = buffer_b.await.unwrap();
1827        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1828    }
1829
1830    #[gpui::test(iterations = 10)]
1831    async fn test_leaving_worktree_while_opening_buffer(
1832        mut cx_a: TestAppContext,
1833        mut cx_b: TestAppContext,
1834        last_iteration: bool,
1835    ) {
1836        cx_a.foreground().forbid_parking();
1837        let lang_registry = Arc::new(LanguageRegistry::new());
1838        let fs = Arc::new(FakeFs::new(cx_a.background()));
1839
1840        // Connect to a server as 2 clients.
1841        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1842        let client_a = server.create_client(&mut cx_a, "user_a").await;
1843        let client_b = server.create_client(&mut cx_b, "user_b").await;
1844
1845        // Share a project as client A
1846        fs.insert_tree(
1847            "/dir",
1848            json!({
1849                ".zed.toml": r#"collaborators = ["user_b"]"#,
1850                "a.txt": "a-contents",
1851            }),
1852        )
1853        .await;
1854        let project_a = cx_a.update(|cx| {
1855            Project::local(
1856                client_a.clone(),
1857                client_a.user_store.clone(),
1858                lang_registry.clone(),
1859                fs.clone(),
1860                cx,
1861            )
1862        });
1863        let (worktree_a, _) = project_a
1864            .update(&mut cx_a, |p, cx| {
1865                p.find_or_create_local_worktree("/dir", false, cx)
1866            })
1867            .await
1868            .unwrap();
1869        worktree_a
1870            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1871            .await;
1872        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1873        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1874        project_a
1875            .update(&mut cx_a, |p, cx| p.share(cx))
1876            .await
1877            .unwrap();
1878
1879        // Join that project as client B
1880        let project_b = Project::remote(
1881            project_id,
1882            client_b.clone(),
1883            client_b.user_store.clone(),
1884            lang_registry.clone(),
1885            fs.clone(),
1886            &mut cx_b.to_async(),
1887        )
1888        .await
1889        .unwrap();
1890
1891        // See that a guest has joined as client A.
1892        project_a
1893            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1894            .await;
1895
1896        // Begin opening a buffer as client B, but leave the project before the open completes.
1897        let buffer_b = cx_b
1898            .background()
1899            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1900        cx_b.update(|_| drop(project_b));
1901        drop(buffer_b);
1902
1903        // See that the guest has left.
1904        project_a
1905            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1906            .await;
1907    }
1908
1909    #[gpui::test(iterations = 10)]
1910    async fn test_peer_disconnection(
1911        mut cx_a: TestAppContext,
1912        mut cx_b: TestAppContext,
1913        last_iteration: bool,
1914    ) {
1915        cx_a.foreground().forbid_parking();
1916        let lang_registry = Arc::new(LanguageRegistry::new());
1917        let fs = Arc::new(FakeFs::new(cx_a.background()));
1918
1919        // Connect to a server as 2 clients.
1920        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1921        let client_a = server.create_client(&mut cx_a, "user_a").await;
1922        let client_b = server.create_client(&mut cx_b, "user_b").await;
1923
1924        // Share a project as client A
1925        fs.insert_tree(
1926            "/a",
1927            json!({
1928                ".zed.toml": r#"collaborators = ["user_b"]"#,
1929                "a.txt": "a-contents",
1930                "b.txt": "b-contents",
1931            }),
1932        )
1933        .await;
1934        let project_a = cx_a.update(|cx| {
1935            Project::local(
1936                client_a.clone(),
1937                client_a.user_store.clone(),
1938                lang_registry.clone(),
1939                fs.clone(),
1940                cx,
1941            )
1942        });
1943        let (worktree_a, _) = project_a
1944            .update(&mut cx_a, |p, cx| {
1945                p.find_or_create_local_worktree("/a", false, cx)
1946            })
1947            .await
1948            .unwrap();
1949        worktree_a
1950            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1951            .await;
1952        let project_id = project_a
1953            .update(&mut cx_a, |project, _| project.next_remote_id())
1954            .await;
1955        project_a
1956            .update(&mut cx_a, |project, cx| project.share(cx))
1957            .await
1958            .unwrap();
1959
1960        // Join that project as client B
1961        let _project_b = Project::remote(
1962            project_id,
1963            client_b.clone(),
1964            client_b.user_store.clone(),
1965            lang_registry.clone(),
1966            fs.clone(),
1967            &mut cx_b.to_async(),
1968        )
1969        .await
1970        .unwrap();
1971
1972        // See that a guest has joined as client A.
1973        project_a
1974            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1975            .await;
1976
1977        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1978        client_b.disconnect(&cx_b.to_async()).unwrap();
1979        project_a
1980            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1981            .await;
1982    }
1983
1984    #[gpui::test(iterations = 10)]
1985    async fn test_collaborating_with_diagnostics(
1986        mut cx_a: TestAppContext,
1987        mut cx_b: TestAppContext,
1988        last_iteration: bool,
1989    ) {
1990        cx_a.foreground().forbid_parking();
1991        let mut lang_registry = Arc::new(LanguageRegistry::new());
1992        let fs = Arc::new(FakeFs::new(cx_a.background()));
1993
1994        // Set up a fake language server.
1995        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1996        Arc::get_mut(&mut lang_registry)
1997            .unwrap()
1998            .add(Arc::new(Language::new(
1999                LanguageConfig {
2000                    name: "Rust".to_string(),
2001                    path_suffixes: vec!["rs".to_string()],
2002                    language_server: Some(language_server_config),
2003                    ..Default::default()
2004                },
2005                Some(tree_sitter_rust::language()),
2006            )));
2007
2008        // Connect to a server as 2 clients.
2009        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2010        let client_a = server.create_client(&mut cx_a, "user_a").await;
2011        let client_b = server.create_client(&mut cx_b, "user_b").await;
2012
2013        // Share a project as client A
2014        fs.insert_tree(
2015            "/a",
2016            json!({
2017                ".zed.toml": r#"collaborators = ["user_b"]"#,
2018                "a.rs": "let one = two",
2019                "other.rs": "",
2020            }),
2021        )
2022        .await;
2023        let project_a = cx_a.update(|cx| {
2024            Project::local(
2025                client_a.clone(),
2026                client_a.user_store.clone(),
2027                lang_registry.clone(),
2028                fs.clone(),
2029                cx,
2030            )
2031        });
2032        let (worktree_a, _) = project_a
2033            .update(&mut cx_a, |p, cx| {
2034                p.find_or_create_local_worktree("/a", false, cx)
2035            })
2036            .await
2037            .unwrap();
2038        worktree_a
2039            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2040            .await;
2041        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2042        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2043        project_a
2044            .update(&mut cx_a, |p, cx| p.share(cx))
2045            .await
2046            .unwrap();
2047
2048        // Cause the language server to start.
2049        let _ = cx_a
2050            .background()
2051            .spawn(project_a.update(&mut cx_a, |project, cx| {
2052                project.open_buffer(
2053                    ProjectPath {
2054                        worktree_id,
2055                        path: Path::new("other.rs").into(),
2056                    },
2057                    cx,
2058                )
2059            }))
2060            .await
2061            .unwrap();
2062
2063        // Simulate a language server reporting errors for a file.
2064        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2065        fake_language_server
2066            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2067                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2068                version: None,
2069                diagnostics: vec![lsp::Diagnostic {
2070                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2071                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2072                    message: "message 1".to_string(),
2073                    ..Default::default()
2074                }],
2075            })
2076            .await;
2077
2078        // Wait for server to see the diagnostics update.
2079        server
2080            .condition(|store| {
2081                let worktree = store
2082                    .project(project_id)
2083                    .unwrap()
2084                    .worktrees
2085                    .get(&worktree_id.to_proto())
2086                    .unwrap();
2087
2088                !worktree
2089                    .share
2090                    .as_ref()
2091                    .unwrap()
2092                    .diagnostic_summaries
2093                    .is_empty()
2094            })
2095            .await;
2096
2097        // Join the worktree as client B.
2098        let project_b = Project::remote(
2099            project_id,
2100            client_b.clone(),
2101            client_b.user_store.clone(),
2102            lang_registry.clone(),
2103            fs.clone(),
2104            &mut cx_b.to_async(),
2105        )
2106        .await
2107        .unwrap();
2108
2109        project_b.read_with(&cx_b, |project, cx| {
2110            assert_eq!(
2111                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2112                &[(
2113                    ProjectPath {
2114                        worktree_id,
2115                        path: Arc::from(Path::new("a.rs")),
2116                    },
2117                    DiagnosticSummary {
2118                        error_count: 1,
2119                        warning_count: 0,
2120                        ..Default::default()
2121                    },
2122                )]
2123            )
2124        });
2125
2126        // Simulate a language server reporting more errors for a file.
2127        fake_language_server
2128            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2129                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2130                version: None,
2131                diagnostics: vec![
2132                    lsp::Diagnostic {
2133                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2134                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2135                        message: "message 1".to_string(),
2136                        ..Default::default()
2137                    },
2138                    lsp::Diagnostic {
2139                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2140                        range: lsp::Range::new(
2141                            lsp::Position::new(0, 10),
2142                            lsp::Position::new(0, 13),
2143                        ),
2144                        message: "message 2".to_string(),
2145                        ..Default::default()
2146                    },
2147                ],
2148            })
2149            .await;
2150
2151        // Client b gets the updated summaries
2152        project_b
2153            .condition(&cx_b, |project, cx| {
2154                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2155                    == &[(
2156                        ProjectPath {
2157                            worktree_id,
2158                            path: Arc::from(Path::new("a.rs")),
2159                        },
2160                        DiagnosticSummary {
2161                            error_count: 1,
2162                            warning_count: 1,
2163                            ..Default::default()
2164                        },
2165                    )]
2166            })
2167            .await;
2168
2169        // Open the file with the errors on client B. They should be present.
2170        let buffer_b = cx_b
2171            .background()
2172            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2173            .await
2174            .unwrap();
2175
2176        buffer_b.read_with(&cx_b, |buffer, _| {
2177            assert_eq!(
2178                buffer
2179                    .snapshot()
2180                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2181                    .map(|entry| entry)
2182                    .collect::<Vec<_>>(),
2183                &[
2184                    DiagnosticEntry {
2185                        range: Point::new(0, 4)..Point::new(0, 7),
2186                        diagnostic: Diagnostic {
2187                            group_id: 0,
2188                            message: "message 1".to_string(),
2189                            severity: lsp::DiagnosticSeverity::ERROR,
2190                            is_primary: true,
2191                            ..Default::default()
2192                        }
2193                    },
2194                    DiagnosticEntry {
2195                        range: Point::new(0, 10)..Point::new(0, 13),
2196                        diagnostic: Diagnostic {
2197                            group_id: 1,
2198                            severity: lsp::DiagnosticSeverity::WARNING,
2199                            message: "message 2".to_string(),
2200                            is_primary: true,
2201                            ..Default::default()
2202                        }
2203                    }
2204                ]
2205            );
2206        });
2207    }
2208
2209    #[gpui::test(iterations = 10)]
2210    async fn test_collaborating_with_completion(
2211        mut cx_a: TestAppContext,
2212        mut cx_b: TestAppContext,
2213        last_iteration: bool,
2214    ) {
2215        cx_a.foreground().forbid_parking();
2216        let mut lang_registry = Arc::new(LanguageRegistry::new());
2217        let fs = Arc::new(FakeFs::new(cx_a.background()));
2218
2219        // Set up a fake language server.
2220        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2221        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2222            completion_provider: Some(lsp::CompletionOptions {
2223                trigger_characters: Some(vec![".".to_string()]),
2224                ..Default::default()
2225            }),
2226            ..Default::default()
2227        });
2228        Arc::get_mut(&mut lang_registry)
2229            .unwrap()
2230            .add(Arc::new(Language::new(
2231                LanguageConfig {
2232                    name: "Rust".to_string(),
2233                    path_suffixes: vec!["rs".to_string()],
2234                    language_server: Some(language_server_config),
2235                    ..Default::default()
2236                },
2237                Some(tree_sitter_rust::language()),
2238            )));
2239
2240        // Connect to a server as 2 clients.
2241        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2242        let client_a = server.create_client(&mut cx_a, "user_a").await;
2243        let client_b = server.create_client(&mut cx_b, "user_b").await;
2244
2245        // Share a project as client A
2246        fs.insert_tree(
2247            "/a",
2248            json!({
2249                ".zed.toml": r#"collaborators = ["user_b"]"#,
2250                "main.rs": "fn main() { a }",
2251                "other.rs": "",
2252            }),
2253        )
2254        .await;
2255        let project_a = cx_a.update(|cx| {
2256            Project::local(
2257                client_a.clone(),
2258                client_a.user_store.clone(),
2259                lang_registry.clone(),
2260                fs.clone(),
2261                cx,
2262            )
2263        });
2264        let (worktree_a, _) = project_a
2265            .update(&mut cx_a, |p, cx| {
2266                p.find_or_create_local_worktree("/a", false, cx)
2267            })
2268            .await
2269            .unwrap();
2270        worktree_a
2271            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2272            .await;
2273        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2274        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2275        project_a
2276            .update(&mut cx_a, |p, cx| p.share(cx))
2277            .await
2278            .unwrap();
2279
2280        // Join the worktree as client B.
2281        let project_b = Project::remote(
2282            project_id,
2283            client_b.clone(),
2284            client_b.user_store.clone(),
2285            lang_registry.clone(),
2286            fs.clone(),
2287            &mut cx_b.to_async(),
2288        )
2289        .await
2290        .unwrap();
2291
2292        // Open a file in an editor as the guest.
2293        let buffer_b = project_b
2294            .update(&mut cx_b, |p, cx| {
2295                p.open_buffer((worktree_id, "main.rs"), cx)
2296            })
2297            .await
2298            .unwrap();
2299        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2300        let editor_b = cx_b.add_view(window_b, |cx| {
2301            Editor::for_buffer(
2302                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2303                Arc::new(|cx| EditorSettings::test(cx)),
2304                Some(project_b.clone()),
2305                cx,
2306            )
2307        });
2308
2309        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2310        buffer_b
2311            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2312            .await;
2313
2314        // Type a completion trigger character as the guest.
2315        editor_b.update(&mut cx_b, |editor, cx| {
2316            editor.select_ranges([13..13], None, cx);
2317            editor.handle_input(&Input(".".into()), cx);
2318            cx.focus(&editor_b);
2319        });
2320
2321        // Receive a completion request as the host's language server.
2322        // Return some completions from the host's language server.
2323        cx_a.foreground().start_waiting();
2324        fake_language_server
2325            .handle_request::<lsp::request::Completion, _>(|params| {
2326                assert_eq!(
2327                    params.text_document_position.text_document.uri,
2328                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2329                );
2330                assert_eq!(
2331                    params.text_document_position.position,
2332                    lsp::Position::new(0, 14),
2333                );
2334
2335                Some(lsp::CompletionResponse::Array(vec![
2336                    lsp::CompletionItem {
2337                        label: "first_method(…)".into(),
2338                        detail: Some("fn(&mut self, B) -> C".into()),
2339                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2340                            new_text: "first_method($1)".to_string(),
2341                            range: lsp::Range::new(
2342                                lsp::Position::new(0, 14),
2343                                lsp::Position::new(0, 14),
2344                            ),
2345                        })),
2346                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2347                        ..Default::default()
2348                    },
2349                    lsp::CompletionItem {
2350                        label: "second_method(…)".into(),
2351                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2352                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2353                            new_text: "second_method()".to_string(),
2354                            range: lsp::Range::new(
2355                                lsp::Position::new(0, 14),
2356                                lsp::Position::new(0, 14),
2357                            ),
2358                        })),
2359                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2360                        ..Default::default()
2361                    },
2362                ]))
2363            })
2364            .next()
2365            .await
2366            .unwrap();
2367        cx_a.foreground().finish_waiting();
2368
2369        // Open the buffer on the host.
2370        let buffer_a = project_a
2371            .update(&mut cx_a, |p, cx| {
2372                p.open_buffer((worktree_id, "main.rs"), cx)
2373            })
2374            .await
2375            .unwrap();
2376        buffer_a
2377            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2378            .await;
2379
2380        // Confirm a completion on the guest.
2381        editor_b
2382            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2383            .await;
2384        editor_b.update(&mut cx_b, |editor, cx| {
2385            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2386            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2387        });
2388
2389        // Return a resolved completion from the host's language server.
2390        // The resolved completion has an additional text edit.
2391        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2392            assert_eq!(params.label, "first_method(…)");
2393            lsp::CompletionItem {
2394                label: "first_method(…)".into(),
2395                detail: Some("fn(&mut self, B) -> C".into()),
2396                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2397                    new_text: "first_method($1)".to_string(),
2398                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2399                })),
2400                additional_text_edits: Some(vec![lsp::TextEdit {
2401                    new_text: "use d::SomeTrait;\n".to_string(),
2402                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2403                }]),
2404                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2405                ..Default::default()
2406            }
2407        });
2408
2409        // The additional edit is applied.
2410        buffer_a
2411            .condition(&cx_a, |buffer, _| {
2412                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2413            })
2414            .await;
2415        buffer_b
2416            .condition(&cx_b, |buffer, _| {
2417                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2418            })
2419            .await;
2420    }
2421
2422    #[gpui::test(iterations = 10)]
2423    async fn test_formatting_buffer(
2424        mut cx_a: TestAppContext,
2425        mut cx_b: TestAppContext,
2426        last_iteration: bool,
2427    ) {
2428        cx_a.foreground().forbid_parking();
2429        let mut lang_registry = Arc::new(LanguageRegistry::new());
2430        let fs = Arc::new(FakeFs::new(cx_a.background()));
2431
2432        // Set up a fake language server.
2433        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2434        Arc::get_mut(&mut lang_registry)
2435            .unwrap()
2436            .add(Arc::new(Language::new(
2437                LanguageConfig {
2438                    name: "Rust".to_string(),
2439                    path_suffixes: vec!["rs".to_string()],
2440                    language_server: Some(language_server_config),
2441                    ..Default::default()
2442                },
2443                Some(tree_sitter_rust::language()),
2444            )));
2445
2446        // Connect to a server as 2 clients.
2447        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2448        let client_a = server.create_client(&mut cx_a, "user_a").await;
2449        let client_b = server.create_client(&mut cx_b, "user_b").await;
2450
2451        // Share a project as client A
2452        fs.insert_tree(
2453            "/a",
2454            json!({
2455                ".zed.toml": r#"collaborators = ["user_b"]"#,
2456                "a.rs": "let one = two",
2457            }),
2458        )
2459        .await;
2460        let project_a = cx_a.update(|cx| {
2461            Project::local(
2462                client_a.clone(),
2463                client_a.user_store.clone(),
2464                lang_registry.clone(),
2465                fs.clone(),
2466                cx,
2467            )
2468        });
2469        let (worktree_a, _) = project_a
2470            .update(&mut cx_a, |p, cx| {
2471                p.find_or_create_local_worktree("/a", false, cx)
2472            })
2473            .await
2474            .unwrap();
2475        worktree_a
2476            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2477            .await;
2478        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2479        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2480        project_a
2481            .update(&mut cx_a, |p, cx| p.share(cx))
2482            .await
2483            .unwrap();
2484
2485        // Join the worktree as client B.
2486        let project_b = Project::remote(
2487            project_id,
2488            client_b.clone(),
2489            client_b.user_store.clone(),
2490            lang_registry.clone(),
2491            fs.clone(),
2492            &mut cx_b.to_async(),
2493        )
2494        .await
2495        .unwrap();
2496
2497        let buffer_b = cx_b
2498            .background()
2499            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2500            .await
2501            .unwrap();
2502
2503        let format = project_b.update(&mut cx_b, |project, cx| {
2504            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2505        });
2506
2507        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2508        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2509            Some(vec![
2510                lsp::TextEdit {
2511                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2512                    new_text: "h".to_string(),
2513                },
2514                lsp::TextEdit {
2515                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2516                    new_text: "y".to_string(),
2517                },
2518            ])
2519        });
2520
2521        format.await.unwrap();
2522        assert_eq!(
2523            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2524            "let honey = two"
2525        );
2526    }
2527
2528    #[gpui::test(iterations = 10)]
2529    async fn test_definition(
2530        mut cx_a: TestAppContext,
2531        mut cx_b: TestAppContext,
2532        last_iteration: bool,
2533    ) {
2534        cx_a.foreground().forbid_parking();
2535        let mut lang_registry = Arc::new(LanguageRegistry::new());
2536        let fs = Arc::new(FakeFs::new(cx_a.background()));
2537        fs.insert_tree(
2538            "/root-1",
2539            json!({
2540                ".zed.toml": r#"collaborators = ["user_b"]"#,
2541                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2542            }),
2543        )
2544        .await;
2545        fs.insert_tree(
2546            "/root-2",
2547            json!({
2548                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2549            }),
2550        )
2551        .await;
2552
2553        // Set up a fake language server.
2554        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2555        Arc::get_mut(&mut lang_registry)
2556            .unwrap()
2557            .add(Arc::new(Language::new(
2558                LanguageConfig {
2559                    name: "Rust".to_string(),
2560                    path_suffixes: vec!["rs".to_string()],
2561                    language_server: Some(language_server_config),
2562                    ..Default::default()
2563                },
2564                Some(tree_sitter_rust::language()),
2565            )));
2566
2567        // Connect to a server as 2 clients.
2568        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2569        let client_a = server.create_client(&mut cx_a, "user_a").await;
2570        let client_b = server.create_client(&mut cx_b, "user_b").await;
2571
2572        // Share a project as client A
2573        let project_a = cx_a.update(|cx| {
2574            Project::local(
2575                client_a.clone(),
2576                client_a.user_store.clone(),
2577                lang_registry.clone(),
2578                fs.clone(),
2579                cx,
2580            )
2581        });
2582        let (worktree_a, _) = project_a
2583            .update(&mut cx_a, |p, cx| {
2584                p.find_or_create_local_worktree("/root-1", false, cx)
2585            })
2586            .await
2587            .unwrap();
2588        worktree_a
2589            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2590            .await;
2591        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2592        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2593        project_a
2594            .update(&mut cx_a, |p, cx| p.share(cx))
2595            .await
2596            .unwrap();
2597
2598        // Join the worktree as client B.
2599        let project_b = Project::remote(
2600            project_id,
2601            client_b.clone(),
2602            client_b.user_store.clone(),
2603            lang_registry.clone(),
2604            fs.clone(),
2605            &mut cx_b.to_async(),
2606        )
2607        .await
2608        .unwrap();
2609
2610        // Open the file on client B.
2611        let buffer_b = cx_b
2612            .background()
2613            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2614            .await
2615            .unwrap();
2616
2617        // Request the definition of a symbol as the guest.
2618        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2619
2620        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2621        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2622            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2623                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2624                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2625            )))
2626        });
2627
2628        let definitions_1 = definitions_1.await.unwrap();
2629        cx_b.read(|cx| {
2630            assert_eq!(definitions_1.len(), 1);
2631            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2632            let target_buffer = definitions_1[0].target_buffer.read(cx);
2633            assert_eq!(
2634                target_buffer.text(),
2635                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2636            );
2637            assert_eq!(
2638                definitions_1[0].target_range.to_point(target_buffer),
2639                Point::new(0, 6)..Point::new(0, 9)
2640            );
2641        });
2642
2643        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2644        // the previous call to `definition`.
2645        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2646        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2647            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2648                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2649                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2650            )))
2651        });
2652
2653        let definitions_2 = definitions_2.await.unwrap();
2654        cx_b.read(|cx| {
2655            assert_eq!(definitions_2.len(), 1);
2656            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2657            let target_buffer = definitions_2[0].target_buffer.read(cx);
2658            assert_eq!(
2659                target_buffer.text(),
2660                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2661            );
2662            assert_eq!(
2663                definitions_2[0].target_range.to_point(target_buffer),
2664                Point::new(1, 6)..Point::new(1, 11)
2665            );
2666        });
2667        assert_eq!(
2668            definitions_1[0].target_buffer,
2669            definitions_2[0].target_buffer
2670        );
2671
2672        cx_b.update(|_| {
2673            drop(definitions_1);
2674            drop(definitions_2);
2675        });
2676        project_b
2677            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2678            .await;
2679    }
2680
2681    #[gpui::test(iterations = 10)]
2682    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2683        mut cx_a: TestAppContext,
2684        mut cx_b: TestAppContext,
2685        mut rng: StdRng,
2686        last_iteration: bool,
2687    ) {
2688        cx_a.foreground().forbid_parking();
2689        let mut lang_registry = Arc::new(LanguageRegistry::new());
2690        let fs = Arc::new(FakeFs::new(cx_a.background()));
2691        fs.insert_tree(
2692            "/root",
2693            json!({
2694                ".zed.toml": r#"collaborators = ["user_b"]"#,
2695                "a.rs": "const ONE: usize = b::TWO;",
2696                "b.rs": "const TWO: usize = 2",
2697            }),
2698        )
2699        .await;
2700
2701        // Set up a fake language server.
2702        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2703
2704        Arc::get_mut(&mut lang_registry)
2705            .unwrap()
2706            .add(Arc::new(Language::new(
2707                LanguageConfig {
2708                    name: "Rust".to_string(),
2709                    path_suffixes: vec!["rs".to_string()],
2710                    language_server: Some(language_server_config),
2711                    ..Default::default()
2712                },
2713                Some(tree_sitter_rust::language()),
2714            )));
2715
2716        // Connect to a server as 2 clients.
2717        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2718        let client_a = server.create_client(&mut cx_a, "user_a").await;
2719        let client_b = server.create_client(&mut cx_b, "user_b").await;
2720
2721        // Share a project as client A
2722        let project_a = cx_a.update(|cx| {
2723            Project::local(
2724                client_a.clone(),
2725                client_a.user_store.clone(),
2726                lang_registry.clone(),
2727                fs.clone(),
2728                cx,
2729            )
2730        });
2731
2732        let (worktree_a, _) = project_a
2733            .update(&mut cx_a, |p, cx| {
2734                p.find_or_create_local_worktree("/root", false, cx)
2735            })
2736            .await
2737            .unwrap();
2738        worktree_a
2739            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2740            .await;
2741        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2742        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2743        project_a
2744            .update(&mut cx_a, |p, cx| p.share(cx))
2745            .await
2746            .unwrap();
2747
2748        // Join the worktree as client B.
2749        let project_b = Project::remote(
2750            project_id,
2751            client_b.clone(),
2752            client_b.user_store.clone(),
2753            lang_registry.clone(),
2754            fs.clone(),
2755            &mut cx_b.to_async(),
2756        )
2757        .await
2758        .unwrap();
2759
2760        let buffer_b1 = cx_b
2761            .background()
2762            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2763            .await
2764            .unwrap();
2765
2766        let definitions;
2767        let buffer_b2;
2768        if rng.gen() {
2769            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2770            buffer_b2 =
2771                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2772        } else {
2773            buffer_b2 =
2774                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2775            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2776        }
2777
2778        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2779        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2780            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2781                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2782                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2783            )))
2784        });
2785
2786        let buffer_b2 = buffer_b2.await.unwrap();
2787        let definitions = definitions.await.unwrap();
2788        assert_eq!(definitions.len(), 1);
2789        assert_eq!(definitions[0].target_buffer, buffer_b2);
2790    }
2791
2792    #[gpui::test(iterations = 10)]
2793    async fn test_collaborating_with_code_actions(
2794        mut cx_a: TestAppContext,
2795        mut cx_b: TestAppContext,
2796        last_iteration: bool,
2797    ) {
2798        cx_a.foreground().forbid_parking();
2799        let mut lang_registry = Arc::new(LanguageRegistry::new());
2800        let fs = Arc::new(FakeFs::new(cx_a.background()));
2801        let mut path_openers_b = Vec::new();
2802        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2803
2804        // Set up a fake language server.
2805        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2806        Arc::get_mut(&mut lang_registry)
2807            .unwrap()
2808            .add(Arc::new(Language::new(
2809                LanguageConfig {
2810                    name: "Rust".to_string(),
2811                    path_suffixes: vec!["rs".to_string()],
2812                    language_server: Some(language_server_config),
2813                    ..Default::default()
2814                },
2815                Some(tree_sitter_rust::language()),
2816            )));
2817
2818        // Connect to a server as 2 clients.
2819        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2820        let client_a = server.create_client(&mut cx_a, "user_a").await;
2821        let client_b = server.create_client(&mut cx_b, "user_b").await;
2822
2823        // Share a project as client A
2824        fs.insert_tree(
2825            "/a",
2826            json!({
2827                ".zed.toml": r#"collaborators = ["user_b"]"#,
2828                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2829                "other.rs": "pub fn foo() -> usize { 4 }",
2830            }),
2831        )
2832        .await;
2833        let project_a = cx_a.update(|cx| {
2834            Project::local(
2835                client_a.clone(),
2836                client_a.user_store.clone(),
2837                lang_registry.clone(),
2838                fs.clone(),
2839                cx,
2840            )
2841        });
2842        let (worktree_a, _) = project_a
2843            .update(&mut cx_a, |p, cx| {
2844                p.find_or_create_local_worktree("/a", false, cx)
2845            })
2846            .await
2847            .unwrap();
2848        worktree_a
2849            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2850            .await;
2851        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2852        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2853        project_a
2854            .update(&mut cx_a, |p, cx| p.share(cx))
2855            .await
2856            .unwrap();
2857
2858        // Join the worktree as client B.
2859        let project_b = Project::remote(
2860            project_id,
2861            client_b.clone(),
2862            client_b.user_store.clone(),
2863            lang_registry.clone(),
2864            fs.clone(),
2865            &mut cx_b.to_async(),
2866        )
2867        .await
2868        .unwrap();
2869        let mut params = cx_b.update(WorkspaceParams::test);
2870        params.languages = lang_registry.clone();
2871        params.client = client_b.client.clone();
2872        params.user_store = client_b.user_store.clone();
2873        params.project = project_b;
2874        params.path_openers = path_openers_b.into();
2875
2876        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2877        let editor_b = workspace_b
2878            .update(&mut cx_b, |workspace, cx| {
2879                workspace.open_path((worktree_id, "main.rs").into(), cx)
2880            })
2881            .await
2882            .unwrap()
2883            .downcast::<Editor>()
2884            .unwrap();
2885
2886        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2887        fake_language_server
2888            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2889                assert_eq!(
2890                    params.text_document.uri,
2891                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2892                );
2893                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2894                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2895                None
2896            })
2897            .next()
2898            .await;
2899
2900        // Move cursor to a location that contains code actions.
2901        editor_b.update(&mut cx_b, |editor, cx| {
2902            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2903            cx.focus(&editor_b);
2904        });
2905
2906        fake_language_server
2907            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2908                assert_eq!(
2909                    params.text_document.uri,
2910                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2911                );
2912                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2913                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2914
2915                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2916                    lsp::CodeAction {
2917                        title: "Inline into all callers".to_string(),
2918                        edit: Some(lsp::WorkspaceEdit {
2919                            changes: Some(
2920                                [
2921                                    (
2922                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2923                                        vec![lsp::TextEdit::new(
2924                                            lsp::Range::new(
2925                                                lsp::Position::new(1, 22),
2926                                                lsp::Position::new(1, 34),
2927                                            ),
2928                                            "4".to_string(),
2929                                        )],
2930                                    ),
2931                                    (
2932                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2933                                        vec![lsp::TextEdit::new(
2934                                            lsp::Range::new(
2935                                                lsp::Position::new(0, 0),
2936                                                lsp::Position::new(0, 27),
2937                                            ),
2938                                            "".to_string(),
2939                                        )],
2940                                    ),
2941                                ]
2942                                .into_iter()
2943                                .collect(),
2944                            ),
2945                            ..Default::default()
2946                        }),
2947                        data: Some(json!({
2948                            "codeActionParams": {
2949                                "range": {
2950                                    "start": {"line": 1, "column": 31},
2951                                    "end": {"line": 1, "column": 31},
2952                                }
2953                            }
2954                        })),
2955                        ..Default::default()
2956                    },
2957                )])
2958            })
2959            .next()
2960            .await;
2961
2962        // Toggle code actions and wait for them to display.
2963        editor_b.update(&mut cx_b, |editor, cx| {
2964            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2965        });
2966        editor_b
2967            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2968            .await;
2969
2970        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2971
2972        // Confirming the code action will trigger a resolve request.
2973        let confirm_action = workspace_b
2974            .update(&mut cx_b, |workspace, cx| {
2975                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2976            })
2977            .unwrap();
2978        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2979            lsp::CodeAction {
2980                title: "Inline into all callers".to_string(),
2981                edit: Some(lsp::WorkspaceEdit {
2982                    changes: Some(
2983                        [
2984                            (
2985                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2986                                vec![lsp::TextEdit::new(
2987                                    lsp::Range::new(
2988                                        lsp::Position::new(1, 22),
2989                                        lsp::Position::new(1, 34),
2990                                    ),
2991                                    "4".to_string(),
2992                                )],
2993                            ),
2994                            (
2995                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2996                                vec![lsp::TextEdit::new(
2997                                    lsp::Range::new(
2998                                        lsp::Position::new(0, 0),
2999                                        lsp::Position::new(0, 27),
3000                                    ),
3001                                    "".to_string(),
3002                                )],
3003                            ),
3004                        ]
3005                        .into_iter()
3006                        .collect(),
3007                    ),
3008                    ..Default::default()
3009                }),
3010                ..Default::default()
3011            }
3012        });
3013
3014        // After the action is confirmed, an editor containing both modified files is opened.
3015        confirm_action.await.unwrap();
3016        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3017            workspace
3018                .active_item(cx)
3019                .unwrap()
3020                .downcast::<Editor>()
3021                .unwrap()
3022        });
3023        code_action_editor.update(&mut cx_b, |editor, cx| {
3024            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3025            editor.undo(&Undo, cx);
3026            assert_eq!(
3027                editor.text(cx),
3028                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3029            );
3030            editor.redo(&Redo, cx);
3031            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3032        });
3033    }
3034
3035    #[gpui::test(iterations = 10)]
3036    async fn test_basic_chat(
3037        mut cx_a: TestAppContext,
3038        mut cx_b: TestAppContext,
3039        last_iteration: bool,
3040    ) {
3041        cx_a.foreground().forbid_parking();
3042
3043        // Connect to a server as 2 clients.
3044        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3045        let client_a = server.create_client(&mut cx_a, "user_a").await;
3046        let client_b = server.create_client(&mut cx_b, "user_b").await;
3047
3048        // Create an org that includes these 2 users.
3049        let db = &server.app_state.db;
3050        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3051        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3052            .await
3053            .unwrap();
3054        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3055            .await
3056            .unwrap();
3057
3058        // Create a channel that includes all the users.
3059        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3060        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3061            .await
3062            .unwrap();
3063        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3064            .await
3065            .unwrap();
3066        db.create_channel_message(
3067            channel_id,
3068            client_b.current_user_id(&cx_b),
3069            "hello A, it's B.",
3070            OffsetDateTime::now_utc(),
3071            1,
3072        )
3073        .await
3074        .unwrap();
3075
3076        let channels_a = cx_a
3077            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3078        channels_a
3079            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3080            .await;
3081        channels_a.read_with(&cx_a, |list, _| {
3082            assert_eq!(
3083                list.available_channels().unwrap(),
3084                &[ChannelDetails {
3085                    id: channel_id.to_proto(),
3086                    name: "test-channel".to_string()
3087                }]
3088            )
3089        });
3090        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3091            this.get_channel(channel_id.to_proto(), cx).unwrap()
3092        });
3093        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3094        channel_a
3095            .condition(&cx_a, |channel, _| {
3096                channel_messages(channel)
3097                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3098            })
3099            .await;
3100
3101        let channels_b = cx_b
3102            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3103        channels_b
3104            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3105            .await;
3106        channels_b.read_with(&cx_b, |list, _| {
3107            assert_eq!(
3108                list.available_channels().unwrap(),
3109                &[ChannelDetails {
3110                    id: channel_id.to_proto(),
3111                    name: "test-channel".to_string()
3112                }]
3113            )
3114        });
3115
3116        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3117            this.get_channel(channel_id.to_proto(), cx).unwrap()
3118        });
3119        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3120        channel_b
3121            .condition(&cx_b, |channel, _| {
3122                channel_messages(channel)
3123                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3124            })
3125            .await;
3126
3127        channel_a
3128            .update(&mut cx_a, |channel, cx| {
3129                channel
3130                    .send_message("oh, hi B.".to_string(), cx)
3131                    .unwrap()
3132                    .detach();
3133                let task = channel.send_message("sup".to_string(), cx).unwrap();
3134                assert_eq!(
3135                    channel_messages(channel),
3136                    &[
3137                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3138                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3139                        ("user_a".to_string(), "sup".to_string(), true)
3140                    ]
3141                );
3142                task
3143            })
3144            .await
3145            .unwrap();
3146
3147        channel_b
3148            .condition(&cx_b, |channel, _| {
3149                channel_messages(channel)
3150                    == [
3151                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3152                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3153                        ("user_a".to_string(), "sup".to_string(), false),
3154                    ]
3155            })
3156            .await;
3157
3158        assert_eq!(
3159            server
3160                .state()
3161                .await
3162                .channel(channel_id)
3163                .unwrap()
3164                .connection_ids
3165                .len(),
3166            2
3167        );
3168        cx_b.update(|_| drop(channel_b));
3169        server
3170            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3171            .await;
3172
3173        cx_a.update(|_| drop(channel_a));
3174        server
3175            .condition(|state| state.channel(channel_id).is_none())
3176            .await;
3177    }
3178
3179    #[gpui::test(iterations = 10)]
3180    async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3181        cx_a.foreground().forbid_parking();
3182
3183        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3184        let client_a = server.create_client(&mut cx_a, "user_a").await;
3185
3186        let db = &server.app_state.db;
3187        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3188        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3189        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3190            .await
3191            .unwrap();
3192        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3193            .await
3194            .unwrap();
3195
3196        let channels_a = cx_a
3197            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3198        channels_a
3199            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3200            .await;
3201        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3202            this.get_channel(channel_id.to_proto(), cx).unwrap()
3203        });
3204
3205        // Messages aren't allowed to be too long.
3206        channel_a
3207            .update(&mut cx_a, |channel, cx| {
3208                let long_body = "this is long.\n".repeat(1024);
3209                channel.send_message(long_body, cx).unwrap()
3210            })
3211            .await
3212            .unwrap_err();
3213
3214        // Messages aren't allowed to be blank.
3215        channel_a.update(&mut cx_a, |channel, cx| {
3216            channel.send_message(String::new(), cx).unwrap_err()
3217        });
3218
3219        // Leading and trailing whitespace are trimmed.
3220        channel_a
3221            .update(&mut cx_a, |channel, cx| {
3222                channel
3223                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3224                    .unwrap()
3225            })
3226            .await
3227            .unwrap();
3228        assert_eq!(
3229            db.get_channel_messages(channel_id, 10, None)
3230                .await
3231                .unwrap()
3232                .iter()
3233                .map(|m| &m.body)
3234                .collect::<Vec<_>>(),
3235            &["surrounded by whitespace"]
3236        );
3237    }
3238
3239    #[gpui::test(iterations = 10)]
3240    async fn test_chat_reconnection(
3241        mut cx_a: TestAppContext,
3242        mut cx_b: TestAppContext,
3243        last_iteration: bool,
3244    ) {
3245        cx_a.foreground().forbid_parking();
3246
3247        // Connect to a server as 2 clients.
3248        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3249        let client_a = server.create_client(&mut cx_a, "user_a").await;
3250        let client_b = server.create_client(&mut cx_b, "user_b").await;
3251        let mut status_b = client_b.status();
3252
3253        // Create an org that includes these 2 users.
3254        let db = &server.app_state.db;
3255        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3256        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3257            .await
3258            .unwrap();
3259        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3260            .await
3261            .unwrap();
3262
3263        // Create a channel that includes all the users.
3264        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3265        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3266            .await
3267            .unwrap();
3268        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3269            .await
3270            .unwrap();
3271        db.create_channel_message(
3272            channel_id,
3273            client_b.current_user_id(&cx_b),
3274            "hello A, it's B.",
3275            OffsetDateTime::now_utc(),
3276            2,
3277        )
3278        .await
3279        .unwrap();
3280
3281        let channels_a = cx_a
3282            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3283        channels_a
3284            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3285            .await;
3286
3287        channels_a.read_with(&cx_a, |list, _| {
3288            assert_eq!(
3289                list.available_channels().unwrap(),
3290                &[ChannelDetails {
3291                    id: channel_id.to_proto(),
3292                    name: "test-channel".to_string()
3293                }]
3294            )
3295        });
3296        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3297            this.get_channel(channel_id.to_proto(), cx).unwrap()
3298        });
3299        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3300        channel_a
3301            .condition(&cx_a, |channel, _| {
3302                channel_messages(channel)
3303                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3304            })
3305            .await;
3306
3307        let channels_b = cx_b
3308            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3309        channels_b
3310            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3311            .await;
3312        channels_b.read_with(&cx_b, |list, _| {
3313            assert_eq!(
3314                list.available_channels().unwrap(),
3315                &[ChannelDetails {
3316                    id: channel_id.to_proto(),
3317                    name: "test-channel".to_string()
3318                }]
3319            )
3320        });
3321
3322        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3323            this.get_channel(channel_id.to_proto(), cx).unwrap()
3324        });
3325        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3326        channel_b
3327            .condition(&cx_b, |channel, _| {
3328                channel_messages(channel)
3329                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3330            })
3331            .await;
3332
3333        // Disconnect client B, ensuring we can still access its cached channel data.
3334        server.forbid_connections();
3335        server.disconnect_client(client_b.current_user_id(&cx_b));
3336        while !matches!(
3337            status_b.next().await,
3338            Some(client::Status::ReconnectionError { .. })
3339        ) {}
3340
3341        channels_b.read_with(&cx_b, |channels, _| {
3342            assert_eq!(
3343                channels.available_channels().unwrap(),
3344                [ChannelDetails {
3345                    id: channel_id.to_proto(),
3346                    name: "test-channel".to_string()
3347                }]
3348            )
3349        });
3350        channel_b.read_with(&cx_b, |channel, _| {
3351            assert_eq!(
3352                channel_messages(channel),
3353                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3354            )
3355        });
3356
3357        // Send a message from client B while it is disconnected.
3358        channel_b
3359            .update(&mut cx_b, |channel, cx| {
3360                let task = channel
3361                    .send_message("can you see this?".to_string(), cx)
3362                    .unwrap();
3363                assert_eq!(
3364                    channel_messages(channel),
3365                    &[
3366                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3367                        ("user_b".to_string(), "can you see this?".to_string(), true)
3368                    ]
3369                );
3370                task
3371            })
3372            .await
3373            .unwrap_err();
3374
3375        // Send a message from client A while B is disconnected.
3376        channel_a
3377            .update(&mut cx_a, |channel, cx| {
3378                channel
3379                    .send_message("oh, hi B.".to_string(), cx)
3380                    .unwrap()
3381                    .detach();
3382                let task = channel.send_message("sup".to_string(), cx).unwrap();
3383                assert_eq!(
3384                    channel_messages(channel),
3385                    &[
3386                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3387                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3388                        ("user_a".to_string(), "sup".to_string(), true)
3389                    ]
3390                );
3391                task
3392            })
3393            .await
3394            .unwrap();
3395
3396        // Give client B a chance to reconnect.
3397        server.allow_connections();
3398        cx_b.foreground().advance_clock(Duration::from_secs(10));
3399
3400        // Verify that B sees the new messages upon reconnection, as well as the message client B
3401        // sent while offline.
3402        channel_b
3403            .condition(&cx_b, |channel, _| {
3404                channel_messages(channel)
3405                    == [
3406                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3407                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3408                        ("user_a".to_string(), "sup".to_string(), false),
3409                        ("user_b".to_string(), "can you see this?".to_string(), false),
3410                    ]
3411            })
3412            .await;
3413
3414        // Ensure client A and B can communicate normally after reconnection.
3415        channel_a
3416            .update(&mut cx_a, |channel, cx| {
3417                channel.send_message("you online?".to_string(), cx).unwrap()
3418            })
3419            .await
3420            .unwrap();
3421        channel_b
3422            .condition(&cx_b, |channel, _| {
3423                channel_messages(channel)
3424                    == [
3425                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3426                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3427                        ("user_a".to_string(), "sup".to_string(), false),
3428                        ("user_b".to_string(), "can you see this?".to_string(), false),
3429                        ("user_a".to_string(), "you online?".to_string(), false),
3430                    ]
3431            })
3432            .await;
3433
3434        channel_b
3435            .update(&mut cx_b, |channel, cx| {
3436                channel.send_message("yep".to_string(), cx).unwrap()
3437            })
3438            .await
3439            .unwrap();
3440        channel_a
3441            .condition(&cx_a, |channel, _| {
3442                channel_messages(channel)
3443                    == [
3444                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3445                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3446                        ("user_a".to_string(), "sup".to_string(), false),
3447                        ("user_b".to_string(), "can you see this?".to_string(), false),
3448                        ("user_a".to_string(), "you online?".to_string(), false),
3449                        ("user_b".to_string(), "yep".to_string(), false),
3450                    ]
3451            })
3452            .await;
3453    }
3454
3455    #[gpui::test(iterations = 10)]
3456    async fn test_contacts(
3457        mut cx_a: TestAppContext,
3458        mut cx_b: TestAppContext,
3459        mut cx_c: TestAppContext,
3460        last_iteration: bool,
3461    ) {
3462        cx_a.foreground().forbid_parking();
3463        let lang_registry = Arc::new(LanguageRegistry::new());
3464        let fs = Arc::new(FakeFs::new(cx_a.background()));
3465
3466        // Connect to a server as 3 clients.
3467        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3468        let client_a = server.create_client(&mut cx_a, "user_a").await;
3469        let client_b = server.create_client(&mut cx_b, "user_b").await;
3470        let client_c = server.create_client(&mut cx_c, "user_c").await;
3471
3472        // Share a worktree as client A.
3473        fs.insert_tree(
3474            "/a",
3475            json!({
3476                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3477            }),
3478        )
3479        .await;
3480
3481        let project_a = cx_a.update(|cx| {
3482            Project::local(
3483                client_a.clone(),
3484                client_a.user_store.clone(),
3485                lang_registry.clone(),
3486                fs.clone(),
3487                cx,
3488            )
3489        });
3490        let (worktree_a, _) = project_a
3491            .update(&mut cx_a, |p, cx| {
3492                p.find_or_create_local_worktree("/a", false, cx)
3493            })
3494            .await
3495            .unwrap();
3496        worktree_a
3497            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3498            .await;
3499
3500        client_a
3501            .user_store
3502            .condition(&cx_a, |user_store, _| {
3503                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3504            })
3505            .await;
3506        client_b
3507            .user_store
3508            .condition(&cx_b, |user_store, _| {
3509                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3510            })
3511            .await;
3512        client_c
3513            .user_store
3514            .condition(&cx_c, |user_store, _| {
3515                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3516            })
3517            .await;
3518
3519        let project_id = project_a
3520            .update(&mut cx_a, |project, _| project.next_remote_id())
3521            .await;
3522        project_a
3523            .update(&mut cx_a, |project, cx| project.share(cx))
3524            .await
3525            .unwrap();
3526
3527        let _project_b = Project::remote(
3528            project_id,
3529            client_b.clone(),
3530            client_b.user_store.clone(),
3531            lang_registry.clone(),
3532            fs.clone(),
3533            &mut cx_b.to_async(),
3534        )
3535        .await
3536        .unwrap();
3537
3538        client_a
3539            .user_store
3540            .condition(&cx_a, |user_store, _| {
3541                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3542            })
3543            .await;
3544        client_b
3545            .user_store
3546            .condition(&cx_b, |user_store, _| {
3547                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3548            })
3549            .await;
3550        client_c
3551            .user_store
3552            .condition(&cx_c, |user_store, _| {
3553                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3554            })
3555            .await;
3556
3557        project_a
3558            .condition(&cx_a, |project, _| {
3559                project.collaborators().contains_key(&client_b.peer_id)
3560            })
3561            .await;
3562
3563        cx_a.update(move |_| drop(project_a));
3564        client_a
3565            .user_store
3566            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3567            .await;
3568        client_b
3569            .user_store
3570            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3571            .await;
3572        client_c
3573            .user_store
3574            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3575            .await;
3576
3577        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3578            user_store
3579                .contacts()
3580                .iter()
3581                .map(|contact| {
3582                    let worktrees = contact
3583                        .projects
3584                        .iter()
3585                        .map(|p| {
3586                            (
3587                                p.worktree_root_names[0].as_str(),
3588                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3589                            )
3590                        })
3591                        .collect();
3592                    (contact.user.github_login.as_str(), worktrees)
3593                })
3594                .collect()
3595        }
3596    }
3597
3598    #[gpui::test(iterations = 100)]
3599    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3600        cx.foreground().forbid_parking();
3601        let max_peers = env::var("MAX_PEERS")
3602            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3603            .unwrap_or(5);
3604        let max_operations = env::var("OPERATIONS")
3605            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3606            .unwrap_or(10);
3607
3608        let rng = Rc::new(RefCell::new(rng));
3609
3610        let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3611        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3612
3613        // Set up a fake language server.
3614        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3615        language_server_config.set_fake_initializer(|fake_server| {
3616            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3617                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3618                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3619                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3620                        new_text: "the-new-text".to_string(),
3621                    })),
3622                    ..Default::default()
3623                }]))
3624            });
3625
3626            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3627                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3628                    lsp::CodeAction {
3629                        title: "the-code-action".to_string(),
3630                        ..Default::default()
3631                    },
3632                )])
3633            });
3634        });
3635
3636        Arc::get_mut(&mut host_lang_registry)
3637            .unwrap()
3638            .add(Arc::new(Language::new(
3639                LanguageConfig {
3640                    name: "Rust".to_string(),
3641                    path_suffixes: vec!["rs".to_string()],
3642                    language_server: Some(language_server_config),
3643                    ..Default::default()
3644                },
3645                Some(tree_sitter_rust::language()),
3646            )));
3647
3648        let fs = Arc::new(FakeFs::new(cx.background()));
3649        fs.insert_tree(
3650            "/_collab",
3651            json!({
3652                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3653            }),
3654        )
3655        .await;
3656
3657        let operations = Rc::new(Cell::new(0));
3658        let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3659        let mut clients = Vec::new();
3660
3661        let mut next_entity_id = 100000;
3662        let mut host_cx = TestAppContext::new(
3663            cx.foreground_platform(),
3664            cx.platform(),
3665            cx.foreground(),
3666            cx.background(),
3667            cx.font_cache(),
3668            next_entity_id,
3669        );
3670        let host = server.create_client(&mut host_cx, "host").await;
3671        let host_project = host_cx.update(|cx| {
3672            Project::local(
3673                host.client.clone(),
3674                host.user_store.clone(),
3675                host_lang_registry.clone(),
3676                fs.clone(),
3677                cx,
3678            )
3679        });
3680        let host_project_id = host_project
3681            .update(&mut host_cx, |p, _| p.next_remote_id())
3682            .await;
3683
3684        let (collab_worktree, _) = host_project
3685            .update(&mut host_cx, |project, cx| {
3686                project.find_or_create_local_worktree("/_collab", false, cx)
3687            })
3688            .await
3689            .unwrap();
3690        collab_worktree
3691            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3692            .await;
3693        host_project
3694            .update(&mut host_cx, |project, cx| project.share(cx))
3695            .await
3696            .unwrap();
3697
3698        clients.push(cx.foreground().spawn(host.simulate_host(
3699            host_project.clone(),
3700            operations.clone(),
3701            max_operations,
3702            rng.clone(),
3703            host_cx.clone(),
3704        )));
3705
3706        while operations.get() < max_operations {
3707            cx.background().simulate_random_delay().await;
3708            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3709                operations.set(operations.get() + 1);
3710
3711                let guest_id = clients.len();
3712                log::info!("Adding guest {}", guest_id);
3713                next_entity_id += 100000;
3714                let mut guest_cx = TestAppContext::new(
3715                    cx.foreground_platform(),
3716                    cx.platform(),
3717                    cx.foreground(),
3718                    cx.background(),
3719                    cx.font_cache(),
3720                    next_entity_id,
3721                );
3722                let guest = server
3723                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3724                    .await;
3725                let guest_project = Project::remote(
3726                    host_project_id,
3727                    guest.client.clone(),
3728                    guest.user_store.clone(),
3729                    guest_lang_registry.clone(),
3730                    fs.clone(),
3731                    &mut guest_cx.to_async(),
3732                )
3733                .await
3734                .unwrap();
3735                clients.push(cx.foreground().spawn(guest.simulate_guest(
3736                    guest_id,
3737                    guest_project,
3738                    operations.clone(),
3739                    max_operations,
3740                    rng.clone(),
3741                    guest_cx,
3742                )));
3743
3744                log::info!("Guest {} added", guest_id);
3745            }
3746        }
3747
3748        let clients = futures::future::join_all(clients).await;
3749        cx.foreground().run_until_parked();
3750
3751        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3752            project
3753                .worktrees(cx)
3754                .map(|worktree| {
3755                    let snapshot = worktree.read(cx).snapshot();
3756                    (snapshot.id(), snapshot)
3757                })
3758                .collect::<BTreeMap<_, _>>()
3759        });
3760
3761        for (guest_client, guest_cx) in clients.iter().skip(1) {
3762            let guest_id = guest_client.client.id();
3763            let worktree_snapshots =
3764                guest_client
3765                    .project
3766                    .as_ref()
3767                    .unwrap()
3768                    .read_with(guest_cx, |project, cx| {
3769                        project
3770                            .worktrees(cx)
3771                            .map(|worktree| {
3772                                let snapshot = worktree.read(cx).snapshot();
3773                                (snapshot.id(), snapshot)
3774                            })
3775                            .collect::<BTreeMap<_, _>>()
3776                    });
3777
3778            assert_eq!(
3779                worktree_snapshots.keys().collect::<Vec<_>>(),
3780                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3781                "guest {} has different worktrees than the host",
3782                guest_id
3783            );
3784            for (id, host_snapshot) in &host_worktree_snapshots {
3785                let guest_snapshot = &worktree_snapshots[id];
3786                assert_eq!(
3787                    guest_snapshot.root_name(),
3788                    host_snapshot.root_name(),
3789                    "guest {} has different root name than the host for worktree {}",
3790                    guest_id,
3791                    id
3792                );
3793                assert_eq!(
3794                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3795                    host_snapshot.entries(false).collect::<Vec<_>>(),
3796                    "guest {} has different snapshot than the host for worktree {}",
3797                    guest_id,
3798                    id
3799                );
3800            }
3801
3802            guest_client
3803                .project
3804                .as_ref()
3805                .unwrap()
3806                .read_with(guest_cx, |project, _| {
3807                    assert!(
3808                        !project.has_buffered_operations(),
3809                        "guest {} has buffered operations ",
3810                        guest_id,
3811                    );
3812                });
3813
3814            for guest_buffer in &guest_client.buffers {
3815                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3816                let host_buffer = host_project.read_with(&host_cx, |project, _| {
3817                    project
3818                        .shared_buffer(guest_client.peer_id, buffer_id)
3819                        .expect(&format!(
3820                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
3821                            guest_id, guest_client.peer_id, buffer_id
3822                        ))
3823                });
3824                assert_eq!(
3825                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3826                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3827                    "guest {} buffer {} differs from the host's buffer",
3828                    guest_id,
3829                    buffer_id,
3830                );
3831            }
3832        }
3833    }
3834
3835    struct TestServer {
3836        peer: Arc<Peer>,
3837        app_state: Arc<AppState>,
3838        server: Arc<Server>,
3839        foreground: Rc<executor::Foreground>,
3840        notifications: mpsc::Receiver<()>,
3841        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3842        forbid_connections: Arc<AtomicBool>,
3843        _test_db: TestDb,
3844    }
3845
3846    impl TestServer {
3847        async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3848            let mut test_db = TestDb::new();
3849            test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3850            let app_state = Self::build_app_state(&test_db).await;
3851            let peer = Peer::new();
3852            let notifications = mpsc::channel(128);
3853            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3854            Self {
3855                peer,
3856                app_state,
3857                server,
3858                foreground,
3859                notifications: notifications.1,
3860                connection_killers: Default::default(),
3861                forbid_connections: Default::default(),
3862                _test_db: test_db,
3863            }
3864        }
3865
3866        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3867            let http = FakeHttpClient::with_404_response();
3868            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3869            let client_name = name.to_string();
3870            let mut client = Client::new(http.clone());
3871            let server = self.server.clone();
3872            let connection_killers = self.connection_killers.clone();
3873            let forbid_connections = self.forbid_connections.clone();
3874            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3875
3876            Arc::get_mut(&mut client)
3877                .unwrap()
3878                .override_authenticate(move |cx| {
3879                    cx.spawn(|_| async move {
3880                        let access_token = "the-token".to_string();
3881                        Ok(Credentials {
3882                            user_id: user_id.0 as u64,
3883                            access_token,
3884                        })
3885                    })
3886                })
3887                .override_establish_connection(move |credentials, cx| {
3888                    assert_eq!(credentials.user_id, user_id.0 as u64);
3889                    assert_eq!(credentials.access_token, "the-token");
3890
3891                    let server = server.clone();
3892                    let connection_killers = connection_killers.clone();
3893                    let forbid_connections = forbid_connections.clone();
3894                    let client_name = client_name.clone();
3895                    let connection_id_tx = connection_id_tx.clone();
3896                    cx.spawn(move |cx| async move {
3897                        if forbid_connections.load(SeqCst) {
3898                            Err(EstablishConnectionError::other(anyhow!(
3899                                "server is forbidding connections"
3900                            )))
3901                        } else {
3902                            let (client_conn, server_conn, kill_conn) =
3903                                Connection::in_memory(cx.background());
3904                            connection_killers.lock().insert(user_id, kill_conn);
3905                            cx.background()
3906                                .spawn(server.handle_connection(
3907                                    server_conn,
3908                                    client_name,
3909                                    user_id,
3910                                    Some(connection_id_tx),
3911                                    cx.background(),
3912                                ))
3913                                .detach();
3914                            Ok(client_conn)
3915                        }
3916                    })
3917                });
3918
3919            client
3920                .authenticate_and_connect(&cx.to_async())
3921                .await
3922                .unwrap();
3923
3924            Channel::init(&client);
3925            Project::init(&client);
3926
3927            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3928            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3929            let mut authed_user =
3930                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3931            while authed_user.next().await.unwrap().is_none() {}
3932
3933            TestClient {
3934                client,
3935                peer_id,
3936                user_store,
3937                project: Default::default(),
3938                buffers: Default::default(),
3939            }
3940        }
3941
3942        fn disconnect_client(&self, user_id: UserId) {
3943            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3944                let _ = kill_conn.try_send(Some(()));
3945            }
3946        }
3947
3948        fn forbid_connections(&self) {
3949            self.forbid_connections.store(true, SeqCst);
3950        }
3951
3952        fn allow_connections(&self) {
3953            self.forbid_connections.store(false, SeqCst);
3954        }
3955
3956        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3957            let mut config = Config::default();
3958            config.session_secret = "a".repeat(32);
3959            config.database_url = test_db.url.clone();
3960            let github_client = github::AppClient::test();
3961            Arc::new(AppState {
3962                db: test_db.db().clone(),
3963                handlebars: Default::default(),
3964                auth_client: auth::build_client("", ""),
3965                repo_client: github::RepoClient::test(&github_client),
3966                github_client,
3967                config,
3968            })
3969        }
3970
3971        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3972            self.server.store.read()
3973        }
3974
3975        async fn condition<F>(&mut self, mut predicate: F)
3976        where
3977            F: FnMut(&Store) -> bool,
3978        {
3979            async_std::future::timeout(Duration::from_millis(500), async {
3980                while !(predicate)(&*self.server.store.read()) {
3981                    self.foreground.start_waiting();
3982                    self.notifications.next().await;
3983                    self.foreground.finish_waiting();
3984                }
3985            })
3986            .await
3987            .expect("condition timed out");
3988        }
3989    }
3990
3991    impl Drop for TestServer {
3992        fn drop(&mut self) {
3993            self.peer.reset();
3994        }
3995    }
3996
3997    struct TestClient {
3998        client: Arc<Client>,
3999        pub peer_id: PeerId,
4000        pub user_store: ModelHandle<UserStore>,
4001        project: Option<ModelHandle<Project>>,
4002        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4003    }
4004
4005    impl Deref for TestClient {
4006        type Target = Arc<Client>;
4007
4008        fn deref(&self) -> &Self::Target {
4009            &self.client
4010        }
4011    }
4012
4013    impl TestClient {
4014        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4015            UserId::from_proto(
4016                self.user_store
4017                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4018            )
4019        }
4020
4021        async fn simulate_host(
4022            mut self,
4023            project: ModelHandle<Project>,
4024            operations: Rc<Cell<usize>>,
4025            max_operations: usize,
4026            rng: Rc<RefCell<StdRng>>,
4027            mut cx: TestAppContext,
4028        ) -> (Self, TestAppContext) {
4029            let fs = project.read_with(&cx, |project, _| project.fs().clone());
4030            let mut files: Vec<PathBuf> = Default::default();
4031            while operations.get() < max_operations {
4032                operations.set(operations.get() + 1);
4033
4034                let distribution = rng.borrow_mut().gen_range(0..100);
4035                match distribution {
4036                    0..=20 if !files.is_empty() => {
4037                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4038                        while let Some(parent_path) = path.parent() {
4039                            path = parent_path;
4040                            if rng.borrow_mut().gen() {
4041                                break;
4042                            }
4043                        }
4044
4045                        log::info!("Host: find/create local worktree {:?}", path);
4046                        project
4047                            .update(&mut cx, |project, cx| {
4048                                project.find_or_create_local_worktree(path, false, cx)
4049                            })
4050                            .await
4051                            .unwrap();
4052                    }
4053                    10..=80 if !files.is_empty() => {
4054                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4055                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4056                            let (worktree, path) = project
4057                                .update(&mut cx, |project, cx| {
4058                                    project.find_or_create_local_worktree(file, false, cx)
4059                                })
4060                                .await
4061                                .unwrap();
4062                            let project_path =
4063                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4064                            log::info!("Host: opening path {:?}", project_path);
4065                            let buffer = project
4066                                .update(&mut cx, |project, cx| {
4067                                    project.open_buffer(project_path, cx)
4068                                })
4069                                .await
4070                                .unwrap();
4071                            self.buffers.insert(buffer.clone());
4072                            buffer
4073                        } else {
4074                            self.buffers
4075                                .iter()
4076                                .choose(&mut *rng.borrow_mut())
4077                                .unwrap()
4078                                .clone()
4079                        };
4080
4081                        if rng.borrow_mut().gen_bool(0.1) {
4082                            cx.update(|cx| {
4083                                log::info!(
4084                                    "Host: dropping buffer {:?}",
4085                                    buffer.read(cx).file().unwrap().full_path(cx)
4086                                );
4087                                self.buffers.remove(&buffer);
4088                                drop(buffer);
4089                            });
4090                        } else {
4091                            buffer.update(&mut cx, |buffer, cx| {
4092                                log::info!(
4093                                    "Host: updating buffer {:?}",
4094                                    buffer.file().unwrap().full_path(cx)
4095                                );
4096                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4097                            });
4098                        }
4099                    }
4100                    _ => loop {
4101                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4102                        let mut path = PathBuf::new();
4103                        path.push("/");
4104                        for _ in 0..path_component_count {
4105                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4106                            path.push(std::str::from_utf8(&[letter]).unwrap());
4107                        }
4108                        path.set_extension("rs");
4109                        let parent_path = path.parent().unwrap();
4110
4111                        log::info!("Host: creating file {:?}", path);
4112                        if fs.create_dir(&parent_path).await.is_ok()
4113                            && fs.create_file(&path, Default::default()).await.is_ok()
4114                        {
4115                            files.push(path);
4116                            break;
4117                        } else {
4118                            log::info!("Host: cannot create file");
4119                        }
4120                    },
4121                }
4122
4123                cx.background().simulate_random_delay().await;
4124            }
4125
4126            self.project = Some(project);
4127            (self, cx)
4128        }
4129
4130        pub async fn simulate_guest(
4131            mut self,
4132            guest_id: usize,
4133            project: ModelHandle<Project>,
4134            operations: Rc<Cell<usize>>,
4135            max_operations: usize,
4136            rng: Rc<RefCell<StdRng>>,
4137            mut cx: TestAppContext,
4138        ) -> (Self, TestAppContext) {
4139            while operations.get() < max_operations {
4140                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4141                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4142                        project
4143                            .worktrees(&cx)
4144                            .filter(|worktree| {
4145                                worktree.read(cx).entries(false).any(|e| e.is_file())
4146                            })
4147                            .choose(&mut *rng.borrow_mut())
4148                    }) {
4149                        worktree
4150                    } else {
4151                        cx.background().simulate_random_delay().await;
4152                        continue;
4153                    };
4154
4155                    operations.set(operations.get() + 1);
4156                    let project_path = worktree.read_with(&cx, |worktree, _| {
4157                        let entry = worktree
4158                            .entries(false)
4159                            .filter(|e| e.is_file())
4160                            .choose(&mut *rng.borrow_mut())
4161                            .unwrap();
4162                        (worktree.id(), entry.path.clone())
4163                    });
4164                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4165                    let buffer = project
4166                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4167                        .await
4168                        .unwrap();
4169                    self.buffers.insert(buffer.clone());
4170                    buffer
4171                } else {
4172                    self.buffers
4173                        .iter()
4174                        .choose(&mut *rng.borrow_mut())
4175                        .unwrap()
4176                        .clone()
4177                };
4178
4179                let choice = rng.borrow_mut().gen_range(0..100);
4180                match choice {
4181                    0..=9 => {
4182                        cx.update(|cx| {
4183                            log::info!(
4184                                "Guest {}: dropping buffer {:?}",
4185                                guest_id,
4186                                buffer.read(cx).file().unwrap().full_path(cx)
4187                            );
4188                            self.buffers.remove(&buffer);
4189                            drop(buffer);
4190                        });
4191                    }
4192                    10..=14 => {
4193                        project
4194                            .update(&mut cx, |project, cx| {
4195                                log::info!(
4196                                    "Guest {}: requesting completions for buffer {:?}",
4197                                    guest_id,
4198                                    buffer.read(cx).file().unwrap().full_path(cx)
4199                                );
4200                                project.completions(&buffer, 0, cx)
4201                            })
4202                            .await
4203                            .expect("completion request failed");
4204                    }
4205                    15..=19 => {
4206                        project
4207                            .update(&mut cx, |project, cx| {
4208                                log::info!(
4209                                    "Guest {}: requesting code actions for buffer {:?}",
4210                                    guest_id,
4211                                    buffer.read(cx).file().unwrap().full_path(cx)
4212                                );
4213                                project.code_actions(&buffer, 0..0, cx)
4214                            })
4215                            .await
4216                            .expect("completion request failed");
4217                    }
4218                    _ => {
4219                        buffer.update(&mut cx, |buffer, cx| {
4220                            log::info!(
4221                                "Guest {}: updating buffer {:?}",
4222                                guest_id,
4223                                buffer.file().unwrap().full_path(cx)
4224                            );
4225                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4226                        });
4227                    }
4228                }
4229                cx.background().simulate_random_delay().await;
4230            }
4231
4232            self.project = Some(project);
4233            (self, cx)
4234        }
4235    }
4236
4237    impl Executor for Arc<gpui::executor::Background> {
4238        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4239            self.spawn(future).detach();
4240        }
4241    }
4242
4243    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4244        channel
4245            .messages()
4246            .cursor::<()>()
4247            .map(|m| {
4248                (
4249                    m.sender.github_login.clone(),
4250                    m.body.clone(),
4251                    m.is_pending(),
4252                )
4253            })
4254            .collect()
4255    }
4256
4257    struct EmptyView;
4258
4259    impl gpui::Entity for EmptyView {
4260        type Event = ();
4261    }
4262
4263    impl gpui::View for EmptyView {
4264        fn ui_name() -> &'static str {
4265            "empty view"
4266        }
4267
4268        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4269            gpui::Element::boxed(gpui::elements::Empty)
4270        }
4271    }
4272}