rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::open_buffer)
  83            .add_message_handler(Server::close_buffer)
  84            .add_request_handler(Server::update_buffer)
  85            .add_message_handler(Server::update_buffer_file)
  86            .add_message_handler(Server::buffer_reloaded)
  87            .add_message_handler(Server::buffer_saved)
  88            .add_request_handler(Server::save_buffer)
  89            .add_request_handler(Server::format_buffers)
  90            .add_request_handler(Server::get_completions)
  91            .add_request_handler(Server::apply_additional_edits_for_completion)
  92            .add_request_handler(Server::get_code_actions)
  93            .add_request_handler(Server::apply_code_action)
  94            .add_request_handler(Server::get_channels)
  95            .add_request_handler(Server::get_users)
  96            .add_request_handler(Server::join_channel)
  97            .add_message_handler(Server::leave_channel)
  98            .add_request_handler(Server::send_channel_message)
  99            .add_request_handler(Server::get_channel_messages);
 100
 101        Arc::new(server)
 102    }
 103
 104    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 105    where
 106        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 107        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 108        M: EnvelopedMessage,
 109    {
 110        let prev_handler = self.handlers.insert(
 111            TypeId::of::<M>(),
 112            Box::new(move |server, envelope| {
 113                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 114                (handler)(server, *envelope).boxed()
 115            }),
 116        );
 117        if prev_handler.is_some() {
 118            panic!("registered a handler for the same message twice");
 119        }
 120        self
 121    }
 122
 123    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 124    where
 125        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 126        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 127        M: RequestMessage,
 128    {
 129        self.add_message_handler(move |server, envelope| {
 130            let receipt = envelope.receipt();
 131            let response = (handler)(server.clone(), envelope);
 132            async move {
 133                match response.await {
 134                    Ok(response) => {
 135                        server.peer.respond(receipt, response)?;
 136                        Ok(())
 137                    }
 138                    Err(error) => {
 139                        server.peer.respond_with_error(
 140                            receipt,
 141                            proto::Error {
 142                                message: error.to_string(),
 143                            },
 144                        )?;
 145                        Err(error)
 146                    }
 147                }
 148            }
 149        })
 150    }
 151
 152    pub fn handle_connection<E: Executor>(
 153        self: &Arc<Self>,
 154        connection: Connection,
 155        addr: String,
 156        user_id: UserId,
 157        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 158        executor: E,
 159    ) -> impl Future<Output = ()> {
 160        let mut this = self.clone();
 161        async move {
 162            let (connection_id, handle_io, mut incoming_rx) =
 163                this.peer.add_connection(connection).await;
 164
 165            if let Some(send_connection_id) = send_connection_id.as_mut() {
 166                let _ = send_connection_id.send(connection_id).await;
 167            }
 168
 169            this.state_mut().add_connection(connection_id, user_id);
 170            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 171                log::error!("error updating contacts for {:?}: {}", user_id, err);
 172            }
 173
 174            let handle_io = handle_io.fuse();
 175            futures::pin_mut!(handle_io);
 176            loop {
 177                let next_message = incoming_rx.next().fuse();
 178                futures::pin_mut!(next_message);
 179                futures::select_biased! {
 180                    result = handle_io => {
 181                        if let Err(err) = result {
 182                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 183                        }
 184                        break;
 185                    }
 186                    message = next_message => {
 187                        if let Some(message) = message {
 188                            let start_time = Instant::now();
 189                            let type_name = message.payload_type_name();
 190                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 191                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 192                                let notifications = this.notifications.clone();
 193                                let is_background = message.is_background();
 194                                let handle_message = (handler)(this.clone(), message);
 195                                let handle_message = async move {
 196                                    if let Err(err) = handle_message.await {
 197                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 198                                    } else {
 199                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 200                                    }
 201                                    if let Some(mut notifications) = notifications {
 202                                        let _ = notifications.send(()).await;
 203                                    }
 204                                };
 205                                if is_background {
 206                                    executor.spawn_detached(handle_message);
 207                                } else {
 208                                    handle_message.await;
 209                                }
 210                            } else {
 211                                log::warn!("unhandled message: {}", type_name);
 212                            }
 213                        } else {
 214                            log::info!("rpc connection closed {:?}", addr);
 215                            break;
 216                        }
 217                    }
 218                }
 219            }
 220
 221            if let Err(err) = this.sign_out(connection_id).await {
 222                log::error!("error signing out connection {:?} - {:?}", addr, err);
 223            }
 224        }
 225    }
 226
 227    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 228        self.peer.disconnect(connection_id);
 229        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 230
 231        for (project_id, project) in removed_connection.hosted_projects {
 232            if let Some(share) = project.share {
 233                broadcast(
 234                    connection_id,
 235                    share.guests.keys().copied().collect(),
 236                    |conn_id| {
 237                        self.peer
 238                            .send(conn_id, proto::UnshareProject { project_id })
 239                    },
 240                )?;
 241            }
 242        }
 243
 244        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 245            broadcast(connection_id, peer_ids, |conn_id| {
 246                self.peer.send(
 247                    conn_id,
 248                    proto::RemoveProjectCollaborator {
 249                        project_id,
 250                        peer_id: connection_id.0,
 251                    },
 252                )
 253            })?;
 254        }
 255
 256        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 257        Ok(())
 258    }
 259
 260    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 261        Ok(proto::Ack {})
 262    }
 263
 264    async fn register_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::RegisterProject>,
 267    ) -> tide::Result<proto::RegisterProjectResponse> {
 268        let project_id = {
 269            let mut state = self.state_mut();
 270            let user_id = state.user_id_for_connection(request.sender_id)?;
 271            state.register_project(request.sender_id, user_id)
 272        };
 273        Ok(proto::RegisterProjectResponse { project_id })
 274    }
 275
 276    async fn unregister_project(
 277        mut self: Arc<Server>,
 278        request: TypedEnvelope<proto::UnregisterProject>,
 279    ) -> tide::Result<()> {
 280        let project = self
 281            .state_mut()
 282            .unregister_project(request.payload.project_id, request.sender_id)?;
 283        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 284        Ok(())
 285    }
 286
 287    async fn share_project(
 288        mut self: Arc<Server>,
 289        request: TypedEnvelope<proto::ShareProject>,
 290    ) -> tide::Result<proto::Ack> {
 291        self.state_mut()
 292            .share_project(request.payload.project_id, request.sender_id);
 293        Ok(proto::Ack {})
 294    }
 295
 296    async fn unshare_project(
 297        mut self: Arc<Server>,
 298        request: TypedEnvelope<proto::UnshareProject>,
 299    ) -> tide::Result<()> {
 300        let project_id = request.payload.project_id;
 301        let project = self
 302            .state_mut()
 303            .unshare_project(project_id, request.sender_id)?;
 304
 305        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 306            self.peer
 307                .send(conn_id, proto::UnshareProject { project_id })
 308        })?;
 309        self.update_contacts_for_users(&project.authorized_user_ids)?;
 310        Ok(())
 311    }
 312
 313    async fn join_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::JoinProject>,
 316    ) -> tide::Result<proto::JoinProjectResponse> {
 317        let project_id = request.payload.project_id;
 318
 319        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 320        let (response, connection_ids, contact_user_ids) = self
 321            .state_mut()
 322            .join_project(request.sender_id, user_id, project_id)
 323            .and_then(|joined| {
 324                let share = joined.project.share()?;
 325                let peer_count = share.guests.len();
 326                let mut collaborators = Vec::with_capacity(peer_count);
 327                collaborators.push(proto::Collaborator {
 328                    peer_id: joined.project.host_connection_id.0,
 329                    replica_id: 0,
 330                    user_id: joined.project.host_user_id.to_proto(),
 331                });
 332                let worktrees = joined
 333                    .project
 334                    .worktrees
 335                    .iter()
 336                    .filter_map(|(id, worktree)| {
 337                        worktree.share.as_ref().map(|share| proto::Worktree {
 338                            id: *id,
 339                            root_name: worktree.root_name.clone(),
 340                            entries: share.entries.values().cloned().collect(),
 341                            diagnostic_summaries: share
 342                                .diagnostic_summaries
 343                                .values()
 344                                .cloned()
 345                                .collect(),
 346                            weak: worktree.weak,
 347                            next_update_id: share.next_update_id as u64,
 348                        })
 349                    })
 350                    .collect();
 351                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 352                    if *peer_conn_id != request.sender_id {
 353                        collaborators.push(proto::Collaborator {
 354                            peer_id: peer_conn_id.0,
 355                            replica_id: *peer_replica_id as u32,
 356                            user_id: peer_user_id.to_proto(),
 357                        });
 358                    }
 359                }
 360                let response = proto::JoinProjectResponse {
 361                    worktrees,
 362                    replica_id: joined.replica_id as u32,
 363                    collaborators,
 364                };
 365                let connection_ids = joined.project.connection_ids();
 366                let contact_user_ids = joined.project.authorized_user_ids();
 367                Ok((response, connection_ids, contact_user_ids))
 368            })?;
 369
 370        broadcast(request.sender_id, connection_ids, |conn_id| {
 371            self.peer.send(
 372                conn_id,
 373                proto::AddProjectCollaborator {
 374                    project_id,
 375                    collaborator: Some(proto::Collaborator {
 376                        peer_id: request.sender_id.0,
 377                        replica_id: response.replica_id,
 378                        user_id: user_id.to_proto(),
 379                    }),
 380                },
 381            )
 382        })?;
 383        self.update_contacts_for_users(&contact_user_ids)?;
 384        Ok(response)
 385    }
 386
 387    async fn leave_project(
 388        mut self: Arc<Server>,
 389        request: TypedEnvelope<proto::LeaveProject>,
 390    ) -> tide::Result<()> {
 391        let sender_id = request.sender_id;
 392        let project_id = request.payload.project_id;
 393        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 394
 395        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 396            self.peer.send(
 397                conn_id,
 398                proto::RemoveProjectCollaborator {
 399                    project_id,
 400                    peer_id: sender_id.0,
 401                },
 402            )
 403        })?;
 404        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 405
 406        Ok(())
 407    }
 408
 409    async fn register_worktree(
 410        mut self: Arc<Server>,
 411        request: TypedEnvelope<proto::RegisterWorktree>,
 412    ) -> tide::Result<proto::Ack> {
 413        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 414
 415        let mut contact_user_ids = HashSet::default();
 416        contact_user_ids.insert(host_user_id);
 417        for github_login in request.payload.authorized_logins {
 418            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 419            contact_user_ids.insert(contact_user_id);
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            request.sender_id,
 427            Worktree {
 428                authorized_user_ids: contact_user_ids.clone(),
 429                root_name: request.payload.root_name,
 430                share: None,
 431                weak: false,
 432            },
 433        )?;
 434        self.update_contacts_for_users(&contact_user_ids)?;
 435        Ok(proto::Ack {})
 436    }
 437
 438    async fn unregister_worktree(
 439        mut self: Arc<Server>,
 440        request: TypedEnvelope<proto::UnregisterWorktree>,
 441    ) -> tide::Result<()> {
 442        let project_id = request.payload.project_id;
 443        let worktree_id = request.payload.worktree_id;
 444        let (worktree, guest_connection_ids) =
 445            self.state_mut()
 446                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 447        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 448            self.peer.send(
 449                conn_id,
 450                proto::UnregisterWorktree {
 451                    project_id,
 452                    worktree_id,
 453                },
 454            )
 455        })?;
 456        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 457        Ok(())
 458    }
 459
 460    async fn share_worktree(
 461        mut self: Arc<Server>,
 462        mut request: TypedEnvelope<proto::ShareWorktree>,
 463    ) -> tide::Result<proto::Ack> {
 464        let worktree = request
 465            .payload
 466            .worktree
 467            .as_mut()
 468            .ok_or_else(|| anyhow!("missing worktree"))?;
 469        let entries = worktree
 470            .entries
 471            .iter()
 472            .map(|entry| (entry.id, entry.clone()))
 473            .collect();
 474        let diagnostic_summaries = worktree
 475            .diagnostic_summaries
 476            .iter()
 477            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 478            .collect();
 479
 480        let shared_worktree = self.state_mut().share_worktree(
 481            request.payload.project_id,
 482            worktree.id,
 483            request.sender_id,
 484            entries,
 485            diagnostic_summaries,
 486            worktree.next_update_id,
 487        )?;
 488
 489        broadcast(
 490            request.sender_id,
 491            shared_worktree.connection_ids,
 492            |connection_id| {
 493                self.peer
 494                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 495            },
 496        )?;
 497        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 498
 499        Ok(proto::Ack {})
 500    }
 501
 502    async fn update_worktree(
 503        mut self: Arc<Server>,
 504        request: TypedEnvelope<proto::UpdateWorktree>,
 505    ) -> tide::Result<proto::Ack> {
 506        let connection_ids = self.state_mut().update_worktree(
 507            request.sender_id,
 508            request.payload.project_id,
 509            request.payload.worktree_id,
 510            request.payload.id,
 511            &request.payload.removed_entries,
 512            &request.payload.updated_entries,
 513        )?;
 514
 515        broadcast(request.sender_id, connection_ids, |connection_id| {
 516            self.peer
 517                .forward_send(request.sender_id, connection_id, request.payload.clone())
 518        })?;
 519
 520        Ok(proto::Ack {})
 521    }
 522
 523    async fn update_diagnostic_summary(
 524        mut self: Arc<Server>,
 525        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 526    ) -> tide::Result<()> {
 527        let summary = request
 528            .payload
 529            .summary
 530            .clone()
 531            .ok_or_else(|| anyhow!("invalid summary"))?;
 532        let receiver_ids = self.state_mut().update_diagnostic_summary(
 533            request.payload.project_id,
 534            request.payload.worktree_id,
 535            request.sender_id,
 536            summary,
 537        )?;
 538
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })?;
 543        Ok(())
 544    }
 545
 546    async fn disk_based_diagnostics_updating(
 547        self: Arc<Server>,
 548        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 549    ) -> tide::Result<()> {
 550        let receiver_ids = self
 551            .state()
 552            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 553        broadcast(request.sender_id, receiver_ids, |connection_id| {
 554            self.peer
 555                .forward_send(request.sender_id, connection_id, request.payload.clone())
 556        })?;
 557        Ok(())
 558    }
 559
 560    async fn disk_based_diagnostics_updated(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 563    ) -> tide::Result<()> {
 564        let receiver_ids = self
 565            .state()
 566            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })?;
 571        Ok(())
 572    }
 573
 574    async fn get_definition(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<proto::GetDefinition>,
 577    ) -> tide::Result<proto::GetDefinitionResponse> {
 578        let host_connection_id = self
 579            .state()
 580            .read_project(request.payload.project_id, request.sender_id)?
 581            .host_connection_id;
 582        Ok(self
 583            .peer
 584            .forward_request(request.sender_id, host_connection_id, request.payload)
 585            .await?)
 586    }
 587
 588    async fn open_buffer(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::OpenBuffer>,
 591    ) -> tide::Result<proto::OpenBufferResponse> {
 592        let host_connection_id = self
 593            .state()
 594            .read_project(request.payload.project_id, request.sender_id)?
 595            .host_connection_id;
 596        Ok(self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?)
 600    }
 601
 602    async fn close_buffer(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::CloseBuffer>,
 605    ) -> tide::Result<()> {
 606        let host_connection_id = self
 607            .state()
 608            .read_project(request.payload.project_id, request.sender_id)?
 609            .host_connection_id;
 610        self.peer
 611            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 612        Ok(())
 613    }
 614
 615    async fn save_buffer(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::SaveBuffer>,
 618    ) -> tide::Result<proto::BufferSaved> {
 619        let host;
 620        let mut guests;
 621        {
 622            let state = self.state();
 623            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 624            host = project.host_connection_id;
 625            guests = project.guest_connection_ids()
 626        }
 627
 628        let response = self
 629            .peer
 630            .forward_request(request.sender_id, host, request.payload.clone())
 631            .await?;
 632
 633        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 634        broadcast(host, guests, |conn_id| {
 635            self.peer.forward_send(host, conn_id, response.clone())
 636        })?;
 637
 638        Ok(response)
 639    }
 640
 641    async fn format_buffers(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<proto::FormatBuffers>,
 644    ) -> tide::Result<proto::FormatBuffersResponse> {
 645        let host = self
 646            .state()
 647            .read_project(request.payload.project_id, request.sender_id)?
 648            .host_connection_id;
 649        Ok(self
 650            .peer
 651            .forward_request(request.sender_id, host, request.payload.clone())
 652            .await?)
 653    }
 654
 655    async fn get_completions(
 656        self: Arc<Server>,
 657        request: TypedEnvelope<proto::GetCompletions>,
 658    ) -> tide::Result<proto::GetCompletionsResponse> {
 659        let host = self
 660            .state()
 661            .read_project(request.payload.project_id, request.sender_id)?
 662            .host_connection_id;
 663        Ok(self
 664            .peer
 665            .forward_request(request.sender_id, host, request.payload.clone())
 666            .await?)
 667    }
 668
 669    async fn apply_additional_edits_for_completion(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 672    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 673        let host = self
 674            .state()
 675            .read_project(request.payload.project_id, request.sender_id)?
 676            .host_connection_id;
 677        Ok(self
 678            .peer
 679            .forward_request(request.sender_id, host, request.payload.clone())
 680            .await?)
 681    }
 682
 683    async fn get_code_actions(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::GetCodeActions>,
 686    ) -> tide::Result<proto::GetCodeActionsResponse> {
 687        let host = self
 688            .state()
 689            .read_project(request.payload.project_id, request.sender_id)?
 690            .host_connection_id;
 691        Ok(self
 692            .peer
 693            .forward_request(request.sender_id, host, request.payload.clone())
 694            .await?)
 695    }
 696
 697    async fn apply_code_action(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::ApplyCodeAction>,
 700    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 701        let host = self
 702            .state()
 703            .read_project(request.payload.project_id, request.sender_id)?
 704            .host_connection_id;
 705        Ok(self
 706            .peer
 707            .forward_request(request.sender_id, host, request.payload.clone())
 708            .await?)
 709    }
 710
 711    async fn update_buffer(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::UpdateBuffer>,
 714    ) -> tide::Result<proto::Ack> {
 715        let receiver_ids = self
 716            .state()
 717            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 718        broadcast(request.sender_id, receiver_ids, |connection_id| {
 719            self.peer
 720                .forward_send(request.sender_id, connection_id, request.payload.clone())
 721        })?;
 722        Ok(proto::Ack {})
 723    }
 724
 725    async fn update_buffer_file(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::UpdateBufferFile>,
 728    ) -> tide::Result<()> {
 729        let receiver_ids = self
 730            .state()
 731            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 732        broadcast(request.sender_id, receiver_ids, |connection_id| {
 733            self.peer
 734                .forward_send(request.sender_id, connection_id, request.payload.clone())
 735        })?;
 736        Ok(())
 737    }
 738
 739    async fn buffer_reloaded(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::BufferReloaded>,
 742    ) -> tide::Result<()> {
 743        let receiver_ids = self
 744            .state()
 745            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 746        broadcast(request.sender_id, receiver_ids, |connection_id| {
 747            self.peer
 748                .forward_send(request.sender_id, connection_id, request.payload.clone())
 749        })?;
 750        Ok(())
 751    }
 752
 753    async fn buffer_saved(
 754        self: Arc<Server>,
 755        request: TypedEnvelope<proto::BufferSaved>,
 756    ) -> tide::Result<()> {
 757        let receiver_ids = self
 758            .state()
 759            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 760        broadcast(request.sender_id, receiver_ids, |connection_id| {
 761            self.peer
 762                .forward_send(request.sender_id, connection_id, request.payload.clone())
 763        })?;
 764        Ok(())
 765    }
 766
 767    async fn get_channels(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::GetChannels>,
 770    ) -> tide::Result<proto::GetChannelsResponse> {
 771        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 772        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 773        Ok(proto::GetChannelsResponse {
 774            channels: channels
 775                .into_iter()
 776                .map(|chan| proto::Channel {
 777                    id: chan.id.to_proto(),
 778                    name: chan.name,
 779                })
 780                .collect(),
 781        })
 782    }
 783
 784    async fn get_users(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::GetUsers>,
 787    ) -> tide::Result<proto::GetUsersResponse> {
 788        let user_ids = request
 789            .payload
 790            .user_ids
 791            .into_iter()
 792            .map(UserId::from_proto)
 793            .collect();
 794        let users = self
 795            .app_state
 796            .db
 797            .get_users_by_ids(user_ids)
 798            .await?
 799            .into_iter()
 800            .map(|user| proto::User {
 801                id: user.id.to_proto(),
 802                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 803                github_login: user.github_login,
 804            })
 805            .collect();
 806        Ok(proto::GetUsersResponse { users })
 807    }
 808
 809    fn update_contacts_for_users<'a>(
 810        self: &Arc<Server>,
 811        user_ids: impl IntoIterator<Item = &'a UserId>,
 812    ) -> anyhow::Result<()> {
 813        let mut result = Ok(());
 814        let state = self.state();
 815        for user_id in user_ids {
 816            let contacts = state.contacts_for_user(*user_id);
 817            for connection_id in state.connection_ids_for_user(*user_id) {
 818                if let Err(error) = self.peer.send(
 819                    connection_id,
 820                    proto::UpdateContacts {
 821                        contacts: contacts.clone(),
 822                    },
 823                ) {
 824                    result = Err(error);
 825                }
 826            }
 827        }
 828        result
 829    }
 830
 831    async fn join_channel(
 832        mut self: Arc<Self>,
 833        request: TypedEnvelope<proto::JoinChannel>,
 834    ) -> tide::Result<proto::JoinChannelResponse> {
 835        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 836        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 837        if !self
 838            .app_state
 839            .db
 840            .can_user_access_channel(user_id, channel_id)
 841            .await?
 842        {
 843            Err(anyhow!("access denied"))?;
 844        }
 845
 846        self.state_mut().join_channel(request.sender_id, channel_id);
 847        let messages = self
 848            .app_state
 849            .db
 850            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 851            .await?
 852            .into_iter()
 853            .map(|msg| proto::ChannelMessage {
 854                id: msg.id.to_proto(),
 855                body: msg.body,
 856                timestamp: msg.sent_at.unix_timestamp() as u64,
 857                sender_id: msg.sender_id.to_proto(),
 858                nonce: Some(msg.nonce.as_u128().into()),
 859            })
 860            .collect::<Vec<_>>();
 861        Ok(proto::JoinChannelResponse {
 862            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 863            messages,
 864        })
 865    }
 866
 867    async fn leave_channel(
 868        mut self: Arc<Self>,
 869        request: TypedEnvelope<proto::LeaveChannel>,
 870    ) -> tide::Result<()> {
 871        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 872        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 873        if !self
 874            .app_state
 875            .db
 876            .can_user_access_channel(user_id, channel_id)
 877            .await?
 878        {
 879            Err(anyhow!("access denied"))?;
 880        }
 881
 882        self.state_mut()
 883            .leave_channel(request.sender_id, channel_id);
 884
 885        Ok(())
 886    }
 887
 888    async fn send_channel_message(
 889        self: Arc<Self>,
 890        request: TypedEnvelope<proto::SendChannelMessage>,
 891    ) -> tide::Result<proto::SendChannelMessageResponse> {
 892        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 893        let user_id;
 894        let connection_ids;
 895        {
 896            let state = self.state();
 897            user_id = state.user_id_for_connection(request.sender_id)?;
 898            connection_ids = state.channel_connection_ids(channel_id)?;
 899        }
 900
 901        // Validate the message body.
 902        let body = request.payload.body.trim().to_string();
 903        if body.len() > MAX_MESSAGE_LEN {
 904            return Err(anyhow!("message is too long"))?;
 905        }
 906        if body.is_empty() {
 907            return Err(anyhow!("message can't be blank"))?;
 908        }
 909
 910        let timestamp = OffsetDateTime::now_utc();
 911        let nonce = request
 912            .payload
 913            .nonce
 914            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 915
 916        let message_id = self
 917            .app_state
 918            .db
 919            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 920            .await?
 921            .to_proto();
 922        let message = proto::ChannelMessage {
 923            sender_id: user_id.to_proto(),
 924            id: message_id,
 925            body,
 926            timestamp: timestamp.unix_timestamp() as u64,
 927            nonce: Some(nonce),
 928        };
 929        broadcast(request.sender_id, connection_ids, |conn_id| {
 930            self.peer.send(
 931                conn_id,
 932                proto::ChannelMessageSent {
 933                    channel_id: channel_id.to_proto(),
 934                    message: Some(message.clone()),
 935                },
 936            )
 937        })?;
 938        Ok(proto::SendChannelMessageResponse {
 939            message: Some(message),
 940        })
 941    }
 942
 943    async fn get_channel_messages(
 944        self: Arc<Self>,
 945        request: TypedEnvelope<proto::GetChannelMessages>,
 946    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 947        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 948        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 949        if !self
 950            .app_state
 951            .db
 952            .can_user_access_channel(user_id, channel_id)
 953            .await?
 954        {
 955            Err(anyhow!("access denied"))?;
 956        }
 957
 958        let messages = self
 959            .app_state
 960            .db
 961            .get_channel_messages(
 962                channel_id,
 963                MESSAGE_COUNT_PER_PAGE,
 964                Some(MessageId::from_proto(request.payload.before_message_id)),
 965            )
 966            .await?
 967            .into_iter()
 968            .map(|msg| proto::ChannelMessage {
 969                id: msg.id.to_proto(),
 970                body: msg.body,
 971                timestamp: msg.sent_at.unix_timestamp() as u64,
 972                sender_id: msg.sender_id.to_proto(),
 973                nonce: Some(msg.nonce.as_u128().into()),
 974            })
 975            .collect::<Vec<_>>();
 976
 977        Ok(proto::GetChannelMessagesResponse {
 978            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 979            messages,
 980        })
 981    }
 982
 983    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 984        self.store.read()
 985    }
 986
 987    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 988        self.store.write()
 989    }
 990}
 991
 992impl Executor for RealExecutor {
 993    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 994        task::spawn(future);
 995    }
 996}
 997
 998fn broadcast<F>(
 999    sender_id: ConnectionId,
1000    receiver_ids: Vec<ConnectionId>,
1001    mut f: F,
1002) -> anyhow::Result<()>
1003where
1004    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1005{
1006    let mut result = Ok(());
1007    for receiver_id in receiver_ids {
1008        if receiver_id != sender_id {
1009            if let Err(error) = f(receiver_id) {
1010                if result.is_ok() {
1011                    result = Err(error);
1012                }
1013            }
1014        }
1015    }
1016    result
1017}
1018
1019pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1020    let server = Server::new(app.state().clone(), rpc.clone(), None);
1021    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1022        let server = server.clone();
1023        async move {
1024            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1025
1026            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1027            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1028            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1029            let client_protocol_version: Option<u32> = request
1030                .header("X-Zed-Protocol-Version")
1031                .and_then(|v| v.as_str().parse().ok());
1032
1033            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1034                return Ok(Response::new(StatusCode::UpgradeRequired));
1035            }
1036
1037            let header = match request.header("Sec-Websocket-Key") {
1038                Some(h) => h.as_str(),
1039                None => return Err(anyhow!("expected sec-websocket-key"))?,
1040            };
1041
1042            let user_id = process_auth_header(&request).await?;
1043
1044            let mut response = Response::new(StatusCode::SwitchingProtocols);
1045            response.insert_header(UPGRADE, "websocket");
1046            response.insert_header(CONNECTION, "Upgrade");
1047            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1048            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1049            response.insert_header("Sec-Websocket-Version", "13");
1050
1051            let http_res: &mut tide::http::Response = response.as_mut();
1052            let upgrade_receiver = http_res.recv_upgrade().await;
1053            let addr = request.remote().unwrap_or("unknown").to_string();
1054            task::spawn(async move {
1055                if let Some(stream) = upgrade_receiver.await {
1056                    server
1057                        .handle_connection(
1058                            Connection::new(
1059                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1060                            ),
1061                            addr,
1062                            user_id,
1063                            None,
1064                            RealExecutor,
1065                        )
1066                        .await;
1067                }
1068            });
1069
1070            Ok(response)
1071        }
1072    });
1073}
1074
1075fn header_contains_ignore_case<T>(
1076    request: &tide::Request<T>,
1077    header_name: HeaderName,
1078    value: &str,
1079) -> bool {
1080    request
1081        .header(header_name)
1082        .map(|h| {
1083            h.as_str()
1084                .split(',')
1085                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1086        })
1087        .unwrap_or(false)
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092    use super::*;
1093    use crate::{
1094        auth,
1095        db::{tests::TestDb, UserId},
1096        github, AppState, Config,
1097    };
1098    use ::rpc::Peer;
1099    use collections::BTreeMap;
1100    use gpui::{executor, ModelHandle, TestAppContext};
1101    use parking_lot::Mutex;
1102    use postage::{sink::Sink, watch};
1103    use rand::prelude::*;
1104    use rpc::PeerId;
1105    use serde_json::json;
1106    use sqlx::types::time::OffsetDateTime;
1107    use std::{
1108        cell::{Cell, RefCell},
1109        env,
1110        ops::Deref,
1111        path::Path,
1112        rc::Rc,
1113        sync::{
1114            atomic::{AtomicBool, Ordering::SeqCst},
1115            Arc,
1116        },
1117        time::Duration,
1118    };
1119    use zed::{
1120        client::{
1121            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1122            EstablishConnectionError, UserStore,
1123        },
1124        editor::{
1125            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1126            Redo, ToggleCodeActions, Undo,
1127        },
1128        fs::{FakeFs, Fs as _},
1129        language::{
1130            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1131            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1132        },
1133        lsp,
1134        project::{DiagnosticSummary, Project, ProjectPath},
1135        workspace::{Workspace, WorkspaceParams},
1136    };
1137
1138    #[cfg(test)]
1139    #[ctor::ctor]
1140    fn init_logger() {
1141        if std::env::var("RUST_LOG").is_ok() {
1142            env_logger::init();
1143        }
1144    }
1145
1146    #[gpui::test(iterations = 10)]
1147    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1148        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1149        let lang_registry = Arc::new(LanguageRegistry::new());
1150        let fs = FakeFs::new(cx_a.background());
1151        cx_a.foreground().forbid_parking();
1152
1153        // Connect to a server as 2 clients.
1154        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1155        let client_a = server.create_client(&mut cx_a, "user_a").await;
1156        let client_b = server.create_client(&mut cx_b, "user_b").await;
1157
1158        // Share a project as client A
1159        fs.insert_tree(
1160            "/a",
1161            json!({
1162                ".zed.toml": r#"collaborators = ["user_b"]"#,
1163                "a.txt": "a-contents",
1164                "b.txt": "b-contents",
1165            }),
1166        )
1167        .await;
1168        let project_a = cx_a.update(|cx| {
1169            Project::local(
1170                client_a.clone(),
1171                client_a.user_store.clone(),
1172                lang_registry.clone(),
1173                fs.clone(),
1174                cx,
1175            )
1176        });
1177        let (worktree_a, _) = project_a
1178            .update(&mut cx_a, |p, cx| {
1179                p.find_or_create_local_worktree("/a", false, cx)
1180            })
1181            .await
1182            .unwrap();
1183        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1184        worktree_a
1185            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1186            .await;
1187        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1188        project_a
1189            .update(&mut cx_a, |p, cx| p.share(cx))
1190            .await
1191            .unwrap();
1192
1193        // Join that project as client B
1194        let project_b = Project::remote(
1195            project_id,
1196            client_b.clone(),
1197            client_b.user_store.clone(),
1198            lang_registry.clone(),
1199            fs.clone(),
1200            &mut cx_b.to_async(),
1201        )
1202        .await
1203        .unwrap();
1204
1205        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1206            assert_eq!(
1207                project
1208                    .collaborators()
1209                    .get(&client_a.peer_id)
1210                    .unwrap()
1211                    .user
1212                    .github_login,
1213                "user_a"
1214            );
1215            project.replica_id()
1216        });
1217        project_a
1218            .condition(&cx_a, |tree, _| {
1219                tree.collaborators()
1220                    .get(&client_b.peer_id)
1221                    .map_or(false, |collaborator| {
1222                        collaborator.replica_id == replica_id_b
1223                            && collaborator.user.github_login == "user_b"
1224                    })
1225            })
1226            .await;
1227
1228        // Open the same file as client B and client A.
1229        let buffer_b = project_b
1230            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1231            .await
1232            .unwrap();
1233        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1234        buffer_b.read_with(&cx_b, |buf, cx| {
1235            assert_eq!(buf.read(cx).text(), "b-contents")
1236        });
1237        project_a.read_with(&cx_a, |project, cx| {
1238            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1239        });
1240        let buffer_a = project_a
1241            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1242            .await
1243            .unwrap();
1244
1245        let editor_b = cx_b.add_view(window_b, |cx| {
1246            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1247        });
1248
1249        // TODO
1250        // // Create a selection set as client B and see that selection set as client A.
1251        // buffer_a
1252        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1253        //     .await;
1254
1255        // Edit the buffer as client B and see that edit as client A.
1256        editor_b.update(&mut cx_b, |editor, cx| {
1257            editor.handle_input(&Input("ok, ".into()), cx)
1258        });
1259        buffer_a
1260            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1261            .await;
1262
1263        // TODO
1264        // // Remove the selection set as client B, see those selections disappear as client A.
1265        cx_b.update(move |_| drop(editor_b));
1266        // buffer_a
1267        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1268        //     .await;
1269
1270        // Close the buffer as client A, see that the buffer is closed.
1271        cx_a.update(move |_| drop(buffer_a));
1272        project_a
1273            .condition(&cx_a, |project, cx| {
1274                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1275            })
1276            .await;
1277
1278        // Dropping the client B's project removes client B from client A's collaborators.
1279        cx_b.update(move |_| drop(project_b));
1280        project_a
1281            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1282            .await;
1283    }
1284
1285    #[gpui::test(iterations = 10)]
1286    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1287        let lang_registry = Arc::new(LanguageRegistry::new());
1288        let fs = FakeFs::new(cx_a.background());
1289        cx_a.foreground().forbid_parking();
1290
1291        // Connect to a server as 2 clients.
1292        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293        let client_a = server.create_client(&mut cx_a, "user_a").await;
1294        let client_b = server.create_client(&mut cx_b, "user_b").await;
1295
1296        // Share a project as client A
1297        fs.insert_tree(
1298            "/a",
1299            json!({
1300                ".zed.toml": r#"collaborators = ["user_b"]"#,
1301                "a.txt": "a-contents",
1302                "b.txt": "b-contents",
1303            }),
1304        )
1305        .await;
1306        let project_a = cx_a.update(|cx| {
1307            Project::local(
1308                client_a.clone(),
1309                client_a.user_store.clone(),
1310                lang_registry.clone(),
1311                fs.clone(),
1312                cx,
1313            )
1314        });
1315        let (worktree_a, _) = project_a
1316            .update(&mut cx_a, |p, cx| {
1317                p.find_or_create_local_worktree("/a", false, cx)
1318            })
1319            .await
1320            .unwrap();
1321        worktree_a
1322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1323            .await;
1324        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1325        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1326        project_a
1327            .update(&mut cx_a, |p, cx| p.share(cx))
1328            .await
1329            .unwrap();
1330        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1331
1332        // Join that project as client B
1333        let project_b = Project::remote(
1334            project_id,
1335            client_b.clone(),
1336            client_b.user_store.clone(),
1337            lang_registry.clone(),
1338            fs.clone(),
1339            &mut cx_b.to_async(),
1340        )
1341        .await
1342        .unwrap();
1343        project_b
1344            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1345            .await
1346            .unwrap();
1347
1348        // Unshare the project as client A
1349        project_a
1350            .update(&mut cx_a, |project, cx| project.unshare(cx))
1351            .await
1352            .unwrap();
1353        project_b
1354            .condition(&mut cx_b, |project, _| project.is_read_only())
1355            .await;
1356        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1357        drop(project_b);
1358
1359        // Share the project again and ensure guests can still join.
1360        project_a
1361            .update(&mut cx_a, |project, cx| project.share(cx))
1362            .await
1363            .unwrap();
1364        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1365
1366        let project_c = Project::remote(
1367            project_id,
1368            client_b.clone(),
1369            client_b.user_store.clone(),
1370            lang_registry.clone(),
1371            fs.clone(),
1372            &mut cx_b.to_async(),
1373        )
1374        .await
1375        .unwrap();
1376        project_c
1377            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1378            .await
1379            .unwrap();
1380    }
1381
1382    #[gpui::test(iterations = 10)]
1383    async fn test_propagate_saves_and_fs_changes(
1384        mut cx_a: TestAppContext,
1385        mut cx_b: TestAppContext,
1386        mut cx_c: TestAppContext,
1387    ) {
1388        let lang_registry = Arc::new(LanguageRegistry::new());
1389        let fs = FakeFs::new(cx_a.background());
1390        cx_a.foreground().forbid_parking();
1391
1392        // Connect to a server as 3 clients.
1393        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1394        let client_a = server.create_client(&mut cx_a, "user_a").await;
1395        let client_b = server.create_client(&mut cx_b, "user_b").await;
1396        let client_c = server.create_client(&mut cx_c, "user_c").await;
1397
1398        // Share a worktree as client A.
1399        fs.insert_tree(
1400            "/a",
1401            json!({
1402                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1403                "file1": "",
1404                "file2": ""
1405            }),
1406        )
1407        .await;
1408        let project_a = cx_a.update(|cx| {
1409            Project::local(
1410                client_a.clone(),
1411                client_a.user_store.clone(),
1412                lang_registry.clone(),
1413                fs.clone(),
1414                cx,
1415            )
1416        });
1417        let (worktree_a, _) = project_a
1418            .update(&mut cx_a, |p, cx| {
1419                p.find_or_create_local_worktree("/a", false, cx)
1420            })
1421            .await
1422            .unwrap();
1423        worktree_a
1424            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1425            .await;
1426        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1427        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1428        project_a
1429            .update(&mut cx_a, |p, cx| p.share(cx))
1430            .await
1431            .unwrap();
1432
1433        // Join that worktree as clients B and C.
1434        let project_b = Project::remote(
1435            project_id,
1436            client_b.clone(),
1437            client_b.user_store.clone(),
1438            lang_registry.clone(),
1439            fs.clone(),
1440            &mut cx_b.to_async(),
1441        )
1442        .await
1443        .unwrap();
1444        let project_c = Project::remote(
1445            project_id,
1446            client_c.clone(),
1447            client_c.user_store.clone(),
1448            lang_registry.clone(),
1449            fs.clone(),
1450            &mut cx_c.to_async(),
1451        )
1452        .await
1453        .unwrap();
1454        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1455        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1456
1457        // Open and edit a buffer as both guests B and C.
1458        let buffer_b = project_b
1459            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1460            .await
1461            .unwrap();
1462        let buffer_c = project_c
1463            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1464            .await
1465            .unwrap();
1466        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1467        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1468
1469        // Open and edit that buffer as the host.
1470        let buffer_a = project_a
1471            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1472            .await
1473            .unwrap();
1474
1475        buffer_a
1476            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1477            .await;
1478        buffer_a.update(&mut cx_a, |buf, cx| {
1479            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1480        });
1481
1482        // Wait for edits to propagate
1483        buffer_a
1484            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1485            .await;
1486        buffer_b
1487            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1488            .await;
1489        buffer_c
1490            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1491            .await;
1492
1493        // Edit the buffer as the host and concurrently save as guest B.
1494        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1495        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1496        save_b.await.unwrap();
1497        assert_eq!(
1498            fs.load("/a/file1".as_ref()).await.unwrap(),
1499            "hi-a, i-am-c, i-am-b, i-am-a"
1500        );
1501        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1502        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1503        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1504
1505        // Make changes on host's file system, see those changes on guest worktrees.
1506        fs.rename(
1507            "/a/file1".as_ref(),
1508            "/a/file1-renamed".as_ref(),
1509            Default::default(),
1510        )
1511        .await
1512        .unwrap();
1513
1514        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1515            .await
1516            .unwrap();
1517        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1518
1519        worktree_a
1520            .condition(&cx_a, |tree, _| {
1521                tree.paths()
1522                    .map(|p| p.to_string_lossy())
1523                    .collect::<Vec<_>>()
1524                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1525            })
1526            .await;
1527        worktree_b
1528            .condition(&cx_b, |tree, _| {
1529                tree.paths()
1530                    .map(|p| p.to_string_lossy())
1531                    .collect::<Vec<_>>()
1532                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1533            })
1534            .await;
1535        worktree_c
1536            .condition(&cx_c, |tree, _| {
1537                tree.paths()
1538                    .map(|p| p.to_string_lossy())
1539                    .collect::<Vec<_>>()
1540                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1541            })
1542            .await;
1543
1544        // Ensure buffer files are updated as well.
1545        buffer_a
1546            .condition(&cx_a, |buf, _| {
1547                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1548            })
1549            .await;
1550        buffer_b
1551            .condition(&cx_b, |buf, _| {
1552                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1553            })
1554            .await;
1555        buffer_c
1556            .condition(&cx_c, |buf, _| {
1557                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1558            })
1559            .await;
1560    }
1561
1562    #[gpui::test(iterations = 10)]
1563    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1564        cx_a.foreground().forbid_parking();
1565        let lang_registry = Arc::new(LanguageRegistry::new());
1566        let fs = FakeFs::new(cx_a.background());
1567
1568        // Connect to a server as 2 clients.
1569        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1570        let client_a = server.create_client(&mut cx_a, "user_a").await;
1571        let client_b = server.create_client(&mut cx_b, "user_b").await;
1572
1573        // Share a project as client A
1574        fs.insert_tree(
1575            "/dir",
1576            json!({
1577                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1578                "a.txt": "a-contents",
1579            }),
1580        )
1581        .await;
1582
1583        let project_a = cx_a.update(|cx| {
1584            Project::local(
1585                client_a.clone(),
1586                client_a.user_store.clone(),
1587                lang_registry.clone(),
1588                fs.clone(),
1589                cx,
1590            )
1591        });
1592        let (worktree_a, _) = project_a
1593            .update(&mut cx_a, |p, cx| {
1594                p.find_or_create_local_worktree("/dir", false, cx)
1595            })
1596            .await
1597            .unwrap();
1598        worktree_a
1599            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1600            .await;
1601        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1602        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1603        project_a
1604            .update(&mut cx_a, |p, cx| p.share(cx))
1605            .await
1606            .unwrap();
1607
1608        // Join that project as client B
1609        let project_b = Project::remote(
1610            project_id,
1611            client_b.clone(),
1612            client_b.user_store.clone(),
1613            lang_registry.clone(),
1614            fs.clone(),
1615            &mut cx_b.to_async(),
1616        )
1617        .await
1618        .unwrap();
1619
1620        // Open a buffer as client B
1621        let buffer_b = project_b
1622            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1623            .await
1624            .unwrap();
1625
1626        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1627        buffer_b.read_with(&cx_b, |buf, _| {
1628            assert!(buf.is_dirty());
1629            assert!(!buf.has_conflict());
1630        });
1631
1632        buffer_b
1633            .update(&mut cx_b, |buf, cx| buf.save(cx))
1634            .await
1635            .unwrap();
1636        buffer_b
1637            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1638            .await;
1639        buffer_b.read_with(&cx_b, |buf, _| {
1640            assert!(!buf.has_conflict());
1641        });
1642
1643        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1644        buffer_b.read_with(&cx_b, |buf, _| {
1645            assert!(buf.is_dirty());
1646            assert!(!buf.has_conflict());
1647        });
1648    }
1649
1650    #[gpui::test(iterations = 10)]
1651    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1652        cx_a.foreground().forbid_parking();
1653        let lang_registry = Arc::new(LanguageRegistry::new());
1654        let fs = FakeFs::new(cx_a.background());
1655
1656        // Connect to a server as 2 clients.
1657        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1658        let client_a = server.create_client(&mut cx_a, "user_a").await;
1659        let client_b = server.create_client(&mut cx_b, "user_b").await;
1660
1661        // Share a project as client A
1662        fs.insert_tree(
1663            "/dir",
1664            json!({
1665                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1666                "a.txt": "a-contents",
1667            }),
1668        )
1669        .await;
1670
1671        let project_a = cx_a.update(|cx| {
1672            Project::local(
1673                client_a.clone(),
1674                client_a.user_store.clone(),
1675                lang_registry.clone(),
1676                fs.clone(),
1677                cx,
1678            )
1679        });
1680        let (worktree_a, _) = project_a
1681            .update(&mut cx_a, |p, cx| {
1682                p.find_or_create_local_worktree("/dir", false, cx)
1683            })
1684            .await
1685            .unwrap();
1686        worktree_a
1687            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1688            .await;
1689        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1690        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1691        project_a
1692            .update(&mut cx_a, |p, cx| p.share(cx))
1693            .await
1694            .unwrap();
1695
1696        // Join that project as client B
1697        let project_b = Project::remote(
1698            project_id,
1699            client_b.clone(),
1700            client_b.user_store.clone(),
1701            lang_registry.clone(),
1702            fs.clone(),
1703            &mut cx_b.to_async(),
1704        )
1705        .await
1706        .unwrap();
1707        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1708
1709        // Open a buffer as client B
1710        let buffer_b = project_b
1711            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1712            .await
1713            .unwrap();
1714        buffer_b.read_with(&cx_b, |buf, _| {
1715            assert!(!buf.is_dirty());
1716            assert!(!buf.has_conflict());
1717        });
1718
1719        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1720            .await
1721            .unwrap();
1722        buffer_b
1723            .condition(&cx_b, |buf, _| {
1724                buf.text() == "new contents" && !buf.is_dirty()
1725            })
1726            .await;
1727        buffer_b.read_with(&cx_b, |buf, _| {
1728            assert!(!buf.has_conflict());
1729        });
1730    }
1731
1732    #[gpui::test(iterations = 10)]
1733    async fn test_editing_while_guest_opens_buffer(
1734        mut cx_a: TestAppContext,
1735        mut cx_b: TestAppContext,
1736    ) {
1737        cx_a.foreground().forbid_parking();
1738        let lang_registry = Arc::new(LanguageRegistry::new());
1739        let fs = FakeFs::new(cx_a.background());
1740
1741        // Connect to a server as 2 clients.
1742        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1743        let client_a = server.create_client(&mut cx_a, "user_a").await;
1744        let client_b = server.create_client(&mut cx_b, "user_b").await;
1745
1746        // Share a project as client A
1747        fs.insert_tree(
1748            "/dir",
1749            json!({
1750                ".zed.toml": r#"collaborators = ["user_b"]"#,
1751                "a.txt": "a-contents",
1752            }),
1753        )
1754        .await;
1755        let project_a = cx_a.update(|cx| {
1756            Project::local(
1757                client_a.clone(),
1758                client_a.user_store.clone(),
1759                lang_registry.clone(),
1760                fs.clone(),
1761                cx,
1762            )
1763        });
1764        let (worktree_a, _) = project_a
1765            .update(&mut cx_a, |p, cx| {
1766                p.find_or_create_local_worktree("/dir", false, cx)
1767            })
1768            .await
1769            .unwrap();
1770        worktree_a
1771            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1772            .await;
1773        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1774        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1775        project_a
1776            .update(&mut cx_a, |p, cx| p.share(cx))
1777            .await
1778            .unwrap();
1779
1780        // Join that project as client B
1781        let project_b = Project::remote(
1782            project_id,
1783            client_b.clone(),
1784            client_b.user_store.clone(),
1785            lang_registry.clone(),
1786            fs.clone(),
1787            &mut cx_b.to_async(),
1788        )
1789        .await
1790        .unwrap();
1791
1792        // Open a buffer as client A
1793        let buffer_a = project_a
1794            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1795            .await
1796            .unwrap();
1797
1798        // Start opening the same buffer as client B
1799        let buffer_b = cx_b
1800            .background()
1801            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1802
1803        // Edit the buffer as client A while client B is still opening it.
1804        cx_b.background().simulate_random_delay().await;
1805        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1806        cx_b.background().simulate_random_delay().await;
1807        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1808
1809        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1810        let buffer_b = buffer_b.await.unwrap();
1811        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1812    }
1813
1814    #[gpui::test(iterations = 10)]
1815    async fn test_leaving_worktree_while_opening_buffer(
1816        mut cx_a: TestAppContext,
1817        mut cx_b: TestAppContext,
1818    ) {
1819        cx_a.foreground().forbid_parking();
1820        let lang_registry = Arc::new(LanguageRegistry::new());
1821        let fs = FakeFs::new(cx_a.background());
1822
1823        // Connect to a server as 2 clients.
1824        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1825        let client_a = server.create_client(&mut cx_a, "user_a").await;
1826        let client_b = server.create_client(&mut cx_b, "user_b").await;
1827
1828        // Share a project as client A
1829        fs.insert_tree(
1830            "/dir",
1831            json!({
1832                ".zed.toml": r#"collaborators = ["user_b"]"#,
1833                "a.txt": "a-contents",
1834            }),
1835        )
1836        .await;
1837        let project_a = cx_a.update(|cx| {
1838            Project::local(
1839                client_a.clone(),
1840                client_a.user_store.clone(),
1841                lang_registry.clone(),
1842                fs.clone(),
1843                cx,
1844            )
1845        });
1846        let (worktree_a, _) = project_a
1847            .update(&mut cx_a, |p, cx| {
1848                p.find_or_create_local_worktree("/dir", false, cx)
1849            })
1850            .await
1851            .unwrap();
1852        worktree_a
1853            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1854            .await;
1855        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1856        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1857        project_a
1858            .update(&mut cx_a, |p, cx| p.share(cx))
1859            .await
1860            .unwrap();
1861
1862        // Join that project as client B
1863        let project_b = Project::remote(
1864            project_id,
1865            client_b.clone(),
1866            client_b.user_store.clone(),
1867            lang_registry.clone(),
1868            fs.clone(),
1869            &mut cx_b.to_async(),
1870        )
1871        .await
1872        .unwrap();
1873
1874        // See that a guest has joined as client A.
1875        project_a
1876            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1877            .await;
1878
1879        // Begin opening a buffer as client B, but leave the project before the open completes.
1880        let buffer_b = cx_b
1881            .background()
1882            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1883        cx_b.update(|_| drop(project_b));
1884        drop(buffer_b);
1885
1886        // See that the guest has left.
1887        project_a
1888            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1889            .await;
1890    }
1891
1892    #[gpui::test(iterations = 10)]
1893    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1894        cx_a.foreground().forbid_parking();
1895        let lang_registry = Arc::new(LanguageRegistry::new());
1896        let fs = FakeFs::new(cx_a.background());
1897
1898        // Connect to a server as 2 clients.
1899        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900        let client_a = server.create_client(&mut cx_a, "user_a").await;
1901        let client_b = server.create_client(&mut cx_b, "user_b").await;
1902
1903        // Share a project as client A
1904        fs.insert_tree(
1905            "/a",
1906            json!({
1907                ".zed.toml": r#"collaborators = ["user_b"]"#,
1908                "a.txt": "a-contents",
1909                "b.txt": "b-contents",
1910            }),
1911        )
1912        .await;
1913        let project_a = cx_a.update(|cx| {
1914            Project::local(
1915                client_a.clone(),
1916                client_a.user_store.clone(),
1917                lang_registry.clone(),
1918                fs.clone(),
1919                cx,
1920            )
1921        });
1922        let (worktree_a, _) = project_a
1923            .update(&mut cx_a, |p, cx| {
1924                p.find_or_create_local_worktree("/a", false, cx)
1925            })
1926            .await
1927            .unwrap();
1928        worktree_a
1929            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1930            .await;
1931        let project_id = project_a
1932            .update(&mut cx_a, |project, _| project.next_remote_id())
1933            .await;
1934        project_a
1935            .update(&mut cx_a, |project, cx| project.share(cx))
1936            .await
1937            .unwrap();
1938
1939        // Join that project as client B
1940        let _project_b = Project::remote(
1941            project_id,
1942            client_b.clone(),
1943            client_b.user_store.clone(),
1944            lang_registry.clone(),
1945            fs.clone(),
1946            &mut cx_b.to_async(),
1947        )
1948        .await
1949        .unwrap();
1950
1951        // See that a guest has joined as client A.
1952        project_a
1953            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1954            .await;
1955
1956        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1957        client_b.disconnect(&cx_b.to_async()).unwrap();
1958        project_a
1959            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1960            .await;
1961    }
1962
1963    #[gpui::test(iterations = 10)]
1964    async fn test_collaborating_with_diagnostics(
1965        mut cx_a: TestAppContext,
1966        mut cx_b: TestAppContext,
1967    ) {
1968        cx_a.foreground().forbid_parking();
1969        let mut lang_registry = Arc::new(LanguageRegistry::new());
1970        let fs = FakeFs::new(cx_a.background());
1971
1972        // Set up a fake language server.
1973        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1974        Arc::get_mut(&mut lang_registry)
1975            .unwrap()
1976            .add(Arc::new(Language::new(
1977                LanguageConfig {
1978                    name: "Rust".to_string(),
1979                    path_suffixes: vec!["rs".to_string()],
1980                    language_server: Some(language_server_config),
1981                    ..Default::default()
1982                },
1983                Some(tree_sitter_rust::language()),
1984            )));
1985
1986        // Connect to a server as 2 clients.
1987        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1988        let client_a = server.create_client(&mut cx_a, "user_a").await;
1989        let client_b = server.create_client(&mut cx_b, "user_b").await;
1990
1991        // Share a project as client A
1992        fs.insert_tree(
1993            "/a",
1994            json!({
1995                ".zed.toml": r#"collaborators = ["user_b"]"#,
1996                "a.rs": "let one = two",
1997                "other.rs": "",
1998            }),
1999        )
2000        .await;
2001        let project_a = cx_a.update(|cx| {
2002            Project::local(
2003                client_a.clone(),
2004                client_a.user_store.clone(),
2005                lang_registry.clone(),
2006                fs.clone(),
2007                cx,
2008            )
2009        });
2010        let (worktree_a, _) = project_a
2011            .update(&mut cx_a, |p, cx| {
2012                p.find_or_create_local_worktree("/a", false, cx)
2013            })
2014            .await
2015            .unwrap();
2016        worktree_a
2017            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2018            .await;
2019        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2020        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2021        project_a
2022            .update(&mut cx_a, |p, cx| p.share(cx))
2023            .await
2024            .unwrap();
2025
2026        // Cause the language server to start.
2027        let _ = cx_a
2028            .background()
2029            .spawn(project_a.update(&mut cx_a, |project, cx| {
2030                project.open_buffer(
2031                    ProjectPath {
2032                        worktree_id,
2033                        path: Path::new("other.rs").into(),
2034                    },
2035                    cx,
2036                )
2037            }))
2038            .await
2039            .unwrap();
2040
2041        // Simulate a language server reporting errors for a file.
2042        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2043        fake_language_server
2044            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2045                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2046                version: None,
2047                diagnostics: vec![lsp::Diagnostic {
2048                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2049                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2050                    message: "message 1".to_string(),
2051                    ..Default::default()
2052                }],
2053            })
2054            .await;
2055
2056        // Wait for server to see the diagnostics update.
2057        server
2058            .condition(|store| {
2059                let worktree = store
2060                    .project(project_id)
2061                    .unwrap()
2062                    .worktrees
2063                    .get(&worktree_id.to_proto())
2064                    .unwrap();
2065
2066                !worktree
2067                    .share
2068                    .as_ref()
2069                    .unwrap()
2070                    .diagnostic_summaries
2071                    .is_empty()
2072            })
2073            .await;
2074
2075        // Join the worktree as client B.
2076        let project_b = Project::remote(
2077            project_id,
2078            client_b.clone(),
2079            client_b.user_store.clone(),
2080            lang_registry.clone(),
2081            fs.clone(),
2082            &mut cx_b.to_async(),
2083        )
2084        .await
2085        .unwrap();
2086
2087        project_b.read_with(&cx_b, |project, cx| {
2088            assert_eq!(
2089                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2090                &[(
2091                    ProjectPath {
2092                        worktree_id,
2093                        path: Arc::from(Path::new("a.rs")),
2094                    },
2095                    DiagnosticSummary {
2096                        error_count: 1,
2097                        warning_count: 0,
2098                        ..Default::default()
2099                    },
2100                )]
2101            )
2102        });
2103
2104        // Simulate a language server reporting more errors for a file.
2105        fake_language_server
2106            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2107                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2108                version: None,
2109                diagnostics: vec![
2110                    lsp::Diagnostic {
2111                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2112                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2113                        message: "message 1".to_string(),
2114                        ..Default::default()
2115                    },
2116                    lsp::Diagnostic {
2117                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2118                        range: lsp::Range::new(
2119                            lsp::Position::new(0, 10),
2120                            lsp::Position::new(0, 13),
2121                        ),
2122                        message: "message 2".to_string(),
2123                        ..Default::default()
2124                    },
2125                ],
2126            })
2127            .await;
2128
2129        // Client b gets the updated summaries
2130        project_b
2131            .condition(&cx_b, |project, cx| {
2132                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2133                    == &[(
2134                        ProjectPath {
2135                            worktree_id,
2136                            path: Arc::from(Path::new("a.rs")),
2137                        },
2138                        DiagnosticSummary {
2139                            error_count: 1,
2140                            warning_count: 1,
2141                            ..Default::default()
2142                        },
2143                    )]
2144            })
2145            .await;
2146
2147        // Open the file with the errors on client B. They should be present.
2148        let buffer_b = cx_b
2149            .background()
2150            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2151            .await
2152            .unwrap();
2153
2154        buffer_b.read_with(&cx_b, |buffer, _| {
2155            assert_eq!(
2156                buffer
2157                    .snapshot()
2158                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2159                    .map(|entry| entry)
2160                    .collect::<Vec<_>>(),
2161                &[
2162                    DiagnosticEntry {
2163                        range: Point::new(0, 4)..Point::new(0, 7),
2164                        diagnostic: Diagnostic {
2165                            group_id: 0,
2166                            message: "message 1".to_string(),
2167                            severity: lsp::DiagnosticSeverity::ERROR,
2168                            is_primary: true,
2169                            ..Default::default()
2170                        }
2171                    },
2172                    DiagnosticEntry {
2173                        range: Point::new(0, 10)..Point::new(0, 13),
2174                        diagnostic: Diagnostic {
2175                            group_id: 1,
2176                            severity: lsp::DiagnosticSeverity::WARNING,
2177                            message: "message 2".to_string(),
2178                            is_primary: true,
2179                            ..Default::default()
2180                        }
2181                    }
2182                ]
2183            );
2184        });
2185    }
2186
2187    #[gpui::test(iterations = 10)]
2188    async fn test_collaborating_with_completion(
2189        mut cx_a: TestAppContext,
2190        mut cx_b: TestAppContext,
2191    ) {
2192        cx_a.foreground().forbid_parking();
2193        let mut lang_registry = Arc::new(LanguageRegistry::new());
2194        let fs = FakeFs::new(cx_a.background());
2195
2196        // Set up a fake language server.
2197        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2198        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2199            completion_provider: Some(lsp::CompletionOptions {
2200                trigger_characters: Some(vec![".".to_string()]),
2201                ..Default::default()
2202            }),
2203            ..Default::default()
2204        });
2205        Arc::get_mut(&mut lang_registry)
2206            .unwrap()
2207            .add(Arc::new(Language::new(
2208                LanguageConfig {
2209                    name: "Rust".to_string(),
2210                    path_suffixes: vec!["rs".to_string()],
2211                    language_server: Some(language_server_config),
2212                    ..Default::default()
2213                },
2214                Some(tree_sitter_rust::language()),
2215            )));
2216
2217        // Connect to a server as 2 clients.
2218        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2219        let client_a = server.create_client(&mut cx_a, "user_a").await;
2220        let client_b = server.create_client(&mut cx_b, "user_b").await;
2221
2222        // Share a project as client A
2223        fs.insert_tree(
2224            "/a",
2225            json!({
2226                ".zed.toml": r#"collaborators = ["user_b"]"#,
2227                "main.rs": "fn main() { a }",
2228                "other.rs": "",
2229            }),
2230        )
2231        .await;
2232        let project_a = cx_a.update(|cx| {
2233            Project::local(
2234                client_a.clone(),
2235                client_a.user_store.clone(),
2236                lang_registry.clone(),
2237                fs.clone(),
2238                cx,
2239            )
2240        });
2241        let (worktree_a, _) = project_a
2242            .update(&mut cx_a, |p, cx| {
2243                p.find_or_create_local_worktree("/a", false, cx)
2244            })
2245            .await
2246            .unwrap();
2247        worktree_a
2248            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2249            .await;
2250        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2251        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2252        project_a
2253            .update(&mut cx_a, |p, cx| p.share(cx))
2254            .await
2255            .unwrap();
2256
2257        // Join the worktree as client B.
2258        let project_b = Project::remote(
2259            project_id,
2260            client_b.clone(),
2261            client_b.user_store.clone(),
2262            lang_registry.clone(),
2263            fs.clone(),
2264            &mut cx_b.to_async(),
2265        )
2266        .await
2267        .unwrap();
2268
2269        // Open a file in an editor as the guest.
2270        let buffer_b = project_b
2271            .update(&mut cx_b, |p, cx| {
2272                p.open_buffer((worktree_id, "main.rs"), cx)
2273            })
2274            .await
2275            .unwrap();
2276        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2277        let editor_b = cx_b.add_view(window_b, |cx| {
2278            Editor::for_buffer(
2279                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2280                Arc::new(|cx| EditorSettings::test(cx)),
2281                Some(project_b.clone()),
2282                cx,
2283            )
2284        });
2285
2286        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2287        buffer_b
2288            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2289            .await;
2290
2291        // Type a completion trigger character as the guest.
2292        editor_b.update(&mut cx_b, |editor, cx| {
2293            editor.select_ranges([13..13], None, cx);
2294            editor.handle_input(&Input(".".into()), cx);
2295            cx.focus(&editor_b);
2296        });
2297
2298        // Receive a completion request as the host's language server.
2299        // Return some completions from the host's language server.
2300        cx_a.foreground().start_waiting();
2301        fake_language_server
2302            .handle_request::<lsp::request::Completion, _>(|params| {
2303                assert_eq!(
2304                    params.text_document_position.text_document.uri,
2305                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2306                );
2307                assert_eq!(
2308                    params.text_document_position.position,
2309                    lsp::Position::new(0, 14),
2310                );
2311
2312                Some(lsp::CompletionResponse::Array(vec![
2313                    lsp::CompletionItem {
2314                        label: "first_method(…)".into(),
2315                        detail: Some("fn(&mut self, B) -> C".into()),
2316                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2317                            new_text: "first_method($1)".to_string(),
2318                            range: lsp::Range::new(
2319                                lsp::Position::new(0, 14),
2320                                lsp::Position::new(0, 14),
2321                            ),
2322                        })),
2323                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2324                        ..Default::default()
2325                    },
2326                    lsp::CompletionItem {
2327                        label: "second_method(…)".into(),
2328                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2329                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2330                            new_text: "second_method()".to_string(),
2331                            range: lsp::Range::new(
2332                                lsp::Position::new(0, 14),
2333                                lsp::Position::new(0, 14),
2334                            ),
2335                        })),
2336                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2337                        ..Default::default()
2338                    },
2339                ]))
2340            })
2341            .next()
2342            .await
2343            .unwrap();
2344        cx_a.foreground().finish_waiting();
2345
2346        // Open the buffer on the host.
2347        let buffer_a = project_a
2348            .update(&mut cx_a, |p, cx| {
2349                p.open_buffer((worktree_id, "main.rs"), cx)
2350            })
2351            .await
2352            .unwrap();
2353        buffer_a
2354            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2355            .await;
2356
2357        // Confirm a completion on the guest.
2358        editor_b
2359            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2360            .await;
2361        editor_b.update(&mut cx_b, |editor, cx| {
2362            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2363            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2364        });
2365
2366        // Return a resolved completion from the host's language server.
2367        // The resolved completion has an additional text edit.
2368        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2369            assert_eq!(params.label, "first_method(…)");
2370            lsp::CompletionItem {
2371                label: "first_method(…)".into(),
2372                detail: Some("fn(&mut self, B) -> C".into()),
2373                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2374                    new_text: "first_method($1)".to_string(),
2375                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2376                })),
2377                additional_text_edits: Some(vec![lsp::TextEdit {
2378                    new_text: "use d::SomeTrait;\n".to_string(),
2379                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2380                }]),
2381                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2382                ..Default::default()
2383            }
2384        });
2385
2386        // The additional edit is applied.
2387        buffer_a
2388            .condition(&cx_a, |buffer, _| {
2389                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2390            })
2391            .await;
2392        buffer_b
2393            .condition(&cx_b, |buffer, _| {
2394                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2395            })
2396            .await;
2397    }
2398
2399    #[gpui::test(iterations = 10)]
2400    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2401        cx_a.foreground().forbid_parking();
2402        let mut lang_registry = Arc::new(LanguageRegistry::new());
2403        let fs = FakeFs::new(cx_a.background());
2404
2405        // Set up a fake language server.
2406        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2407        Arc::get_mut(&mut lang_registry)
2408            .unwrap()
2409            .add(Arc::new(Language::new(
2410                LanguageConfig {
2411                    name: "Rust".to_string(),
2412                    path_suffixes: vec!["rs".to_string()],
2413                    language_server: Some(language_server_config),
2414                    ..Default::default()
2415                },
2416                Some(tree_sitter_rust::language()),
2417            )));
2418
2419        // Connect to a server as 2 clients.
2420        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2421        let client_a = server.create_client(&mut cx_a, "user_a").await;
2422        let client_b = server.create_client(&mut cx_b, "user_b").await;
2423
2424        // Share a project as client A
2425        fs.insert_tree(
2426            "/a",
2427            json!({
2428                ".zed.toml": r#"collaborators = ["user_b"]"#,
2429                "a.rs": "let one = two",
2430            }),
2431        )
2432        .await;
2433        let project_a = cx_a.update(|cx| {
2434            Project::local(
2435                client_a.clone(),
2436                client_a.user_store.clone(),
2437                lang_registry.clone(),
2438                fs.clone(),
2439                cx,
2440            )
2441        });
2442        let (worktree_a, _) = project_a
2443            .update(&mut cx_a, |p, cx| {
2444                p.find_or_create_local_worktree("/a", false, cx)
2445            })
2446            .await
2447            .unwrap();
2448        worktree_a
2449            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2450            .await;
2451        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2452        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2453        project_a
2454            .update(&mut cx_a, |p, cx| p.share(cx))
2455            .await
2456            .unwrap();
2457
2458        // Join the worktree as client B.
2459        let project_b = Project::remote(
2460            project_id,
2461            client_b.clone(),
2462            client_b.user_store.clone(),
2463            lang_registry.clone(),
2464            fs.clone(),
2465            &mut cx_b.to_async(),
2466        )
2467        .await
2468        .unwrap();
2469
2470        let buffer_b = cx_b
2471            .background()
2472            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2473            .await
2474            .unwrap();
2475
2476        let format = project_b.update(&mut cx_b, |project, cx| {
2477            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2478        });
2479
2480        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2481        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2482            Some(vec![
2483                lsp::TextEdit {
2484                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2485                    new_text: "h".to_string(),
2486                },
2487                lsp::TextEdit {
2488                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2489                    new_text: "y".to_string(),
2490                },
2491            ])
2492        });
2493
2494        format.await.unwrap();
2495        assert_eq!(
2496            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2497            "let honey = two"
2498        );
2499    }
2500
2501    #[gpui::test(iterations = 10)]
2502    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2503        cx_a.foreground().forbid_parking();
2504        let mut lang_registry = Arc::new(LanguageRegistry::new());
2505        let fs = FakeFs::new(cx_a.background());
2506        fs.insert_tree(
2507            "/root-1",
2508            json!({
2509                ".zed.toml": r#"collaborators = ["user_b"]"#,
2510                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2511            }),
2512        )
2513        .await;
2514        fs.insert_tree(
2515            "/root-2",
2516            json!({
2517                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2518            }),
2519        )
2520        .await;
2521
2522        // Set up a fake language server.
2523        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2524        Arc::get_mut(&mut lang_registry)
2525            .unwrap()
2526            .add(Arc::new(Language::new(
2527                LanguageConfig {
2528                    name: "Rust".to_string(),
2529                    path_suffixes: vec!["rs".to_string()],
2530                    language_server: Some(language_server_config),
2531                    ..Default::default()
2532                },
2533                Some(tree_sitter_rust::language()),
2534            )));
2535
2536        // Connect to a server as 2 clients.
2537        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2538        let client_a = server.create_client(&mut cx_a, "user_a").await;
2539        let client_b = server.create_client(&mut cx_b, "user_b").await;
2540
2541        // Share a project as client A
2542        let project_a = cx_a.update(|cx| {
2543            Project::local(
2544                client_a.clone(),
2545                client_a.user_store.clone(),
2546                lang_registry.clone(),
2547                fs.clone(),
2548                cx,
2549            )
2550        });
2551        let (worktree_a, _) = project_a
2552            .update(&mut cx_a, |p, cx| {
2553                p.find_or_create_local_worktree("/root-1", false, cx)
2554            })
2555            .await
2556            .unwrap();
2557        worktree_a
2558            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2559            .await;
2560        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2561        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2562        project_a
2563            .update(&mut cx_a, |p, cx| p.share(cx))
2564            .await
2565            .unwrap();
2566
2567        // Join the worktree as client B.
2568        let project_b = Project::remote(
2569            project_id,
2570            client_b.clone(),
2571            client_b.user_store.clone(),
2572            lang_registry.clone(),
2573            fs.clone(),
2574            &mut cx_b.to_async(),
2575        )
2576        .await
2577        .unwrap();
2578
2579        // Open the file on client B.
2580        let buffer_b = cx_b
2581            .background()
2582            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2583            .await
2584            .unwrap();
2585
2586        // Request the definition of a symbol as the guest.
2587        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2588
2589        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2590        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2591            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2592                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2593                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2594            )))
2595        });
2596
2597        let definitions_1 = definitions_1.await.unwrap();
2598        cx_b.read(|cx| {
2599            assert_eq!(definitions_1.len(), 1);
2600            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2601            let target_buffer = definitions_1[0].target_buffer.read(cx);
2602            assert_eq!(
2603                target_buffer.text(),
2604                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2605            );
2606            assert_eq!(
2607                definitions_1[0].target_range.to_point(target_buffer),
2608                Point::new(0, 6)..Point::new(0, 9)
2609            );
2610        });
2611
2612        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2613        // the previous call to `definition`.
2614        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2615        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2616            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2617                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2618                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2619            )))
2620        });
2621
2622        let definitions_2 = definitions_2.await.unwrap();
2623        cx_b.read(|cx| {
2624            assert_eq!(definitions_2.len(), 1);
2625            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2626            let target_buffer = definitions_2[0].target_buffer.read(cx);
2627            assert_eq!(
2628                target_buffer.text(),
2629                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2630            );
2631            assert_eq!(
2632                definitions_2[0].target_range.to_point(target_buffer),
2633                Point::new(1, 6)..Point::new(1, 11)
2634            );
2635        });
2636        assert_eq!(
2637            definitions_1[0].target_buffer,
2638            definitions_2[0].target_buffer
2639        );
2640
2641        cx_b.update(|_| {
2642            drop(definitions_1);
2643            drop(definitions_2);
2644        });
2645        project_b
2646            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2647            .await;
2648    }
2649
2650    #[gpui::test(iterations = 10)]
2651    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2652        mut cx_a: TestAppContext,
2653        mut cx_b: TestAppContext,
2654        mut rng: StdRng,
2655    ) {
2656        cx_a.foreground().forbid_parking();
2657        let mut lang_registry = Arc::new(LanguageRegistry::new());
2658        let fs = FakeFs::new(cx_a.background());
2659        fs.insert_tree(
2660            "/root",
2661            json!({
2662                ".zed.toml": r#"collaborators = ["user_b"]"#,
2663                "a.rs": "const ONE: usize = b::TWO;",
2664                "b.rs": "const TWO: usize = 2",
2665            }),
2666        )
2667        .await;
2668
2669        // Set up a fake language server.
2670        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2671
2672        Arc::get_mut(&mut lang_registry)
2673            .unwrap()
2674            .add(Arc::new(Language::new(
2675                LanguageConfig {
2676                    name: "Rust".to_string(),
2677                    path_suffixes: vec!["rs".to_string()],
2678                    language_server: Some(language_server_config),
2679                    ..Default::default()
2680                },
2681                Some(tree_sitter_rust::language()),
2682            )));
2683
2684        // Connect to a server as 2 clients.
2685        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2686        let client_a = server.create_client(&mut cx_a, "user_a").await;
2687        let client_b = server.create_client(&mut cx_b, "user_b").await;
2688
2689        // Share a project as client A
2690        let project_a = cx_a.update(|cx| {
2691            Project::local(
2692                client_a.clone(),
2693                client_a.user_store.clone(),
2694                lang_registry.clone(),
2695                fs.clone(),
2696                cx,
2697            )
2698        });
2699
2700        let (worktree_a, _) = project_a
2701            .update(&mut cx_a, |p, cx| {
2702                p.find_or_create_local_worktree("/root", false, cx)
2703            })
2704            .await
2705            .unwrap();
2706        worktree_a
2707            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2708            .await;
2709        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2710        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2711        project_a
2712            .update(&mut cx_a, |p, cx| p.share(cx))
2713            .await
2714            .unwrap();
2715
2716        // Join the worktree as client B.
2717        let project_b = Project::remote(
2718            project_id,
2719            client_b.clone(),
2720            client_b.user_store.clone(),
2721            lang_registry.clone(),
2722            fs.clone(),
2723            &mut cx_b.to_async(),
2724        )
2725        .await
2726        .unwrap();
2727
2728        let buffer_b1 = cx_b
2729            .background()
2730            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2731            .await
2732            .unwrap();
2733
2734        let definitions;
2735        let buffer_b2;
2736        if rng.gen() {
2737            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2738            buffer_b2 =
2739                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2740        } else {
2741            buffer_b2 =
2742                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2743            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2744        }
2745
2746        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2747        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2748            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2749                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2750                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2751            )))
2752        });
2753
2754        let buffer_b2 = buffer_b2.await.unwrap();
2755        let definitions = definitions.await.unwrap();
2756        assert_eq!(definitions.len(), 1);
2757        assert_eq!(definitions[0].target_buffer, buffer_b2);
2758    }
2759
2760    #[gpui::test(iterations = 10)]
2761    async fn test_collaborating_with_code_actions(
2762        mut cx_a: TestAppContext,
2763        mut cx_b: TestAppContext,
2764    ) {
2765        cx_a.foreground().forbid_parking();
2766        let mut lang_registry = Arc::new(LanguageRegistry::new());
2767        let fs = FakeFs::new(cx_a.background());
2768        let mut path_openers_b = Vec::new();
2769        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2770
2771        // Set up a fake language server.
2772        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2773        Arc::get_mut(&mut lang_registry)
2774            .unwrap()
2775            .add(Arc::new(Language::new(
2776                LanguageConfig {
2777                    name: "Rust".to_string(),
2778                    path_suffixes: vec!["rs".to_string()],
2779                    language_server: Some(language_server_config),
2780                    ..Default::default()
2781                },
2782                Some(tree_sitter_rust::language()),
2783            )));
2784
2785        // Connect to a server as 2 clients.
2786        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2787        let client_a = server.create_client(&mut cx_a, "user_a").await;
2788        let client_b = server.create_client(&mut cx_b, "user_b").await;
2789
2790        // Share a project as client A
2791        fs.insert_tree(
2792            "/a",
2793            json!({
2794                ".zed.toml": r#"collaborators = ["user_b"]"#,
2795                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2796                "other.rs": "pub fn foo() -> usize { 4 }",
2797            }),
2798        )
2799        .await;
2800        let project_a = cx_a.update(|cx| {
2801            Project::local(
2802                client_a.clone(),
2803                client_a.user_store.clone(),
2804                lang_registry.clone(),
2805                fs.clone(),
2806                cx,
2807            )
2808        });
2809        let (worktree_a, _) = project_a
2810            .update(&mut cx_a, |p, cx| {
2811                p.find_or_create_local_worktree("/a", false, cx)
2812            })
2813            .await
2814            .unwrap();
2815        worktree_a
2816            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2817            .await;
2818        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2819        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2820        project_a
2821            .update(&mut cx_a, |p, cx| p.share(cx))
2822            .await
2823            .unwrap();
2824
2825        // Join the worktree as client B.
2826        let project_b = Project::remote(
2827            project_id,
2828            client_b.clone(),
2829            client_b.user_store.clone(),
2830            lang_registry.clone(),
2831            fs.clone(),
2832            &mut cx_b.to_async(),
2833        )
2834        .await
2835        .unwrap();
2836        let mut params = cx_b.update(WorkspaceParams::test);
2837        params.languages = lang_registry.clone();
2838        params.client = client_b.client.clone();
2839        params.user_store = client_b.user_store.clone();
2840        params.project = project_b;
2841        params.path_openers = path_openers_b.into();
2842
2843        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2844        let editor_b = workspace_b
2845            .update(&mut cx_b, |workspace, cx| {
2846                workspace.open_path((worktree_id, "main.rs").into(), cx)
2847            })
2848            .await
2849            .unwrap()
2850            .downcast::<Editor>()
2851            .unwrap();
2852
2853        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2854        fake_language_server
2855            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2856                assert_eq!(
2857                    params.text_document.uri,
2858                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2859                );
2860                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2861                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2862                None
2863            })
2864            .next()
2865            .await;
2866
2867        // Move cursor to a location that contains code actions.
2868        editor_b.update(&mut cx_b, |editor, cx| {
2869            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2870            cx.focus(&editor_b);
2871        });
2872
2873        fake_language_server
2874            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2875                assert_eq!(
2876                    params.text_document.uri,
2877                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2878                );
2879                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2880                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2881
2882                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2883                    lsp::CodeAction {
2884                        title: "Inline into all callers".to_string(),
2885                        edit: Some(lsp::WorkspaceEdit {
2886                            changes: Some(
2887                                [
2888                                    (
2889                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2890                                        vec![lsp::TextEdit::new(
2891                                            lsp::Range::new(
2892                                                lsp::Position::new(1, 22),
2893                                                lsp::Position::new(1, 34),
2894                                            ),
2895                                            "4".to_string(),
2896                                        )],
2897                                    ),
2898                                    (
2899                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2900                                        vec![lsp::TextEdit::new(
2901                                            lsp::Range::new(
2902                                                lsp::Position::new(0, 0),
2903                                                lsp::Position::new(0, 27),
2904                                            ),
2905                                            "".to_string(),
2906                                        )],
2907                                    ),
2908                                ]
2909                                .into_iter()
2910                                .collect(),
2911                            ),
2912                            ..Default::default()
2913                        }),
2914                        data: Some(json!({
2915                            "codeActionParams": {
2916                                "range": {
2917                                    "start": {"line": 1, "column": 31},
2918                                    "end": {"line": 1, "column": 31},
2919                                }
2920                            }
2921                        })),
2922                        ..Default::default()
2923                    },
2924                )])
2925            })
2926            .next()
2927            .await;
2928
2929        // Toggle code actions and wait for them to display.
2930        editor_b.update(&mut cx_b, |editor, cx| {
2931            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2932        });
2933        editor_b
2934            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2935            .await;
2936
2937        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2938
2939        // Confirming the code action will trigger a resolve request.
2940        let confirm_action = workspace_b
2941            .update(&mut cx_b, |workspace, cx| {
2942                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2943            })
2944            .unwrap();
2945        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2946            lsp::CodeAction {
2947                title: "Inline into all callers".to_string(),
2948                edit: Some(lsp::WorkspaceEdit {
2949                    changes: Some(
2950                        [
2951                            (
2952                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2953                                vec![lsp::TextEdit::new(
2954                                    lsp::Range::new(
2955                                        lsp::Position::new(1, 22),
2956                                        lsp::Position::new(1, 34),
2957                                    ),
2958                                    "4".to_string(),
2959                                )],
2960                            ),
2961                            (
2962                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2963                                vec![lsp::TextEdit::new(
2964                                    lsp::Range::new(
2965                                        lsp::Position::new(0, 0),
2966                                        lsp::Position::new(0, 27),
2967                                    ),
2968                                    "".to_string(),
2969                                )],
2970                            ),
2971                        ]
2972                        .into_iter()
2973                        .collect(),
2974                    ),
2975                    ..Default::default()
2976                }),
2977                ..Default::default()
2978            }
2979        });
2980
2981        // After the action is confirmed, an editor containing both modified files is opened.
2982        confirm_action.await.unwrap();
2983        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2984            workspace
2985                .active_item(cx)
2986                .unwrap()
2987                .downcast::<Editor>()
2988                .unwrap()
2989        });
2990        code_action_editor.update(&mut cx_b, |editor, cx| {
2991            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2992            editor.undo(&Undo, cx);
2993            assert_eq!(
2994                editor.text(cx),
2995                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2996            );
2997            editor.redo(&Redo, cx);
2998            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2999        });
3000    }
3001
3002    #[gpui::test(iterations = 10)]
3003    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3004        cx_a.foreground().forbid_parking();
3005
3006        // Connect to a server as 2 clients.
3007        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3008        let client_a = server.create_client(&mut cx_a, "user_a").await;
3009        let client_b = server.create_client(&mut cx_b, "user_b").await;
3010
3011        // Create an org that includes these 2 users.
3012        let db = &server.app_state.db;
3013        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3014        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3015            .await
3016            .unwrap();
3017        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3018            .await
3019            .unwrap();
3020
3021        // Create a channel that includes all the users.
3022        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3023        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3024            .await
3025            .unwrap();
3026        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3027            .await
3028            .unwrap();
3029        db.create_channel_message(
3030            channel_id,
3031            client_b.current_user_id(&cx_b),
3032            "hello A, it's B.",
3033            OffsetDateTime::now_utc(),
3034            1,
3035        )
3036        .await
3037        .unwrap();
3038
3039        let channels_a = cx_a
3040            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3041        channels_a
3042            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3043            .await;
3044        channels_a.read_with(&cx_a, |list, _| {
3045            assert_eq!(
3046                list.available_channels().unwrap(),
3047                &[ChannelDetails {
3048                    id: channel_id.to_proto(),
3049                    name: "test-channel".to_string()
3050                }]
3051            )
3052        });
3053        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3054            this.get_channel(channel_id.to_proto(), cx).unwrap()
3055        });
3056        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3057        channel_a
3058            .condition(&cx_a, |channel, _| {
3059                channel_messages(channel)
3060                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3061            })
3062            .await;
3063
3064        let channels_b = cx_b
3065            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3066        channels_b
3067            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3068            .await;
3069        channels_b.read_with(&cx_b, |list, _| {
3070            assert_eq!(
3071                list.available_channels().unwrap(),
3072                &[ChannelDetails {
3073                    id: channel_id.to_proto(),
3074                    name: "test-channel".to_string()
3075                }]
3076            )
3077        });
3078
3079        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3080            this.get_channel(channel_id.to_proto(), cx).unwrap()
3081        });
3082        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3083        channel_b
3084            .condition(&cx_b, |channel, _| {
3085                channel_messages(channel)
3086                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3087            })
3088            .await;
3089
3090        channel_a
3091            .update(&mut cx_a, |channel, cx| {
3092                channel
3093                    .send_message("oh, hi B.".to_string(), cx)
3094                    .unwrap()
3095                    .detach();
3096                let task = channel.send_message("sup".to_string(), cx).unwrap();
3097                assert_eq!(
3098                    channel_messages(channel),
3099                    &[
3100                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3101                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3102                        ("user_a".to_string(), "sup".to_string(), true)
3103                    ]
3104                );
3105                task
3106            })
3107            .await
3108            .unwrap();
3109
3110        channel_b
3111            .condition(&cx_b, |channel, _| {
3112                channel_messages(channel)
3113                    == [
3114                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3115                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3116                        ("user_a".to_string(), "sup".to_string(), false),
3117                    ]
3118            })
3119            .await;
3120
3121        assert_eq!(
3122            server
3123                .state()
3124                .await
3125                .channel(channel_id)
3126                .unwrap()
3127                .connection_ids
3128                .len(),
3129            2
3130        );
3131        cx_b.update(|_| drop(channel_b));
3132        server
3133            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3134            .await;
3135
3136        cx_a.update(|_| drop(channel_a));
3137        server
3138            .condition(|state| state.channel(channel_id).is_none())
3139            .await;
3140    }
3141
3142    #[gpui::test(iterations = 10)]
3143    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3144        cx_a.foreground().forbid_parking();
3145
3146        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3147        let client_a = server.create_client(&mut cx_a, "user_a").await;
3148
3149        let db = &server.app_state.db;
3150        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3151        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3152        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3153            .await
3154            .unwrap();
3155        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3156            .await
3157            .unwrap();
3158
3159        let channels_a = cx_a
3160            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3161        channels_a
3162            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3163            .await;
3164        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3165            this.get_channel(channel_id.to_proto(), cx).unwrap()
3166        });
3167
3168        // Messages aren't allowed to be too long.
3169        channel_a
3170            .update(&mut cx_a, |channel, cx| {
3171                let long_body = "this is long.\n".repeat(1024);
3172                channel.send_message(long_body, cx).unwrap()
3173            })
3174            .await
3175            .unwrap_err();
3176
3177        // Messages aren't allowed to be blank.
3178        channel_a.update(&mut cx_a, |channel, cx| {
3179            channel.send_message(String::new(), cx).unwrap_err()
3180        });
3181
3182        // Leading and trailing whitespace are trimmed.
3183        channel_a
3184            .update(&mut cx_a, |channel, cx| {
3185                channel
3186                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3187                    .unwrap()
3188            })
3189            .await
3190            .unwrap();
3191        assert_eq!(
3192            db.get_channel_messages(channel_id, 10, None)
3193                .await
3194                .unwrap()
3195                .iter()
3196                .map(|m| &m.body)
3197                .collect::<Vec<_>>(),
3198            &["surrounded by whitespace"]
3199        );
3200    }
3201
3202    #[gpui::test(iterations = 10)]
3203    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3204        cx_a.foreground().forbid_parking();
3205
3206        // Connect to a server as 2 clients.
3207        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3208        let client_a = server.create_client(&mut cx_a, "user_a").await;
3209        let client_b = server.create_client(&mut cx_b, "user_b").await;
3210        let mut status_b = client_b.status();
3211
3212        // Create an org that includes these 2 users.
3213        let db = &server.app_state.db;
3214        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3215        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3216            .await
3217            .unwrap();
3218        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3219            .await
3220            .unwrap();
3221
3222        // Create a channel that includes all the users.
3223        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3224        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3225            .await
3226            .unwrap();
3227        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3228            .await
3229            .unwrap();
3230        db.create_channel_message(
3231            channel_id,
3232            client_b.current_user_id(&cx_b),
3233            "hello A, it's B.",
3234            OffsetDateTime::now_utc(),
3235            2,
3236        )
3237        .await
3238        .unwrap();
3239
3240        let channels_a = cx_a
3241            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3242        channels_a
3243            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3244            .await;
3245
3246        channels_a.read_with(&cx_a, |list, _| {
3247            assert_eq!(
3248                list.available_channels().unwrap(),
3249                &[ChannelDetails {
3250                    id: channel_id.to_proto(),
3251                    name: "test-channel".to_string()
3252                }]
3253            )
3254        });
3255        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3256            this.get_channel(channel_id.to_proto(), cx).unwrap()
3257        });
3258        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3259        channel_a
3260            .condition(&cx_a, |channel, _| {
3261                channel_messages(channel)
3262                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3263            })
3264            .await;
3265
3266        let channels_b = cx_b
3267            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3268        channels_b
3269            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3270            .await;
3271        channels_b.read_with(&cx_b, |list, _| {
3272            assert_eq!(
3273                list.available_channels().unwrap(),
3274                &[ChannelDetails {
3275                    id: channel_id.to_proto(),
3276                    name: "test-channel".to_string()
3277                }]
3278            )
3279        });
3280
3281        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3282            this.get_channel(channel_id.to_proto(), cx).unwrap()
3283        });
3284        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3285        channel_b
3286            .condition(&cx_b, |channel, _| {
3287                channel_messages(channel)
3288                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3289            })
3290            .await;
3291
3292        // Disconnect client B, ensuring we can still access its cached channel data.
3293        server.forbid_connections();
3294        server.disconnect_client(client_b.current_user_id(&cx_b));
3295        while !matches!(
3296            status_b.next().await,
3297            Some(client::Status::ReconnectionError { .. })
3298        ) {}
3299
3300        channels_b.read_with(&cx_b, |channels, _| {
3301            assert_eq!(
3302                channels.available_channels().unwrap(),
3303                [ChannelDetails {
3304                    id: channel_id.to_proto(),
3305                    name: "test-channel".to_string()
3306                }]
3307            )
3308        });
3309        channel_b.read_with(&cx_b, |channel, _| {
3310            assert_eq!(
3311                channel_messages(channel),
3312                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3313            )
3314        });
3315
3316        // Send a message from client B while it is disconnected.
3317        channel_b
3318            .update(&mut cx_b, |channel, cx| {
3319                let task = channel
3320                    .send_message("can you see this?".to_string(), cx)
3321                    .unwrap();
3322                assert_eq!(
3323                    channel_messages(channel),
3324                    &[
3325                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3326                        ("user_b".to_string(), "can you see this?".to_string(), true)
3327                    ]
3328                );
3329                task
3330            })
3331            .await
3332            .unwrap_err();
3333
3334        // Send a message from client A while B is disconnected.
3335        channel_a
3336            .update(&mut cx_a, |channel, cx| {
3337                channel
3338                    .send_message("oh, hi B.".to_string(), cx)
3339                    .unwrap()
3340                    .detach();
3341                let task = channel.send_message("sup".to_string(), cx).unwrap();
3342                assert_eq!(
3343                    channel_messages(channel),
3344                    &[
3345                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3346                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3347                        ("user_a".to_string(), "sup".to_string(), true)
3348                    ]
3349                );
3350                task
3351            })
3352            .await
3353            .unwrap();
3354
3355        // Give client B a chance to reconnect.
3356        server.allow_connections();
3357        cx_b.foreground().advance_clock(Duration::from_secs(10));
3358
3359        // Verify that B sees the new messages upon reconnection, as well as the message client B
3360        // sent while offline.
3361        channel_b
3362            .condition(&cx_b, |channel, _| {
3363                channel_messages(channel)
3364                    == [
3365                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3366                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3367                        ("user_a".to_string(), "sup".to_string(), false),
3368                        ("user_b".to_string(), "can you see this?".to_string(), false),
3369                    ]
3370            })
3371            .await;
3372
3373        // Ensure client A and B can communicate normally after reconnection.
3374        channel_a
3375            .update(&mut cx_a, |channel, cx| {
3376                channel.send_message("you online?".to_string(), cx).unwrap()
3377            })
3378            .await
3379            .unwrap();
3380        channel_b
3381            .condition(&cx_b, |channel, _| {
3382                channel_messages(channel)
3383                    == [
3384                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3385                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3386                        ("user_a".to_string(), "sup".to_string(), false),
3387                        ("user_b".to_string(), "can you see this?".to_string(), false),
3388                        ("user_a".to_string(), "you online?".to_string(), false),
3389                    ]
3390            })
3391            .await;
3392
3393        channel_b
3394            .update(&mut cx_b, |channel, cx| {
3395                channel.send_message("yep".to_string(), cx).unwrap()
3396            })
3397            .await
3398            .unwrap();
3399        channel_a
3400            .condition(&cx_a, |channel, _| {
3401                channel_messages(channel)
3402                    == [
3403                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3404                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3405                        ("user_a".to_string(), "sup".to_string(), false),
3406                        ("user_b".to_string(), "can you see this?".to_string(), false),
3407                        ("user_a".to_string(), "you online?".to_string(), false),
3408                        ("user_b".to_string(), "yep".to_string(), false),
3409                    ]
3410            })
3411            .await;
3412    }
3413
3414    #[gpui::test(iterations = 10)]
3415    async fn test_contacts(
3416        mut cx_a: TestAppContext,
3417        mut cx_b: TestAppContext,
3418        mut cx_c: TestAppContext,
3419    ) {
3420        cx_a.foreground().forbid_parking();
3421        let lang_registry = Arc::new(LanguageRegistry::new());
3422        let fs = FakeFs::new(cx_a.background());
3423
3424        // Connect to a server as 3 clients.
3425        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3426        let client_a = server.create_client(&mut cx_a, "user_a").await;
3427        let client_b = server.create_client(&mut cx_b, "user_b").await;
3428        let client_c = server.create_client(&mut cx_c, "user_c").await;
3429
3430        // Share a worktree as client A.
3431        fs.insert_tree(
3432            "/a",
3433            json!({
3434                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3435            }),
3436        )
3437        .await;
3438
3439        let project_a = cx_a.update(|cx| {
3440            Project::local(
3441                client_a.clone(),
3442                client_a.user_store.clone(),
3443                lang_registry.clone(),
3444                fs.clone(),
3445                cx,
3446            )
3447        });
3448        let (worktree_a, _) = project_a
3449            .update(&mut cx_a, |p, cx| {
3450                p.find_or_create_local_worktree("/a", false, cx)
3451            })
3452            .await
3453            .unwrap();
3454        worktree_a
3455            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3456            .await;
3457
3458        client_a
3459            .user_store
3460            .condition(&cx_a, |user_store, _| {
3461                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3462            })
3463            .await;
3464        client_b
3465            .user_store
3466            .condition(&cx_b, |user_store, _| {
3467                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3468            })
3469            .await;
3470        client_c
3471            .user_store
3472            .condition(&cx_c, |user_store, _| {
3473                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3474            })
3475            .await;
3476
3477        let project_id = project_a
3478            .update(&mut cx_a, |project, _| project.next_remote_id())
3479            .await;
3480        project_a
3481            .update(&mut cx_a, |project, cx| project.share(cx))
3482            .await
3483            .unwrap();
3484
3485        let _project_b = Project::remote(
3486            project_id,
3487            client_b.clone(),
3488            client_b.user_store.clone(),
3489            lang_registry.clone(),
3490            fs.clone(),
3491            &mut cx_b.to_async(),
3492        )
3493        .await
3494        .unwrap();
3495
3496        client_a
3497            .user_store
3498            .condition(&cx_a, |user_store, _| {
3499                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3500            })
3501            .await;
3502        client_b
3503            .user_store
3504            .condition(&cx_b, |user_store, _| {
3505                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3506            })
3507            .await;
3508        client_c
3509            .user_store
3510            .condition(&cx_c, |user_store, _| {
3511                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3512            })
3513            .await;
3514
3515        project_a
3516            .condition(&cx_a, |project, _| {
3517                project.collaborators().contains_key(&client_b.peer_id)
3518            })
3519            .await;
3520
3521        cx_a.update(move |_| drop(project_a));
3522        client_a
3523            .user_store
3524            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3525            .await;
3526        client_b
3527            .user_store
3528            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3529            .await;
3530        client_c
3531            .user_store
3532            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3533            .await;
3534
3535        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3536            user_store
3537                .contacts()
3538                .iter()
3539                .map(|contact| {
3540                    let worktrees = contact
3541                        .projects
3542                        .iter()
3543                        .map(|p| {
3544                            (
3545                                p.worktree_root_names[0].as_str(),
3546                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3547                            )
3548                        })
3549                        .collect();
3550                    (contact.user.github_login.as_str(), worktrees)
3551                })
3552                .collect()
3553        }
3554    }
3555
3556    #[gpui::test(iterations = 100)]
3557    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3558        cx.foreground().forbid_parking();
3559        let max_peers = env::var("MAX_PEERS")
3560            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3561            .unwrap_or(5);
3562        let max_operations = env::var("OPERATIONS")
3563            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3564            .unwrap_or(10);
3565
3566        let rng = Rc::new(RefCell::new(rng));
3567
3568        let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3569        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3570
3571        // Set up a fake language server.
3572        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3573        language_server_config.set_fake_initializer(|fake_server| {
3574            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3575                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3576                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3577                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3578                        new_text: "the-new-text".to_string(),
3579                    })),
3580                    ..Default::default()
3581                }]))
3582            });
3583
3584            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3585                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3586                    lsp::CodeAction {
3587                        title: "the-code-action".to_string(),
3588                        ..Default::default()
3589                    },
3590                )])
3591            });
3592        });
3593
3594        Arc::get_mut(&mut host_lang_registry)
3595            .unwrap()
3596            .add(Arc::new(Language::new(
3597                LanguageConfig {
3598                    name: "Rust".to_string(),
3599                    path_suffixes: vec!["rs".to_string()],
3600                    language_server: Some(language_server_config),
3601                    ..Default::default()
3602                },
3603                None,
3604            )));
3605
3606        let fs = FakeFs::new(cx.background());
3607        fs.insert_tree(
3608            "/_collab",
3609            json!({
3610                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3611            }),
3612        )
3613        .await;
3614
3615        let operations = Rc::new(Cell::new(0));
3616        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3617        let mut clients = Vec::new();
3618
3619        let mut next_entity_id = 100000;
3620        let mut host_cx = TestAppContext::new(
3621            cx.foreground_platform(),
3622            cx.platform(),
3623            cx.foreground(),
3624            cx.background(),
3625            cx.font_cache(),
3626            next_entity_id,
3627        );
3628        let host = server.create_client(&mut host_cx, "host").await;
3629        let host_project = host_cx.update(|cx| {
3630            Project::local(
3631                host.client.clone(),
3632                host.user_store.clone(),
3633                host_lang_registry.clone(),
3634                fs.clone(),
3635                cx,
3636            )
3637        });
3638        let host_project_id = host_project
3639            .update(&mut host_cx, |p, _| p.next_remote_id())
3640            .await;
3641
3642        let (collab_worktree, _) = host_project
3643            .update(&mut host_cx, |project, cx| {
3644                project.find_or_create_local_worktree("/_collab", false, cx)
3645            })
3646            .await
3647            .unwrap();
3648        collab_worktree
3649            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3650            .await;
3651        host_project
3652            .update(&mut host_cx, |project, cx| project.share(cx))
3653            .await
3654            .unwrap();
3655
3656        clients.push(cx.foreground().spawn(host.simulate_host(
3657            host_project.clone(),
3658            operations.clone(),
3659            max_operations,
3660            rng.clone(),
3661            host_cx.clone(),
3662        )));
3663
3664        while operations.get() < max_operations {
3665            cx.background().simulate_random_delay().await;
3666            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3667                operations.set(operations.get() + 1);
3668
3669                let guest_id = clients.len();
3670                log::info!("Adding guest {}", guest_id);
3671                next_entity_id += 100000;
3672                let mut guest_cx = TestAppContext::new(
3673                    cx.foreground_platform(),
3674                    cx.platform(),
3675                    cx.foreground(),
3676                    cx.background(),
3677                    cx.font_cache(),
3678                    next_entity_id,
3679                );
3680                let guest = server
3681                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3682                    .await;
3683                let guest_project = Project::remote(
3684                    host_project_id,
3685                    guest.client.clone(),
3686                    guest.user_store.clone(),
3687                    guest_lang_registry.clone(),
3688                    fs.clone(),
3689                    &mut guest_cx.to_async(),
3690                )
3691                .await
3692                .unwrap();
3693                clients.push(cx.foreground().spawn(guest.simulate_guest(
3694                    guest_id,
3695                    guest_project,
3696                    operations.clone(),
3697                    max_operations,
3698                    rng.clone(),
3699                    guest_cx,
3700                )));
3701
3702                log::info!("Guest {} added", guest_id);
3703            }
3704        }
3705
3706        let clients = futures::future::join_all(clients).await;
3707        cx.foreground().run_until_parked();
3708
3709        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3710            project
3711                .worktrees(cx)
3712                .map(|worktree| {
3713                    let snapshot = worktree.read(cx).snapshot();
3714                    (snapshot.id(), snapshot)
3715                })
3716                .collect::<BTreeMap<_, _>>()
3717        });
3718
3719        for (guest_client, guest_cx) in clients.iter().skip(1) {
3720            let guest_id = guest_client.client.id();
3721            let worktree_snapshots =
3722                guest_client
3723                    .project
3724                    .as_ref()
3725                    .unwrap()
3726                    .read_with(guest_cx, |project, cx| {
3727                        project
3728                            .worktrees(cx)
3729                            .map(|worktree| {
3730                                let worktree = worktree.read(cx);
3731                                assert!(
3732                                    !worktree.as_remote().unwrap().has_pending_updates(),
3733                                    "Guest {} worktree {:?} contains deferred updates",
3734                                    guest_id,
3735                                    worktree.id()
3736                                );
3737                                (worktree.id(), worktree.snapshot())
3738                            })
3739                            .collect::<BTreeMap<_, _>>()
3740                    });
3741
3742            assert_eq!(
3743                worktree_snapshots.keys().collect::<Vec<_>>(),
3744                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3745                "guest {} has different worktrees than the host",
3746                guest_id
3747            );
3748            for (id, host_snapshot) in &host_worktree_snapshots {
3749                let guest_snapshot = &worktree_snapshots[id];
3750                assert_eq!(
3751                    guest_snapshot.root_name(),
3752                    host_snapshot.root_name(),
3753                    "guest {} has different root name than the host for worktree {}",
3754                    guest_id,
3755                    id
3756                );
3757                assert_eq!(
3758                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3759                    host_snapshot.entries(false).collect::<Vec<_>>(),
3760                    "guest {} has different snapshot than the host for worktree {}",
3761                    guest_id,
3762                    id
3763                );
3764            }
3765
3766            guest_client
3767                .project
3768                .as_ref()
3769                .unwrap()
3770                .read_with(guest_cx, |project, _| {
3771                    assert!(
3772                        !project.has_buffered_operations(),
3773                        "guest {} has buffered operations ",
3774                        guest_id,
3775                    );
3776                });
3777
3778            for guest_buffer in &guest_client.buffers {
3779                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3780                let host_buffer = host_project.read_with(&host_cx, |project, _| {
3781                    project
3782                        .shared_buffer(guest_client.peer_id, buffer_id)
3783                        .expect(&format!(
3784                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
3785                            guest_id, guest_client.peer_id, buffer_id
3786                        ))
3787                });
3788                assert_eq!(
3789                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3790                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3791                    "guest {} buffer {} differs from the host's buffer",
3792                    guest_id,
3793                    buffer_id,
3794                );
3795            }
3796        }
3797    }
3798
3799    struct TestServer {
3800        peer: Arc<Peer>,
3801        app_state: Arc<AppState>,
3802        server: Arc<Server>,
3803        foreground: Rc<executor::Foreground>,
3804        notifications: mpsc::UnboundedReceiver<()>,
3805        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3806        forbid_connections: Arc<AtomicBool>,
3807        _test_db: TestDb,
3808    }
3809
3810    impl TestServer {
3811        async fn start(
3812            foreground: Rc<executor::Foreground>,
3813            background: Arc<executor::Background>,
3814        ) -> Self {
3815            let test_db = TestDb::fake(background);
3816            let app_state = Self::build_app_state(&test_db).await;
3817            let peer = Peer::new();
3818            let notifications = mpsc::unbounded();
3819            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3820            Self {
3821                peer,
3822                app_state,
3823                server,
3824                foreground,
3825                notifications: notifications.1,
3826                connection_killers: Default::default(),
3827                forbid_connections: Default::default(),
3828                _test_db: test_db,
3829            }
3830        }
3831
3832        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3833            let http = FakeHttpClient::with_404_response();
3834            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3835            let client_name = name.to_string();
3836            let mut client = Client::new(http.clone());
3837            let server = self.server.clone();
3838            let connection_killers = self.connection_killers.clone();
3839            let forbid_connections = self.forbid_connections.clone();
3840            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
3841
3842            Arc::get_mut(&mut client)
3843                .unwrap()
3844                .override_authenticate(move |cx| {
3845                    cx.spawn(|_| async move {
3846                        let access_token = "the-token".to_string();
3847                        Ok(Credentials {
3848                            user_id: user_id.0 as u64,
3849                            access_token,
3850                        })
3851                    })
3852                })
3853                .override_establish_connection(move |credentials, cx| {
3854                    assert_eq!(credentials.user_id, user_id.0 as u64);
3855                    assert_eq!(credentials.access_token, "the-token");
3856
3857                    let server = server.clone();
3858                    let connection_killers = connection_killers.clone();
3859                    let forbid_connections = forbid_connections.clone();
3860                    let client_name = client_name.clone();
3861                    let connection_id_tx = connection_id_tx.clone();
3862                    cx.spawn(move |cx| async move {
3863                        if forbid_connections.load(SeqCst) {
3864                            Err(EstablishConnectionError::other(anyhow!(
3865                                "server is forbidding connections"
3866                            )))
3867                        } else {
3868                            let (client_conn, server_conn, kill_conn) =
3869                                Connection::in_memory(cx.background());
3870                            connection_killers.lock().insert(user_id, kill_conn);
3871                            cx.background()
3872                                .spawn(server.handle_connection(
3873                                    server_conn,
3874                                    client_name,
3875                                    user_id,
3876                                    Some(connection_id_tx),
3877                                    cx.background(),
3878                                ))
3879                                .detach();
3880                            Ok(client_conn)
3881                        }
3882                    })
3883                });
3884
3885            client
3886                .authenticate_and_connect(&cx.to_async())
3887                .await
3888                .unwrap();
3889
3890            Channel::init(&client);
3891            Project::init(&client);
3892
3893            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3894            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3895            let mut authed_user =
3896                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3897            while authed_user.next().await.unwrap().is_none() {}
3898
3899            TestClient {
3900                client,
3901                peer_id,
3902                user_store,
3903                project: Default::default(),
3904                buffers: Default::default(),
3905            }
3906        }
3907
3908        fn disconnect_client(&self, user_id: UserId) {
3909            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3910                let _ = kill_conn.try_send(Some(()));
3911            }
3912        }
3913
3914        fn forbid_connections(&self) {
3915            self.forbid_connections.store(true, SeqCst);
3916        }
3917
3918        fn allow_connections(&self) {
3919            self.forbid_connections.store(false, SeqCst);
3920        }
3921
3922        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3923            let mut config = Config::default();
3924            config.session_secret = "a".repeat(32);
3925            config.database_url = test_db.url.clone();
3926            let github_client = github::AppClient::test();
3927            Arc::new(AppState {
3928                db: test_db.db().clone(),
3929                handlebars: Default::default(),
3930                auth_client: auth::build_client("", ""),
3931                repo_client: github::RepoClient::test(&github_client),
3932                github_client,
3933                config,
3934            })
3935        }
3936
3937        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3938            self.server.store.read()
3939        }
3940
3941        async fn condition<F>(&mut self, mut predicate: F)
3942        where
3943            F: FnMut(&Store) -> bool,
3944        {
3945            async_std::future::timeout(Duration::from_millis(500), async {
3946                while !(predicate)(&*self.server.store.read()) {
3947                    self.foreground.start_waiting();
3948                    self.notifications.next().await;
3949                    self.foreground.finish_waiting();
3950                }
3951            })
3952            .await
3953            .expect("condition timed out");
3954        }
3955    }
3956
3957    impl Drop for TestServer {
3958        fn drop(&mut self) {
3959            self.peer.reset();
3960        }
3961    }
3962
3963    struct TestClient {
3964        client: Arc<Client>,
3965        pub peer_id: PeerId,
3966        pub user_store: ModelHandle<UserStore>,
3967        project: Option<ModelHandle<Project>>,
3968        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3969    }
3970
3971    impl Deref for TestClient {
3972        type Target = Arc<Client>;
3973
3974        fn deref(&self) -> &Self::Target {
3975            &self.client
3976        }
3977    }
3978
3979    impl TestClient {
3980        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3981            UserId::from_proto(
3982                self.user_store
3983                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3984            )
3985        }
3986
3987        async fn simulate_host(
3988            mut self,
3989            project: ModelHandle<Project>,
3990            operations: Rc<Cell<usize>>,
3991            max_operations: usize,
3992            rng: Rc<RefCell<StdRng>>,
3993            mut cx: TestAppContext,
3994        ) -> (Self, TestAppContext) {
3995            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3996            let mut files: Vec<PathBuf> = Default::default();
3997            while operations.get() < max_operations {
3998                operations.set(operations.get() + 1);
3999
4000                let distribution = rng.borrow_mut().gen_range(0..100);
4001                match distribution {
4002                    0..=20 if !files.is_empty() => {
4003                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4004                        while let Some(parent_path) = path.parent() {
4005                            path = parent_path;
4006                            if rng.borrow_mut().gen() {
4007                                break;
4008                            }
4009                        }
4010
4011                        log::info!("Host: find/create local worktree {:?}", path);
4012                        project
4013                            .update(&mut cx, |project, cx| {
4014                                project.find_or_create_local_worktree(path, false, cx)
4015                            })
4016                            .await
4017                            .unwrap();
4018                    }
4019                    10..=80 if !files.is_empty() => {
4020                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4021                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4022                            let (worktree, path) = project
4023                                .update(&mut cx, |project, cx| {
4024                                    project.find_or_create_local_worktree(file, false, cx)
4025                                })
4026                                .await
4027                                .unwrap();
4028                            let project_path =
4029                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4030                            log::info!("Host: opening path {:?}", project_path);
4031                            let buffer = project
4032                                .update(&mut cx, |project, cx| {
4033                                    project.open_buffer(project_path, cx)
4034                                })
4035                                .await
4036                                .unwrap();
4037                            self.buffers.insert(buffer.clone());
4038                            buffer
4039                        } else {
4040                            self.buffers
4041                                .iter()
4042                                .choose(&mut *rng.borrow_mut())
4043                                .unwrap()
4044                                .clone()
4045                        };
4046
4047                        if rng.borrow_mut().gen_bool(0.1) {
4048                            cx.update(|cx| {
4049                                log::info!(
4050                                    "Host: dropping buffer {:?}",
4051                                    buffer.read(cx).file().unwrap().full_path(cx)
4052                                );
4053                                self.buffers.remove(&buffer);
4054                                drop(buffer);
4055                            });
4056                        } else {
4057                            buffer.update(&mut cx, |buffer, cx| {
4058                                log::info!(
4059                                    "Host: updating buffer {:?}",
4060                                    buffer.file().unwrap().full_path(cx)
4061                                );
4062                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4063                            });
4064                        }
4065                    }
4066                    _ => loop {
4067                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4068                        let mut path = PathBuf::new();
4069                        path.push("/");
4070                        for _ in 0..path_component_count {
4071                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4072                            path.push(std::str::from_utf8(&[letter]).unwrap());
4073                        }
4074                        path.set_extension("rs");
4075                        let parent_path = path.parent().unwrap();
4076
4077                        log::info!("Host: creating file {:?}", path);
4078                        if fs.create_dir(&parent_path).await.is_ok()
4079                            && fs.create_file(&path, Default::default()).await.is_ok()
4080                        {
4081                            files.push(path);
4082                            break;
4083                        } else {
4084                            log::info!("Host: cannot create file");
4085                        }
4086                    },
4087                }
4088
4089                cx.background().simulate_random_delay().await;
4090            }
4091
4092            self.project = Some(project);
4093            (self, cx)
4094        }
4095
4096        pub async fn simulate_guest(
4097            mut self,
4098            guest_id: usize,
4099            project: ModelHandle<Project>,
4100            operations: Rc<Cell<usize>>,
4101            max_operations: usize,
4102            rng: Rc<RefCell<StdRng>>,
4103            mut cx: TestAppContext,
4104        ) -> (Self, TestAppContext) {
4105            while operations.get() < max_operations {
4106                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4107                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4108                        project
4109                            .worktrees(&cx)
4110                            .filter(|worktree| {
4111                                worktree.read(cx).entries(false).any(|e| e.is_file())
4112                            })
4113                            .choose(&mut *rng.borrow_mut())
4114                    }) {
4115                        worktree
4116                    } else {
4117                        cx.background().simulate_random_delay().await;
4118                        continue;
4119                    };
4120
4121                    operations.set(operations.get() + 1);
4122                    let project_path = worktree.read_with(&cx, |worktree, _| {
4123                        let entry = worktree
4124                            .entries(false)
4125                            .filter(|e| e.is_file())
4126                            .choose(&mut *rng.borrow_mut())
4127                            .unwrap();
4128                        (worktree.id(), entry.path.clone())
4129                    });
4130                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4131                    let buffer = project
4132                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4133                        .await
4134                        .unwrap();
4135                    self.buffers.insert(buffer.clone());
4136                    buffer
4137                } else {
4138                    operations.set(operations.get() + 1);
4139
4140                    self.buffers
4141                        .iter()
4142                        .choose(&mut *rng.borrow_mut())
4143                        .unwrap()
4144                        .clone()
4145                };
4146
4147                let choice = rng.borrow_mut().gen_range(0..100);
4148                match choice {
4149                    0..=9 => {
4150                        cx.update(|cx| {
4151                            log::info!(
4152                                "Guest {}: dropping buffer {:?}",
4153                                guest_id,
4154                                buffer.read(cx).file().unwrap().full_path(cx)
4155                            );
4156                            self.buffers.remove(&buffer);
4157                            drop(buffer);
4158                        });
4159                    }
4160                    10..=19 => {
4161                        let completions = project.update(&mut cx, |project, cx| {
4162                            log::info!(
4163                                "Guest {}: requesting completions for buffer {:?}",
4164                                guest_id,
4165                                buffer.read(cx).file().unwrap().full_path(cx)
4166                            );
4167                            let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4168                            project.completions(&buffer, offset, cx)
4169                        });
4170                        let completions = cx.background().spawn(async move {
4171                            completions.await.expect("completions request failed");
4172                        });
4173                        if rng.borrow_mut().gen_bool(0.3) {
4174                            log::info!("Guest {}: detaching completions request", guest_id);
4175                            completions.detach();
4176                        } else {
4177                            completions.await;
4178                        }
4179                    }
4180                    20..=29 => {
4181                        let code_actions = project.update(&mut cx, |project, cx| {
4182                            log::info!(
4183                                "Guest {}: requesting code actions for buffer {:?}",
4184                                guest_id,
4185                                buffer.read(cx).file().unwrap().full_path(cx)
4186                            );
4187                            let range =
4188                                buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut());
4189                            project.code_actions(&buffer, range, cx)
4190                        });
4191                        let code_actions = cx.background().spawn(async move {
4192                            code_actions.await.expect("code actions request failed");
4193                        });
4194                        if rng.borrow_mut().gen_bool(0.3) {
4195                            log::info!("Guest {}: detaching code actions request", guest_id);
4196                            code_actions.detach();
4197                        } else {
4198                            code_actions.await;
4199                        }
4200                    }
4201                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4202                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4203                            log::info!(
4204                                "Guest {}: saving buffer {:?}",
4205                                guest_id,
4206                                buffer.file().unwrap().full_path(cx)
4207                            );
4208                            (buffer.version(), buffer.save(cx))
4209                        });
4210                        let save = cx.spawn(|cx| async move {
4211                            let (saved_version, _) = save.await.expect("save request failed");
4212                            buffer.read_with(&cx, |buffer, _| {
4213                                assert!(buffer.version().observed_all(&saved_version));
4214                                assert!(saved_version.observed_all(&requested_version));
4215                            });
4216                        });
4217                        if rng.borrow_mut().gen_bool(0.3) {
4218                            log::info!("Guest {}: detaching save request", guest_id);
4219                            save.detach();
4220                        } else {
4221                            save.await;
4222                        }
4223                    }
4224                    _ => {
4225                        buffer.update(&mut cx, |buffer, cx| {
4226                            log::info!(
4227                                "Guest {}: updating buffer {:?}",
4228                                guest_id,
4229                                buffer.file().unwrap().full_path(cx)
4230                            );
4231                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4232                        });
4233                    }
4234                }
4235                cx.background().simulate_random_delay().await;
4236            }
4237
4238            self.project = Some(project);
4239            (self, cx)
4240        }
4241    }
4242
4243    impl Executor for Arc<gpui::executor::Background> {
4244        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4245            self.spawn(future).detach();
4246        }
4247    }
4248
4249    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4250        channel
4251            .messages()
4252            .cursor::<()>()
4253            .map(|m| {
4254                (
4255                    m.sender.github_login.clone(),
4256                    m.body.clone(),
4257                    m.is_pending(),
4258                )
4259            })
4260            .collect()
4261    }
4262
4263    struct EmptyView;
4264
4265    impl gpui::Entity for EmptyView {
4266        type Event = ();
4267    }
4268
4269    impl gpui::View for EmptyView {
4270        fn ui_name() -> &'static str {
4271            "empty view"
4272        }
4273
4274        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4275            gpui::Element::boxed(gpui::elements::Empty)
4276        }
4277    }
4278}