rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
/// Type-erased message handler as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed, dynamically typed envelope and
/// returns a boxed future resolving to `tide::Result<()>`. The `Send + Sync`
/// bounds apply to the handler itself.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  34
/// RPC server state shared (behind an `Arc`) by all connections.
pub struct Server {
    /// Peer used to add/disconnect connections and to send, forward and
    /// respond to messages.
    peer: Arc<Peer>,
    /// In-memory store of connections, projects and worktrees, guarded by a
    /// read/write lock.
    store: RwLock<Store>,
    /// Application-wide state; the handlers in this file use its `db` handle.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message type they
    /// accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` each time a message handler has
    /// finished running.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
/// Abstraction over whatever runtime is used to run futures in the background.
pub trait Executor {
    /// Starts running `future` without returning a handle to it.
    ///
    /// `Server::handle_connection` uses this for messages flagged as
    /// background work so that they do not block the connection's message loop.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
  46
/// Unit type; presumably the production `Executor` implementation — its
/// `impl` is not in this part of the file (TODO confirm).
pub struct RealExecutor;
  48
// NOTE(review): neither constant is used in this part of the file; the
// descriptions below are inferred from the names — confirm against the
// channel-message handlers.

/// Presumably the number of channel messages returned per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Presumably the maximum accepted length of a channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::open_buffer)
  83            .add_message_handler(Server::close_buffer)
  84            .add_request_handler(Server::update_buffer)
  85            .add_message_handler(Server::update_buffer_file)
  86            .add_message_handler(Server::buffer_reloaded)
  87            .add_message_handler(Server::buffer_saved)
  88            .add_request_handler(Server::save_buffer)
  89            .add_request_handler(Server::format_buffers)
  90            .add_request_handler(Server::get_completions)
  91            .add_request_handler(Server::apply_additional_edits_for_completion)
  92            .add_request_handler(Server::get_code_actions)
  93            .add_request_handler(Server::apply_code_action)
  94            .add_request_handler(Server::get_channels)
  95            .add_request_handler(Server::get_users)
  96            .add_request_handler(Server::join_channel)
  97            .add_message_handler(Server::leave_channel)
  98            .add_request_handler(Server::send_channel_message)
  99            .add_request_handler(Server::get_channel_messages);
 100
 101        Arc::new(server)
 102    }
 103
 104    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 105    where
 106        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 107        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 108        M: EnvelopedMessage,
 109    {
 110        let prev_handler = self.handlers.insert(
 111            TypeId::of::<M>(),
 112            Box::new(move |server, envelope| {
 113                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 114                (handler)(server, *envelope).boxed()
 115            }),
 116        );
 117        if prev_handler.is_some() {
 118            panic!("registered a handler for the same message twice");
 119        }
 120        self
 121    }
 122
 123    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 124    where
 125        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 126        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 127        M: RequestMessage,
 128    {
 129        self.add_message_handler(move |server, envelope| {
 130            let receipt = envelope.receipt();
 131            let response = (handler)(server.clone(), envelope);
 132            async move {
 133                match response.await {
 134                    Ok(response) => {
 135                        server.peer.respond(receipt, response)?;
 136                        Ok(())
 137                    }
 138                    Err(error) => {
 139                        server.peer.respond_with_error(
 140                            receipt,
 141                            proto::Error {
 142                                message: error.to_string(),
 143                            },
 144                        )?;
 145                        Err(error)
 146                    }
 147                }
 148            }
 149        })
 150    }
 151
    /// Returns a future that services `connection` until it closes.
    ///
    /// The future registers the connection with the peer and the store,
    /// pushes a contacts update for `user_id`, then loops: it drives the
    /// connection's I/O and dispatches each incoming message to the handler
    /// registered for its payload type. When the I/O future finishes or the
    /// incoming stream ends, the connection is signed out.
    ///
    /// * `addr` is only used in log output.
    /// * `send_connection_id`, when present, is sent the id assigned to the
    ///   connection; a failed send is ignored.
    /// * `executor` runs handlers for messages flagged as background work.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        // Own a handle to the server so the returned future does not borrow `self`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            // A failed contacts update is logged but does not abort the connection.
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in the order written, so
                // completion of the I/O future is checked before the next message.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so its outcome is logged and the
                                // notification channel (if any) is signalled afterwards.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    // Awaited in place: the loop does not pick up another
                                    // message (or poll `handle_io`) until this handler finishes.
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 226
 227    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 228        self.peer.disconnect(connection_id);
 229        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 230
 231        for (project_id, project) in removed_connection.hosted_projects {
 232            if let Some(share) = project.share {
 233                broadcast(
 234                    connection_id,
 235                    share.guests.keys().copied().collect(),
 236                    |conn_id| {
 237                        self.peer
 238                            .send(conn_id, proto::UnshareProject { project_id })
 239                    },
 240                )?;
 241            }
 242        }
 243
 244        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 245            broadcast(connection_id, peer_ids, |conn_id| {
 246                self.peer.send(
 247                    conn_id,
 248                    proto::RemoveProjectCollaborator {
 249                        project_id,
 250                        peer_id: connection_id.0,
 251                    },
 252                )
 253            })?;
 254        }
 255
 256        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 257        Ok(())
 258    }
 259
    /// Answers a `Ping` with an empty `Ack`; the request itself is ignored.
    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
        Ok(proto::Ack {})
    }
 263
 264    async fn register_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::RegisterProject>,
 267    ) -> tide::Result<proto::RegisterProjectResponse> {
 268        let project_id = {
 269            let mut state = self.state_mut();
 270            let user_id = state.user_id_for_connection(request.sender_id)?;
 271            state.register_project(request.sender_id, user_id)
 272        };
 273        Ok(proto::RegisterProjectResponse { project_id })
 274    }
 275
 276    async fn unregister_project(
 277        mut self: Arc<Server>,
 278        request: TypedEnvelope<proto::UnregisterProject>,
 279    ) -> tide::Result<()> {
 280        let project = self
 281            .state_mut()
 282            .unregister_project(request.payload.project_id, request.sender_id)?;
 283        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 284        Ok(())
 285    }
 286
 287    async fn share_project(
 288        mut self: Arc<Server>,
 289        request: TypedEnvelope<proto::ShareProject>,
 290    ) -> tide::Result<proto::Ack> {
 291        self.state_mut()
 292            .share_project(request.payload.project_id, request.sender_id);
 293        Ok(proto::Ack {})
 294    }
 295
 296    async fn unshare_project(
 297        mut self: Arc<Server>,
 298        request: TypedEnvelope<proto::UnshareProject>,
 299    ) -> tide::Result<()> {
 300        let project_id = request.payload.project_id;
 301        let project = self
 302            .state_mut()
 303            .unshare_project(project_id, request.sender_id)?;
 304
 305        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 306            self.peer
 307                .send(conn_id, proto::UnshareProject { project_id })
 308        })?;
 309        self.update_contacts_for_users(&project.authorized_user_ids)?;
 310        Ok(())
 311    }
 312
 313    async fn join_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::JoinProject>,
 316    ) -> tide::Result<proto::JoinProjectResponse> {
 317        let project_id = request.payload.project_id;
 318
 319        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 320        let (response, connection_ids, contact_user_ids) = self
 321            .state_mut()
 322            .join_project(request.sender_id, user_id, project_id)
 323            .and_then(|joined| {
 324                let share = joined.project.share()?;
 325                let peer_count = share.guests.len();
 326                let mut collaborators = Vec::with_capacity(peer_count);
 327                collaborators.push(proto::Collaborator {
 328                    peer_id: joined.project.host_connection_id.0,
 329                    replica_id: 0,
 330                    user_id: joined.project.host_user_id.to_proto(),
 331                });
 332                let worktrees = joined
 333                    .project
 334                    .worktrees
 335                    .iter()
 336                    .filter_map(|(id, worktree)| {
 337                        worktree.share.as_ref().map(|share| proto::Worktree {
 338                            id: *id,
 339                            root_name: worktree.root_name.clone(),
 340                            entries: share.entries.values().cloned().collect(),
 341                            diagnostic_summaries: share
 342                                .diagnostic_summaries
 343                                .values()
 344                                .cloned()
 345                                .collect(),
 346                            weak: worktree.weak,
 347                            next_update_id: share.next_update_id as u64,
 348                        })
 349                    })
 350                    .collect();
 351                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 352                    if *peer_conn_id != request.sender_id {
 353                        collaborators.push(proto::Collaborator {
 354                            peer_id: peer_conn_id.0,
 355                            replica_id: *peer_replica_id as u32,
 356                            user_id: peer_user_id.to_proto(),
 357                        });
 358                    }
 359                }
 360                let response = proto::JoinProjectResponse {
 361                    worktrees,
 362                    replica_id: joined.replica_id as u32,
 363                    collaborators,
 364                };
 365                let connection_ids = joined.project.connection_ids();
 366                let contact_user_ids = joined.project.authorized_user_ids();
 367                Ok((response, connection_ids, contact_user_ids))
 368            })?;
 369
 370        broadcast(request.sender_id, connection_ids, |conn_id| {
 371            self.peer.send(
 372                conn_id,
 373                proto::AddProjectCollaborator {
 374                    project_id,
 375                    collaborator: Some(proto::Collaborator {
 376                        peer_id: request.sender_id.0,
 377                        replica_id: response.replica_id,
 378                        user_id: user_id.to_proto(),
 379                    }),
 380                },
 381            )
 382        })?;
 383        self.update_contacts_for_users(&contact_user_ids)?;
 384        Ok(response)
 385    }
 386
 387    async fn leave_project(
 388        mut self: Arc<Server>,
 389        request: TypedEnvelope<proto::LeaveProject>,
 390    ) -> tide::Result<()> {
 391        let sender_id = request.sender_id;
 392        let project_id = request.payload.project_id;
 393        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 394
 395        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 396            self.peer.send(
 397                conn_id,
 398                proto::RemoveProjectCollaborator {
 399                    project_id,
 400                    peer_id: sender_id.0,
 401                },
 402            )
 403        })?;
 404        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 405
 406        Ok(())
 407    }
 408
 409    async fn register_worktree(
 410        mut self: Arc<Server>,
 411        request: TypedEnvelope<proto::RegisterWorktree>,
 412    ) -> tide::Result<proto::Ack> {
 413        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 414
 415        let mut contact_user_ids = HashSet::default();
 416        contact_user_ids.insert(host_user_id);
 417        for github_login in request.payload.authorized_logins {
 418            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 419            contact_user_ids.insert(contact_user_id);
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            request.sender_id,
 427            Worktree {
 428                authorized_user_ids: contact_user_ids.clone(),
 429                root_name: request.payload.root_name,
 430                share: None,
 431                weak: false,
 432            },
 433        )?;
 434        self.update_contacts_for_users(&contact_user_ids)?;
 435        Ok(proto::Ack {})
 436    }
 437
 438    async fn unregister_worktree(
 439        mut self: Arc<Server>,
 440        request: TypedEnvelope<proto::UnregisterWorktree>,
 441    ) -> tide::Result<()> {
 442        let project_id = request.payload.project_id;
 443        let worktree_id = request.payload.worktree_id;
 444        let (worktree, guest_connection_ids) =
 445            self.state_mut()
 446                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 447        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 448            self.peer.send(
 449                conn_id,
 450                proto::UnregisterWorktree {
 451                    project_id,
 452                    worktree_id,
 453                },
 454            )
 455        })?;
 456        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 457        Ok(())
 458    }
 459
 460    async fn share_worktree(
 461        mut self: Arc<Server>,
 462        mut request: TypedEnvelope<proto::ShareWorktree>,
 463    ) -> tide::Result<proto::Ack> {
 464        let worktree = request
 465            .payload
 466            .worktree
 467            .as_mut()
 468            .ok_or_else(|| anyhow!("missing worktree"))?;
 469        let entries = worktree
 470            .entries
 471            .iter()
 472            .map(|entry| (entry.id, entry.clone()))
 473            .collect();
 474        let diagnostic_summaries = worktree
 475            .diagnostic_summaries
 476            .iter()
 477            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 478            .collect();
 479
 480        let shared_worktree = self.state_mut().share_worktree(
 481            request.payload.project_id,
 482            worktree.id,
 483            request.sender_id,
 484            entries,
 485            diagnostic_summaries,
 486            worktree.next_update_id,
 487        )?;
 488
 489        broadcast(
 490            request.sender_id,
 491            shared_worktree.connection_ids,
 492            |connection_id| {
 493                self.peer
 494                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 495            },
 496        )?;
 497        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 498
 499        Ok(proto::Ack {})
 500    }
 501
 502    async fn update_worktree(
 503        mut self: Arc<Server>,
 504        request: TypedEnvelope<proto::UpdateWorktree>,
 505    ) -> tide::Result<proto::Ack> {
 506        let connection_ids = self.state_mut().update_worktree(
 507            request.sender_id,
 508            request.payload.project_id,
 509            request.payload.worktree_id,
 510            request.payload.id,
 511            &request.payload.removed_entries,
 512            &request.payload.updated_entries,
 513        )?;
 514
 515        broadcast(request.sender_id, connection_ids, |connection_id| {
 516            self.peer
 517                .forward_send(request.sender_id, connection_id, request.payload.clone())
 518        })?;
 519
 520        Ok(proto::Ack {})
 521    }
 522
 523    async fn update_diagnostic_summary(
 524        mut self: Arc<Server>,
 525        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 526    ) -> tide::Result<()> {
 527        let summary = request
 528            .payload
 529            .summary
 530            .clone()
 531            .ok_or_else(|| anyhow!("invalid summary"))?;
 532        let receiver_ids = self.state_mut().update_diagnostic_summary(
 533            request.payload.project_id,
 534            request.payload.worktree_id,
 535            request.sender_id,
 536            summary,
 537        )?;
 538
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })?;
 543        Ok(())
 544    }
 545
 546    async fn disk_based_diagnostics_updating(
 547        self: Arc<Server>,
 548        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 549    ) -> tide::Result<()> {
 550        let receiver_ids = self
 551            .state()
 552            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 553        broadcast(request.sender_id, receiver_ids, |connection_id| {
 554            self.peer
 555                .forward_send(request.sender_id, connection_id, request.payload.clone())
 556        })?;
 557        Ok(())
 558    }
 559
 560    async fn disk_based_diagnostics_updated(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 563    ) -> tide::Result<()> {
 564        let receiver_ids = self
 565            .state()
 566            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })?;
 571        Ok(())
 572    }
 573
 574    async fn get_definition(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<proto::GetDefinition>,
 577    ) -> tide::Result<proto::GetDefinitionResponse> {
 578        let host_connection_id = self
 579            .state()
 580            .read_project(request.payload.project_id, request.sender_id)?
 581            .host_connection_id;
 582        Ok(self
 583            .peer
 584            .forward_request(request.sender_id, host_connection_id, request.payload)
 585            .await?)
 586    }
 587
 588    async fn open_buffer(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::OpenBuffer>,
 591    ) -> tide::Result<proto::OpenBufferResponse> {
 592        let host_connection_id = self
 593            .state()
 594            .read_project(request.payload.project_id, request.sender_id)?
 595            .host_connection_id;
 596        Ok(self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?)
 600    }
 601
 602    async fn close_buffer(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::CloseBuffer>,
 605    ) -> tide::Result<()> {
 606        let host_connection_id = self
 607            .state()
 608            .read_project(request.payload.project_id, request.sender_id)?
 609            .host_connection_id;
 610        self.peer
 611            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 612        Ok(())
 613    }
 614
 615    async fn save_buffer(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::SaveBuffer>,
 618    ) -> tide::Result<proto::BufferSaved> {
 619        let host;
 620        let mut guests;
 621        {
 622            let state = self.state();
 623            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 624            host = project.host_connection_id;
 625            guests = project.guest_connection_ids()
 626        }
 627
 628        let response = self
 629            .peer
 630            .forward_request(request.sender_id, host, request.payload.clone())
 631            .await?;
 632
 633        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 634        broadcast(host, guests, |conn_id| {
 635            self.peer.forward_send(host, conn_id, response.clone())
 636        })?;
 637
 638        Ok(response)
 639    }
 640
 641    async fn format_buffers(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<proto::FormatBuffers>,
 644    ) -> tide::Result<proto::FormatBuffersResponse> {
 645        let host = self
 646            .state()
 647            .read_project(request.payload.project_id, request.sender_id)?
 648            .host_connection_id;
 649        Ok(self
 650            .peer
 651            .forward_request(request.sender_id, host, request.payload.clone())
 652            .await?)
 653    }
 654
 655    async fn get_completions(
 656        self: Arc<Server>,
 657        request: TypedEnvelope<proto::GetCompletions>,
 658    ) -> tide::Result<proto::GetCompletionsResponse> {
 659        let host = self
 660            .state()
 661            .read_project(request.payload.project_id, request.sender_id)?
 662            .host_connection_id;
 663        Ok(self
 664            .peer
 665            .forward_request(request.sender_id, host, request.payload.clone())
 666            .await?)
 667    }
 668
 669    async fn apply_additional_edits_for_completion(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 672    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 673        let host = self
 674            .state()
 675            .read_project(request.payload.project_id, request.sender_id)?
 676            .host_connection_id;
 677        Ok(self
 678            .peer
 679            .forward_request(request.sender_id, host, request.payload.clone())
 680            .await?)
 681    }
 682
 683    async fn get_code_actions(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::GetCodeActions>,
 686    ) -> tide::Result<proto::GetCodeActionsResponse> {
 687        let host = self
 688            .state()
 689            .read_project(request.payload.project_id, request.sender_id)?
 690            .host_connection_id;
 691        Ok(self
 692            .peer
 693            .forward_request(request.sender_id, host, request.payload.clone())
 694            .await?)
 695    }
 696
 697    async fn apply_code_action(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::ApplyCodeAction>,
 700    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 701        let host = self
 702            .state()
 703            .read_project(request.payload.project_id, request.sender_id)?
 704            .host_connection_id;
 705        Ok(self
 706            .peer
 707            .forward_request(request.sender_id, host, request.payload.clone())
 708            .await?)
 709    }
 710
 711    async fn update_buffer(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::UpdateBuffer>,
 714    ) -> tide::Result<proto::Ack> {
 715        let receiver_ids = self
 716            .state()
 717            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 718        broadcast(request.sender_id, receiver_ids, |connection_id| {
 719            self.peer
 720                .forward_send(request.sender_id, connection_id, request.payload.clone())
 721        })?;
 722        Ok(proto::Ack {})
 723    }
 724
 725    async fn update_buffer_file(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::UpdateBufferFile>,
 728    ) -> tide::Result<()> {
 729        let receiver_ids = self
 730            .state()
 731            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 732        broadcast(request.sender_id, receiver_ids, |connection_id| {
 733            self.peer
 734                .forward_send(request.sender_id, connection_id, request.payload.clone())
 735        })?;
 736        Ok(())
 737    }
 738
 739    async fn buffer_reloaded(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::BufferReloaded>,
 742    ) -> tide::Result<()> {
 743        let receiver_ids = self
 744            .state()
 745            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 746        broadcast(request.sender_id, receiver_ids, |connection_id| {
 747            self.peer
 748                .forward_send(request.sender_id, connection_id, request.payload.clone())
 749        })?;
 750        Ok(())
 751    }
 752
 753    async fn buffer_saved(
 754        self: Arc<Server>,
 755        request: TypedEnvelope<proto::BufferSaved>,
 756    ) -> tide::Result<()> {
 757        let receiver_ids = self
 758            .state()
 759            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 760        broadcast(request.sender_id, receiver_ids, |connection_id| {
 761            self.peer
 762                .forward_send(request.sender_id, connection_id, request.payload.clone())
 763        })?;
 764        Ok(())
 765    }
 766
 767    async fn get_channels(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::GetChannels>,
 770    ) -> tide::Result<proto::GetChannelsResponse> {
 771        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 772        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 773        Ok(proto::GetChannelsResponse {
 774            channels: channels
 775                .into_iter()
 776                .map(|chan| proto::Channel {
 777                    id: chan.id.to_proto(),
 778                    name: chan.name,
 779                })
 780                .collect(),
 781        })
 782    }
 783
 784    async fn get_users(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::GetUsers>,
 787    ) -> tide::Result<proto::GetUsersResponse> {
 788        let user_ids = request
 789            .payload
 790            .user_ids
 791            .into_iter()
 792            .map(UserId::from_proto)
 793            .collect();
 794        let users = self
 795            .app_state
 796            .db
 797            .get_users_by_ids(user_ids)
 798            .await?
 799            .into_iter()
 800            .map(|user| proto::User {
 801                id: user.id.to_proto(),
 802                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 803                github_login: user.github_login,
 804            })
 805            .collect();
 806        Ok(proto::GetUsersResponse { users })
 807    }
 808
 809    fn update_contacts_for_users<'a>(
 810        self: &Arc<Server>,
 811        user_ids: impl IntoIterator<Item = &'a UserId>,
 812    ) -> anyhow::Result<()> {
 813        let mut result = Ok(());
 814        let state = self.state();
 815        for user_id in user_ids {
 816            let contacts = state.contacts_for_user(*user_id);
 817            for connection_id in state.connection_ids_for_user(*user_id) {
 818                if let Err(error) = self.peer.send(
 819                    connection_id,
 820                    proto::UpdateContacts {
 821                        contacts: contacts.clone(),
 822                    },
 823                ) {
 824                    result = Err(error);
 825                }
 826            }
 827        }
 828        result
 829    }
 830
 831    async fn join_channel(
 832        mut self: Arc<Self>,
 833        request: TypedEnvelope<proto::JoinChannel>,
 834    ) -> tide::Result<proto::JoinChannelResponse> {
 835        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 836        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 837        if !self
 838            .app_state
 839            .db
 840            .can_user_access_channel(user_id, channel_id)
 841            .await?
 842        {
 843            Err(anyhow!("access denied"))?;
 844        }
 845
 846        self.state_mut().join_channel(request.sender_id, channel_id);
 847        let messages = self
 848            .app_state
 849            .db
 850            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 851            .await?
 852            .into_iter()
 853            .map(|msg| proto::ChannelMessage {
 854                id: msg.id.to_proto(),
 855                body: msg.body,
 856                timestamp: msg.sent_at.unix_timestamp() as u64,
 857                sender_id: msg.sender_id.to_proto(),
 858                nonce: Some(msg.nonce.as_u128().into()),
 859            })
 860            .collect::<Vec<_>>();
 861        Ok(proto::JoinChannelResponse {
 862            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 863            messages,
 864        })
 865    }
 866
 867    async fn leave_channel(
 868        mut self: Arc<Self>,
 869        request: TypedEnvelope<proto::LeaveChannel>,
 870    ) -> tide::Result<()> {
 871        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 872        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 873        if !self
 874            .app_state
 875            .db
 876            .can_user_access_channel(user_id, channel_id)
 877            .await?
 878        {
 879            Err(anyhow!("access denied"))?;
 880        }
 881
 882        self.state_mut()
 883            .leave_channel(request.sender_id, channel_id);
 884
 885        Ok(())
 886    }
 887
 888    async fn send_channel_message(
 889        self: Arc<Self>,
 890        request: TypedEnvelope<proto::SendChannelMessage>,
 891    ) -> tide::Result<proto::SendChannelMessageResponse> {
 892        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 893        let user_id;
 894        let connection_ids;
 895        {
 896            let state = self.state();
 897            user_id = state.user_id_for_connection(request.sender_id)?;
 898            connection_ids = state.channel_connection_ids(channel_id)?;
 899        }
 900
 901        // Validate the message body.
 902        let body = request.payload.body.trim().to_string();
 903        if body.len() > MAX_MESSAGE_LEN {
 904            return Err(anyhow!("message is too long"))?;
 905        }
 906        if body.is_empty() {
 907            return Err(anyhow!("message can't be blank"))?;
 908        }
 909
 910        let timestamp = OffsetDateTime::now_utc();
 911        let nonce = request
 912            .payload
 913            .nonce
 914            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 915
 916        let message_id = self
 917            .app_state
 918            .db
 919            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 920            .await?
 921            .to_proto();
 922        let message = proto::ChannelMessage {
 923            sender_id: user_id.to_proto(),
 924            id: message_id,
 925            body,
 926            timestamp: timestamp.unix_timestamp() as u64,
 927            nonce: Some(nonce),
 928        };
 929        broadcast(request.sender_id, connection_ids, |conn_id| {
 930            self.peer.send(
 931                conn_id,
 932                proto::ChannelMessageSent {
 933                    channel_id: channel_id.to_proto(),
 934                    message: Some(message.clone()),
 935                },
 936            )
 937        })?;
 938        Ok(proto::SendChannelMessageResponse {
 939            message: Some(message),
 940        })
 941    }
 942
 943    async fn get_channel_messages(
 944        self: Arc<Self>,
 945        request: TypedEnvelope<proto::GetChannelMessages>,
 946    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 947        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 948        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 949        if !self
 950            .app_state
 951            .db
 952            .can_user_access_channel(user_id, channel_id)
 953            .await?
 954        {
 955            Err(anyhow!("access denied"))?;
 956        }
 957
 958        let messages = self
 959            .app_state
 960            .db
 961            .get_channel_messages(
 962                channel_id,
 963                MESSAGE_COUNT_PER_PAGE,
 964                Some(MessageId::from_proto(request.payload.before_message_id)),
 965            )
 966            .await?
 967            .into_iter()
 968            .map(|msg| proto::ChannelMessage {
 969                id: msg.id.to_proto(),
 970                body: msg.body,
 971                timestamp: msg.sent_at.unix_timestamp() as u64,
 972                sender_id: msg.sender_id.to_proto(),
 973                nonce: Some(msg.nonce.as_u128().into()),
 974            })
 975            .collect::<Vec<_>>();
 976
 977        Ok(proto::GetChannelMessagesResponse {
 978            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 979            messages,
 980        })
 981    }
 982
 983    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 984        self.store.read()
 985    }
 986
 987    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 988        self.store.write()
 989    }
 990}
 991
 992impl Executor for RealExecutor {
 993    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 994        task::spawn(future);
 995    }
 996}
 997
 998fn broadcast<F>(
 999    sender_id: ConnectionId,
1000    receiver_ids: Vec<ConnectionId>,
1001    mut f: F,
1002) -> anyhow::Result<()>
1003where
1004    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1005{
1006    let mut result = Ok(());
1007    for receiver_id in receiver_ids {
1008        if receiver_id != sender_id {
1009            if let Err(error) = f(receiver_id) {
1010                if result.is_ok() {
1011                    result = Err(error);
1012                }
1013            }
1014        }
1015    }
1016    result
1017}
1018
1019pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1020    let server = Server::new(app.state().clone(), rpc.clone(), None);
1021    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1022        let server = server.clone();
1023        async move {
1024            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1025
1026            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1027            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1028            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1029            let client_protocol_version: Option<u32> = request
1030                .header("X-Zed-Protocol-Version")
1031                .and_then(|v| v.as_str().parse().ok());
1032
1033            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1034                return Ok(Response::new(StatusCode::UpgradeRequired));
1035            }
1036
1037            let header = match request.header("Sec-Websocket-Key") {
1038                Some(h) => h.as_str(),
1039                None => return Err(anyhow!("expected sec-websocket-key"))?,
1040            };
1041
1042            let user_id = process_auth_header(&request).await?;
1043
1044            let mut response = Response::new(StatusCode::SwitchingProtocols);
1045            response.insert_header(UPGRADE, "websocket");
1046            response.insert_header(CONNECTION, "Upgrade");
1047            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1048            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1049            response.insert_header("Sec-Websocket-Version", "13");
1050
1051            let http_res: &mut tide::http::Response = response.as_mut();
1052            let upgrade_receiver = http_res.recv_upgrade().await;
1053            let addr = request.remote().unwrap_or("unknown").to_string();
1054            task::spawn(async move {
1055                if let Some(stream) = upgrade_receiver.await {
1056                    server
1057                        .handle_connection(
1058                            Connection::new(
1059                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1060                            ),
1061                            addr,
1062                            user_id,
1063                            None,
1064                            RealExecutor,
1065                        )
1066                        .await;
1067                }
1068            });
1069
1070            Ok(response)
1071        }
1072    });
1073}
1074
1075fn header_contains_ignore_case<T>(
1076    request: &tide::Request<T>,
1077    header_name: HeaderName,
1078    value: &str,
1079) -> bool {
1080    request
1081        .header(header_name)
1082        .map(|h| {
1083            h.as_str()
1084                .split(',')
1085                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1086        })
1087        .unwrap_or(false)
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092    use super::*;
1093    use crate::{
1094        auth,
1095        db::{tests::TestDb, UserId},
1096        github, AppState, Config,
1097    };
1098    use ::rpc::Peer;
1099    use collections::BTreeMap;
1100    use gpui::{executor, ModelHandle, TestAppContext};
1101    use parking_lot::Mutex;
1102    use postage::{sink::Sink, watch};
1103    use rand::prelude::*;
1104    use rpc::PeerId;
1105    use serde_json::json;
1106    use sqlx::types::time::OffsetDateTime;
1107    use std::{
1108        cell::{Cell, RefCell},
1109        env,
1110        ops::Deref,
1111        path::Path,
1112        rc::Rc,
1113        sync::{
1114            atomic::{AtomicBool, Ordering::SeqCst},
1115            Arc,
1116        },
1117        time::Duration,
1118    };
1119    use zed::{
1120        client::{
1121            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1122            EstablishConnectionError, UserStore,
1123        },
1124        editor::{
1125            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1126            Redo, ToggleCodeActions, Undo,
1127        },
1128        fs::{FakeFs, Fs as _},
1129        language::{
1130            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1131            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1132        },
1133        lsp,
1134        project::{DiagnosticSummary, Project, ProjectPath},
1135        workspace::{Workspace, WorkspaceParams},
1136    };
1137
1138    #[cfg(test)]
1139    #[ctor::ctor]
1140    fn init_logger() {
1141        if std::env::var("RUST_LOG").is_ok() {
1142            env_logger::init();
1143        }
1144    }
1145
1146    #[gpui::test(iterations = 10)]
1147    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1148        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1149        let lang_registry = Arc::new(LanguageRegistry::new());
1150        let fs = Arc::new(FakeFs::new(cx_a.background()));
1151        cx_a.foreground().forbid_parking();
1152
1153        // Connect to a server as 2 clients.
1154        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1155        let client_a = server.create_client(&mut cx_a, "user_a").await;
1156        let client_b = server.create_client(&mut cx_b, "user_b").await;
1157
1158        // Share a project as client A
1159        fs.insert_tree(
1160            "/a",
1161            json!({
1162                ".zed.toml": r#"collaborators = ["user_b"]"#,
1163                "a.txt": "a-contents",
1164                "b.txt": "b-contents",
1165            }),
1166        )
1167        .await;
1168        let project_a = cx_a.update(|cx| {
1169            Project::local(
1170                client_a.clone(),
1171                client_a.user_store.clone(),
1172                lang_registry.clone(),
1173                fs.clone(),
1174                cx,
1175            )
1176        });
1177        let (worktree_a, _) = project_a
1178            .update(&mut cx_a, |p, cx| {
1179                p.find_or_create_local_worktree("/a", false, cx)
1180            })
1181            .await
1182            .unwrap();
1183        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1184        worktree_a
1185            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1186            .await;
1187        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1188        project_a
1189            .update(&mut cx_a, |p, cx| p.share(cx))
1190            .await
1191            .unwrap();
1192
1193        // Join that project as client B
1194        let project_b = Project::remote(
1195            project_id,
1196            client_b.clone(),
1197            client_b.user_store.clone(),
1198            lang_registry.clone(),
1199            fs.clone(),
1200            &mut cx_b.to_async(),
1201        )
1202        .await
1203        .unwrap();
1204
1205        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1206            assert_eq!(
1207                project
1208                    .collaborators()
1209                    .get(&client_a.peer_id)
1210                    .unwrap()
1211                    .user
1212                    .github_login,
1213                "user_a"
1214            );
1215            project.replica_id()
1216        });
1217        project_a
1218            .condition(&cx_a, |tree, _| {
1219                tree.collaborators()
1220                    .get(&client_b.peer_id)
1221                    .map_or(false, |collaborator| {
1222                        collaborator.replica_id == replica_id_b
1223                            && collaborator.user.github_login == "user_b"
1224                    })
1225            })
1226            .await;
1227
1228        // Open the same file as client B and client A.
1229        let buffer_b = project_b
1230            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1231            .await
1232            .unwrap();
1233        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1234        buffer_b.read_with(&cx_b, |buf, cx| {
1235            assert_eq!(buf.read(cx).text(), "b-contents")
1236        });
1237        project_a.read_with(&cx_a, |project, cx| {
1238            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1239        });
1240        let buffer_a = project_a
1241            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1242            .await
1243            .unwrap();
1244
1245        let editor_b = cx_b.add_view(window_b, |cx| {
1246            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1247        });
1248
1249        // TODO
1250        // // Create a selection set as client B and see that selection set as client A.
1251        // buffer_a
1252        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1253        //     .await;
1254
1255        // Edit the buffer as client B and see that edit as client A.
1256        editor_b.update(&mut cx_b, |editor, cx| {
1257            editor.handle_input(&Input("ok, ".into()), cx)
1258        });
1259        buffer_a
1260            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1261            .await;
1262
1263        // TODO
1264        // // Remove the selection set as client B, see those selections disappear as client A.
1265        cx_b.update(move |_| drop(editor_b));
1266        // buffer_a
1267        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1268        //     .await;
1269
1270        // Close the buffer as client A, see that the buffer is closed.
1271        cx_a.update(move |_| drop(buffer_a));
1272        project_a
1273            .condition(&cx_a, |project, cx| {
1274                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1275            })
1276            .await;
1277
1278        // Dropping the client B's project removes client B from client A's collaborators.
1279        cx_b.update(move |_| drop(project_b));
1280        project_a
1281            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1282            .await;
1283    }
1284
1285    #[gpui::test(iterations = 10)]
1286    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1287        let lang_registry = Arc::new(LanguageRegistry::new());
1288        let fs = Arc::new(FakeFs::new(cx_a.background()));
1289        cx_a.foreground().forbid_parking();
1290
1291        // Connect to a server as 2 clients.
1292        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293        let client_a = server.create_client(&mut cx_a, "user_a").await;
1294        let client_b = server.create_client(&mut cx_b, "user_b").await;
1295
1296        // Share a project as client A
1297        fs.insert_tree(
1298            "/a",
1299            json!({
1300                ".zed.toml": r#"collaborators = ["user_b"]"#,
1301                "a.txt": "a-contents",
1302                "b.txt": "b-contents",
1303            }),
1304        )
1305        .await;
1306        let project_a = cx_a.update(|cx| {
1307            Project::local(
1308                client_a.clone(),
1309                client_a.user_store.clone(),
1310                lang_registry.clone(),
1311                fs.clone(),
1312                cx,
1313            )
1314        });
1315        let (worktree_a, _) = project_a
1316            .update(&mut cx_a, |p, cx| {
1317                p.find_or_create_local_worktree("/a", false, cx)
1318            })
1319            .await
1320            .unwrap();
1321        worktree_a
1322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1323            .await;
1324        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1325        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1326        project_a
1327            .update(&mut cx_a, |p, cx| p.share(cx))
1328            .await
1329            .unwrap();
1330        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1331
1332        // Join that project as client B
1333        let project_b = Project::remote(
1334            project_id,
1335            client_b.clone(),
1336            client_b.user_store.clone(),
1337            lang_registry.clone(),
1338            fs.clone(),
1339            &mut cx_b.to_async(),
1340        )
1341        .await
1342        .unwrap();
1343        project_b
1344            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1345            .await
1346            .unwrap();
1347
1348        // Unshare the project as client A
1349        project_a
1350            .update(&mut cx_a, |project, cx| project.unshare(cx))
1351            .await
1352            .unwrap();
1353        project_b
1354            .condition(&mut cx_b, |project, _| project.is_read_only())
1355            .await;
1356        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1357        drop(project_b);
1358
1359        // Share the project again and ensure guests can still join.
1360        project_a
1361            .update(&mut cx_a, |project, cx| project.share(cx))
1362            .await
1363            .unwrap();
1364        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1365
1366        let project_c = Project::remote(
1367            project_id,
1368            client_b.clone(),
1369            client_b.user_store.clone(),
1370            lang_registry.clone(),
1371            fs.clone(),
1372            &mut cx_b.to_async(),
1373        )
1374        .await
1375        .unwrap();
1376        project_c
1377            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1378            .await
1379            .unwrap();
1380    }
1381
1382    #[gpui::test(iterations = 10)]
1383    async fn test_propagate_saves_and_fs_changes(
1384        mut cx_a: TestAppContext,
1385        mut cx_b: TestAppContext,
1386        mut cx_c: TestAppContext,
1387    ) {
1388        let lang_registry = Arc::new(LanguageRegistry::new());
1389        let fs = Arc::new(FakeFs::new(cx_a.background()));
1390        cx_a.foreground().forbid_parking();
1391
1392        // Connect to a server as 3 clients.
1393        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1394        let client_a = server.create_client(&mut cx_a, "user_a").await;
1395        let client_b = server.create_client(&mut cx_b, "user_b").await;
1396        let client_c = server.create_client(&mut cx_c, "user_c").await;
1397
1398        // Share a worktree as client A.
1399        fs.insert_tree(
1400            "/a",
1401            json!({
1402                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1403                "file1": "",
1404                "file2": ""
1405            }),
1406        )
1407        .await;
1408        let project_a = cx_a.update(|cx| {
1409            Project::local(
1410                client_a.clone(),
1411                client_a.user_store.clone(),
1412                lang_registry.clone(),
1413                fs.clone(),
1414                cx,
1415            )
1416        });
1417        let (worktree_a, _) = project_a
1418            .update(&mut cx_a, |p, cx| {
1419                p.find_or_create_local_worktree("/a", false, cx)
1420            })
1421            .await
1422            .unwrap();
1423        worktree_a
1424            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1425            .await;
1426        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1427        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1428        project_a
1429            .update(&mut cx_a, |p, cx| p.share(cx))
1430            .await
1431            .unwrap();
1432
1433        // Join that worktree as clients B and C.
1434        let project_b = Project::remote(
1435            project_id,
1436            client_b.clone(),
1437            client_b.user_store.clone(),
1438            lang_registry.clone(),
1439            fs.clone(),
1440            &mut cx_b.to_async(),
1441        )
1442        .await
1443        .unwrap();
1444        let project_c = Project::remote(
1445            project_id,
1446            client_c.clone(),
1447            client_c.user_store.clone(),
1448            lang_registry.clone(),
1449            fs.clone(),
1450            &mut cx_c.to_async(),
1451        )
1452        .await
1453        .unwrap();
1454        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1455        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1456
1457        // Open and edit a buffer as both guests B and C.
1458        let buffer_b = project_b
1459            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1460            .await
1461            .unwrap();
1462        let buffer_c = project_c
1463            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1464            .await
1465            .unwrap();
1466        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1467        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1468
1469        // Open and edit that buffer as the host.
1470        let buffer_a = project_a
1471            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1472            .await
1473            .unwrap();
1474
1475        buffer_a
1476            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1477            .await;
1478        buffer_a.update(&mut cx_a, |buf, cx| {
1479            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1480        });
1481
1482        // Wait for edits to propagate
1483        buffer_a
1484            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1485            .await;
1486        buffer_b
1487            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1488            .await;
1489        buffer_c
1490            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1491            .await;
1492
1493        // Edit the buffer as the host and concurrently save as guest B.
1494        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1495        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1496        save_b.await.unwrap();
1497        assert_eq!(
1498            fs.load("/a/file1".as_ref()).await.unwrap(),
1499            "hi-a, i-am-c, i-am-b, i-am-a"
1500        );
1501        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1502        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1503        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1504
1505        // Make changes on host's file system, see those changes on guest worktrees.
1506        fs.rename(
1507            "/a/file1".as_ref(),
1508            "/a/file1-renamed".as_ref(),
1509            Default::default(),
1510        )
1511        .await
1512        .unwrap();
1513
1514        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1515            .await
1516            .unwrap();
1517        fs.insert_file(Path::new("/a/file4"), "4".into())
1518            .await
1519            .unwrap();
1520
1521        worktree_a
1522            .condition(&cx_a, |tree, _| {
1523                tree.paths()
1524                    .map(|p| p.to_string_lossy())
1525                    .collect::<Vec<_>>()
1526                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1527            })
1528            .await;
1529        worktree_b
1530            .condition(&cx_b, |tree, _| {
1531                tree.paths()
1532                    .map(|p| p.to_string_lossy())
1533                    .collect::<Vec<_>>()
1534                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1535            })
1536            .await;
1537        worktree_c
1538            .condition(&cx_c, |tree, _| {
1539                tree.paths()
1540                    .map(|p| p.to_string_lossy())
1541                    .collect::<Vec<_>>()
1542                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1543            })
1544            .await;
1545
1546        // Ensure buffer files are updated as well.
1547        buffer_a
1548            .condition(&cx_a, |buf, _| {
1549                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1550            })
1551            .await;
1552        buffer_b
1553            .condition(&cx_b, |buf, _| {
1554                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1555            })
1556            .await;
1557        buffer_c
1558            .condition(&cx_c, |buf, _| {
1559                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1560            })
1561            .await;
1562    }
1563
1564    #[gpui::test(iterations = 10)]
1565    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1566        cx_a.foreground().forbid_parking();
1567        let lang_registry = Arc::new(LanguageRegistry::new());
1568        let fs = Arc::new(FakeFs::new(cx_a.background()));
1569
1570        // Connect to a server as 2 clients.
1571        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1572        let client_a = server.create_client(&mut cx_a, "user_a").await;
1573        let client_b = server.create_client(&mut cx_b, "user_b").await;
1574
1575        // Share a project as client A
1576        fs.insert_tree(
1577            "/dir",
1578            json!({
1579                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1580                "a.txt": "a-contents",
1581            }),
1582        )
1583        .await;
1584
1585        let project_a = cx_a.update(|cx| {
1586            Project::local(
1587                client_a.clone(),
1588                client_a.user_store.clone(),
1589                lang_registry.clone(),
1590                fs.clone(),
1591                cx,
1592            )
1593        });
1594        let (worktree_a, _) = project_a
1595            .update(&mut cx_a, |p, cx| {
1596                p.find_or_create_local_worktree("/dir", false, cx)
1597            })
1598            .await
1599            .unwrap();
1600        worktree_a
1601            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1602            .await;
1603        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1604        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1605        project_a
1606            .update(&mut cx_a, |p, cx| p.share(cx))
1607            .await
1608            .unwrap();
1609
1610        // Join that project as client B
1611        let project_b = Project::remote(
1612            project_id,
1613            client_b.clone(),
1614            client_b.user_store.clone(),
1615            lang_registry.clone(),
1616            fs.clone(),
1617            &mut cx_b.to_async(),
1618        )
1619        .await
1620        .unwrap();
1621
1622        // Open a buffer as client B
1623        let buffer_b = project_b
1624            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1625            .await
1626            .unwrap();
1627
1628        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1629        buffer_b.read_with(&cx_b, |buf, _| {
1630            assert!(buf.is_dirty());
1631            assert!(!buf.has_conflict());
1632        });
1633
1634        buffer_b
1635            .update(&mut cx_b, |buf, cx| buf.save(cx))
1636            .await
1637            .unwrap();
1638        buffer_b
1639            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1640            .await;
1641        buffer_b.read_with(&cx_b, |buf, _| {
1642            assert!(!buf.has_conflict());
1643        });
1644
1645        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1646        buffer_b.read_with(&cx_b, |buf, _| {
1647            assert!(buf.is_dirty());
1648            assert!(!buf.has_conflict());
1649        });
1650    }
1651
1652    #[gpui::test(iterations = 10)]
1653    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1654        cx_a.foreground().forbid_parking();
1655        let lang_registry = Arc::new(LanguageRegistry::new());
1656        let fs = Arc::new(FakeFs::new(cx_a.background()));
1657
1658        // Connect to a server as 2 clients.
1659        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1660        let client_a = server.create_client(&mut cx_a, "user_a").await;
1661        let client_b = server.create_client(&mut cx_b, "user_b").await;
1662
1663        // Share a project as client A
1664        fs.insert_tree(
1665            "/dir",
1666            json!({
1667                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1668                "a.txt": "a-contents",
1669            }),
1670        )
1671        .await;
1672
1673        let project_a = cx_a.update(|cx| {
1674            Project::local(
1675                client_a.clone(),
1676                client_a.user_store.clone(),
1677                lang_registry.clone(),
1678                fs.clone(),
1679                cx,
1680            )
1681        });
1682        let (worktree_a, _) = project_a
1683            .update(&mut cx_a, |p, cx| {
1684                p.find_or_create_local_worktree("/dir", false, cx)
1685            })
1686            .await
1687            .unwrap();
1688        worktree_a
1689            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1690            .await;
1691        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1692        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1693        project_a
1694            .update(&mut cx_a, |p, cx| p.share(cx))
1695            .await
1696            .unwrap();
1697
1698        // Join that project as client B
1699        let project_b = Project::remote(
1700            project_id,
1701            client_b.clone(),
1702            client_b.user_store.clone(),
1703            lang_registry.clone(),
1704            fs.clone(),
1705            &mut cx_b.to_async(),
1706        )
1707        .await
1708        .unwrap();
1709        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1710
1711        // Open a buffer as client B
1712        let buffer_b = project_b
1713            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1714            .await
1715            .unwrap();
1716        buffer_b.read_with(&cx_b, |buf, _| {
1717            assert!(!buf.is_dirty());
1718            assert!(!buf.has_conflict());
1719        });
1720
1721        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1722            .await
1723            .unwrap();
1724        buffer_b
1725            .condition(&cx_b, |buf, _| {
1726                buf.text() == "new contents" && !buf.is_dirty()
1727            })
1728            .await;
1729        buffer_b.read_with(&cx_b, |buf, _| {
1730            assert!(!buf.has_conflict());
1731        });
1732    }
1733
1734    #[gpui::test(iterations = 10)]
1735    async fn test_editing_while_guest_opens_buffer(
1736        mut cx_a: TestAppContext,
1737        mut cx_b: TestAppContext,
1738    ) {
1739        cx_a.foreground().forbid_parking();
1740        let lang_registry = Arc::new(LanguageRegistry::new());
1741        let fs = Arc::new(FakeFs::new(cx_a.background()));
1742
1743        // Connect to a server as 2 clients.
1744        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1745        let client_a = server.create_client(&mut cx_a, "user_a").await;
1746        let client_b = server.create_client(&mut cx_b, "user_b").await;
1747
1748        // Share a project as client A
1749        fs.insert_tree(
1750            "/dir",
1751            json!({
1752                ".zed.toml": r#"collaborators = ["user_b"]"#,
1753                "a.txt": "a-contents",
1754            }),
1755        )
1756        .await;
1757        let project_a = cx_a.update(|cx| {
1758            Project::local(
1759                client_a.clone(),
1760                client_a.user_store.clone(),
1761                lang_registry.clone(),
1762                fs.clone(),
1763                cx,
1764            )
1765        });
1766        let (worktree_a, _) = project_a
1767            .update(&mut cx_a, |p, cx| {
1768                p.find_or_create_local_worktree("/dir", false, cx)
1769            })
1770            .await
1771            .unwrap();
1772        worktree_a
1773            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1774            .await;
1775        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1776        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1777        project_a
1778            .update(&mut cx_a, |p, cx| p.share(cx))
1779            .await
1780            .unwrap();
1781
1782        // Join that project as client B
1783        let project_b = Project::remote(
1784            project_id,
1785            client_b.clone(),
1786            client_b.user_store.clone(),
1787            lang_registry.clone(),
1788            fs.clone(),
1789            &mut cx_b.to_async(),
1790        )
1791        .await
1792        .unwrap();
1793
1794        // Open a buffer as client A
1795        let buffer_a = project_a
1796            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1797            .await
1798            .unwrap();
1799
1800        // Start opening the same buffer as client B
1801        let buffer_b = cx_b
1802            .background()
1803            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1804
1805        // Edit the buffer as client A while client B is still opening it.
1806        cx_b.background().simulate_random_delay().await;
1807        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1808        cx_b.background().simulate_random_delay().await;
1809        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1810
1811        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1812        let buffer_b = buffer_b.await.unwrap();
1813        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1814    }
1815
1816    #[gpui::test(iterations = 10)]
1817    async fn test_leaving_worktree_while_opening_buffer(
1818        mut cx_a: TestAppContext,
1819        mut cx_b: TestAppContext,
1820    ) {
1821        cx_a.foreground().forbid_parking();
1822        let lang_registry = Arc::new(LanguageRegistry::new());
1823        let fs = Arc::new(FakeFs::new(cx_a.background()));
1824
1825        // Connect to a server as 2 clients.
1826        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1827        let client_a = server.create_client(&mut cx_a, "user_a").await;
1828        let client_b = server.create_client(&mut cx_b, "user_b").await;
1829
1830        // Share a project as client A
1831        fs.insert_tree(
1832            "/dir",
1833            json!({
1834                ".zed.toml": r#"collaborators = ["user_b"]"#,
1835                "a.txt": "a-contents",
1836            }),
1837        )
1838        .await;
1839        let project_a = cx_a.update(|cx| {
1840            Project::local(
1841                client_a.clone(),
1842                client_a.user_store.clone(),
1843                lang_registry.clone(),
1844                fs.clone(),
1845                cx,
1846            )
1847        });
1848        let (worktree_a, _) = project_a
1849            .update(&mut cx_a, |p, cx| {
1850                p.find_or_create_local_worktree("/dir", false, cx)
1851            })
1852            .await
1853            .unwrap();
1854        worktree_a
1855            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1856            .await;
1857        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1858        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1859        project_a
1860            .update(&mut cx_a, |p, cx| p.share(cx))
1861            .await
1862            .unwrap();
1863
1864        // Join that project as client B
1865        let project_b = Project::remote(
1866            project_id,
1867            client_b.clone(),
1868            client_b.user_store.clone(),
1869            lang_registry.clone(),
1870            fs.clone(),
1871            &mut cx_b.to_async(),
1872        )
1873        .await
1874        .unwrap();
1875
1876        // See that a guest has joined as client A.
1877        project_a
1878            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1879            .await;
1880
1881        // Begin opening a buffer as client B, but leave the project before the open completes.
1882        let buffer_b = cx_b
1883            .background()
1884            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1885        cx_b.update(|_| drop(project_b));
1886        drop(buffer_b);
1887
1888        // See that the guest has left.
1889        project_a
1890            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1891            .await;
1892    }
1893
1894    #[gpui::test(iterations = 10)]
1895    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1896        cx_a.foreground().forbid_parking();
1897        let lang_registry = Arc::new(LanguageRegistry::new());
1898        let fs = Arc::new(FakeFs::new(cx_a.background()));
1899
1900        // Connect to a server as 2 clients.
1901        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1902        let client_a = server.create_client(&mut cx_a, "user_a").await;
1903        let client_b = server.create_client(&mut cx_b, "user_b").await;
1904
1905        // Share a project as client A
1906        fs.insert_tree(
1907            "/a",
1908            json!({
1909                ".zed.toml": r#"collaborators = ["user_b"]"#,
1910                "a.txt": "a-contents",
1911                "b.txt": "b-contents",
1912            }),
1913        )
1914        .await;
1915        let project_a = cx_a.update(|cx| {
1916            Project::local(
1917                client_a.clone(),
1918                client_a.user_store.clone(),
1919                lang_registry.clone(),
1920                fs.clone(),
1921                cx,
1922            )
1923        });
1924        let (worktree_a, _) = project_a
1925            .update(&mut cx_a, |p, cx| {
1926                p.find_or_create_local_worktree("/a", false, cx)
1927            })
1928            .await
1929            .unwrap();
1930        worktree_a
1931            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1932            .await;
1933        let project_id = project_a
1934            .update(&mut cx_a, |project, _| project.next_remote_id())
1935            .await;
1936        project_a
1937            .update(&mut cx_a, |project, cx| project.share(cx))
1938            .await
1939            .unwrap();
1940
1941        // Join that project as client B
1942        let _project_b = Project::remote(
1943            project_id,
1944            client_b.clone(),
1945            client_b.user_store.clone(),
1946            lang_registry.clone(),
1947            fs.clone(),
1948            &mut cx_b.to_async(),
1949        )
1950        .await
1951        .unwrap();
1952
1953        // See that a guest has joined as client A.
1954        project_a
1955            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1956            .await;
1957
1958        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1959        client_b.disconnect(&cx_b.to_async()).unwrap();
1960        project_a
1961            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1962            .await;
1963    }
1964
1965    #[gpui::test(iterations = 10)]
1966    async fn test_collaborating_with_diagnostics(
1967        mut cx_a: TestAppContext,
1968        mut cx_b: TestAppContext,
1969    ) {
1970        cx_a.foreground().forbid_parking();
1971        let mut lang_registry = Arc::new(LanguageRegistry::new());
1972        let fs = Arc::new(FakeFs::new(cx_a.background()));
1973
1974        // Set up a fake language server.
1975        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1976        Arc::get_mut(&mut lang_registry)
1977            .unwrap()
1978            .add(Arc::new(Language::new(
1979                LanguageConfig {
1980                    name: "Rust".to_string(),
1981                    path_suffixes: vec!["rs".to_string()],
1982                    language_server: Some(language_server_config),
1983                    ..Default::default()
1984                },
1985                Some(tree_sitter_rust::language()),
1986            )));
1987
1988        // Connect to a server as 2 clients.
1989        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1990        let client_a = server.create_client(&mut cx_a, "user_a").await;
1991        let client_b = server.create_client(&mut cx_b, "user_b").await;
1992
1993        // Share a project as client A
1994        fs.insert_tree(
1995            "/a",
1996            json!({
1997                ".zed.toml": r#"collaborators = ["user_b"]"#,
1998                "a.rs": "let one = two",
1999                "other.rs": "",
2000            }),
2001        )
2002        .await;
2003        let project_a = cx_a.update(|cx| {
2004            Project::local(
2005                client_a.clone(),
2006                client_a.user_store.clone(),
2007                lang_registry.clone(),
2008                fs.clone(),
2009                cx,
2010            )
2011        });
2012        let (worktree_a, _) = project_a
2013            .update(&mut cx_a, |p, cx| {
2014                p.find_or_create_local_worktree("/a", false, cx)
2015            })
2016            .await
2017            .unwrap();
2018        worktree_a
2019            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2020            .await;
2021        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2022        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2023        project_a
2024            .update(&mut cx_a, |p, cx| p.share(cx))
2025            .await
2026            .unwrap();
2027
2028        // Cause the language server to start.
2029        let _ = cx_a
2030            .background()
2031            .spawn(project_a.update(&mut cx_a, |project, cx| {
2032                project.open_buffer(
2033                    ProjectPath {
2034                        worktree_id,
2035                        path: Path::new("other.rs").into(),
2036                    },
2037                    cx,
2038                )
2039            }))
2040            .await
2041            .unwrap();
2042
2043        // Simulate a language server reporting errors for a file.
2044        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2045        fake_language_server
2046            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2047                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2048                version: None,
2049                diagnostics: vec![lsp::Diagnostic {
2050                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2051                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2052                    message: "message 1".to_string(),
2053                    ..Default::default()
2054                }],
2055            })
2056            .await;
2057
2058        // Wait for server to see the diagnostics update.
2059        server
2060            .condition(|store| {
2061                let worktree = store
2062                    .project(project_id)
2063                    .unwrap()
2064                    .worktrees
2065                    .get(&worktree_id.to_proto())
2066                    .unwrap();
2067
2068                !worktree
2069                    .share
2070                    .as_ref()
2071                    .unwrap()
2072                    .diagnostic_summaries
2073                    .is_empty()
2074            })
2075            .await;
2076
2077        // Join the worktree as client B.
2078        let project_b = Project::remote(
2079            project_id,
2080            client_b.clone(),
2081            client_b.user_store.clone(),
2082            lang_registry.clone(),
2083            fs.clone(),
2084            &mut cx_b.to_async(),
2085        )
2086        .await
2087        .unwrap();
2088
2089        project_b.read_with(&cx_b, |project, cx| {
2090            assert_eq!(
2091                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2092                &[(
2093                    ProjectPath {
2094                        worktree_id,
2095                        path: Arc::from(Path::new("a.rs")),
2096                    },
2097                    DiagnosticSummary {
2098                        error_count: 1,
2099                        warning_count: 0,
2100                        ..Default::default()
2101                    },
2102                )]
2103            )
2104        });
2105
2106        // Simulate a language server reporting more errors for a file.
2107        fake_language_server
2108            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2109                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2110                version: None,
2111                diagnostics: vec![
2112                    lsp::Diagnostic {
2113                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2114                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2115                        message: "message 1".to_string(),
2116                        ..Default::default()
2117                    },
2118                    lsp::Diagnostic {
2119                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2120                        range: lsp::Range::new(
2121                            lsp::Position::new(0, 10),
2122                            lsp::Position::new(0, 13),
2123                        ),
2124                        message: "message 2".to_string(),
2125                        ..Default::default()
2126                    },
2127                ],
2128            })
2129            .await;
2130
2131        // Client b gets the updated summaries
2132        project_b
2133            .condition(&cx_b, |project, cx| {
2134                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2135                    == &[(
2136                        ProjectPath {
2137                            worktree_id,
2138                            path: Arc::from(Path::new("a.rs")),
2139                        },
2140                        DiagnosticSummary {
2141                            error_count: 1,
2142                            warning_count: 1,
2143                            ..Default::default()
2144                        },
2145                    )]
2146            })
2147            .await;
2148
2149        // Open the file with the errors on client B. They should be present.
2150        let buffer_b = cx_b
2151            .background()
2152            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2153            .await
2154            .unwrap();
2155
2156        buffer_b.read_with(&cx_b, |buffer, _| {
2157            assert_eq!(
2158                buffer
2159                    .snapshot()
2160                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2161                    .map(|entry| entry)
2162                    .collect::<Vec<_>>(),
2163                &[
2164                    DiagnosticEntry {
2165                        range: Point::new(0, 4)..Point::new(0, 7),
2166                        diagnostic: Diagnostic {
2167                            group_id: 0,
2168                            message: "message 1".to_string(),
2169                            severity: lsp::DiagnosticSeverity::ERROR,
2170                            is_primary: true,
2171                            ..Default::default()
2172                        }
2173                    },
2174                    DiagnosticEntry {
2175                        range: Point::new(0, 10)..Point::new(0, 13),
2176                        diagnostic: Diagnostic {
2177                            group_id: 1,
2178                            severity: lsp::DiagnosticSeverity::WARNING,
2179                            message: "message 2".to_string(),
2180                            is_primary: true,
2181                            ..Default::default()
2182                        }
2183                    }
2184                ]
2185            );
2186        });
2187    }
2188
2189    #[gpui::test(iterations = 10)]
2190    async fn test_collaborating_with_completion(
2191        mut cx_a: TestAppContext,
2192        mut cx_b: TestAppContext,
2193    ) {
2194        cx_a.foreground().forbid_parking();
2195        let mut lang_registry = Arc::new(LanguageRegistry::new());
2196        let fs = Arc::new(FakeFs::new(cx_a.background()));
2197
2198        // Set up a fake language server.
2199        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2200        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2201            completion_provider: Some(lsp::CompletionOptions {
2202                trigger_characters: Some(vec![".".to_string()]),
2203                ..Default::default()
2204            }),
2205            ..Default::default()
2206        });
2207        Arc::get_mut(&mut lang_registry)
2208            .unwrap()
2209            .add(Arc::new(Language::new(
2210                LanguageConfig {
2211                    name: "Rust".to_string(),
2212                    path_suffixes: vec!["rs".to_string()],
2213                    language_server: Some(language_server_config),
2214                    ..Default::default()
2215                },
2216                Some(tree_sitter_rust::language()),
2217            )));
2218
2219        // Connect to a server as 2 clients.
2220        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2221        let client_a = server.create_client(&mut cx_a, "user_a").await;
2222        let client_b = server.create_client(&mut cx_b, "user_b").await;
2223
2224        // Share a project as client A
2225        fs.insert_tree(
2226            "/a",
2227            json!({
2228                ".zed.toml": r#"collaborators = ["user_b"]"#,
2229                "main.rs": "fn main() { a }",
2230                "other.rs": "",
2231            }),
2232        )
2233        .await;
2234        let project_a = cx_a.update(|cx| {
2235            Project::local(
2236                client_a.clone(),
2237                client_a.user_store.clone(),
2238                lang_registry.clone(),
2239                fs.clone(),
2240                cx,
2241            )
2242        });
2243        let (worktree_a, _) = project_a
2244            .update(&mut cx_a, |p, cx| {
2245                p.find_or_create_local_worktree("/a", false, cx)
2246            })
2247            .await
2248            .unwrap();
2249        worktree_a
2250            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2251            .await;
2252        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2253        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2254        project_a
2255            .update(&mut cx_a, |p, cx| p.share(cx))
2256            .await
2257            .unwrap();
2258
2259        // Join the worktree as client B.
2260        let project_b = Project::remote(
2261            project_id,
2262            client_b.clone(),
2263            client_b.user_store.clone(),
2264            lang_registry.clone(),
2265            fs.clone(),
2266            &mut cx_b.to_async(),
2267        )
2268        .await
2269        .unwrap();
2270
2271        // Open a file in an editor as the guest.
2272        let buffer_b = project_b
2273            .update(&mut cx_b, |p, cx| {
2274                p.open_buffer((worktree_id, "main.rs"), cx)
2275            })
2276            .await
2277            .unwrap();
2278        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2279        let editor_b = cx_b.add_view(window_b, |cx| {
2280            Editor::for_buffer(
2281                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2282                Arc::new(|cx| EditorSettings::test(cx)),
2283                Some(project_b.clone()),
2284                cx,
2285            )
2286        });
2287
2288        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2289        buffer_b
2290            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2291            .await;
2292
2293        // Type a completion trigger character as the guest.
2294        editor_b.update(&mut cx_b, |editor, cx| {
2295            editor.select_ranges([13..13], None, cx);
2296            editor.handle_input(&Input(".".into()), cx);
2297            cx.focus(&editor_b);
2298        });
2299
2300        // Receive a completion request as the host's language server.
2301        // Return some completions from the host's language server.
2302        cx_a.foreground().start_waiting();
2303        fake_language_server
2304            .handle_request::<lsp::request::Completion, _>(|params| {
2305                assert_eq!(
2306                    params.text_document_position.text_document.uri,
2307                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2308                );
2309                assert_eq!(
2310                    params.text_document_position.position,
2311                    lsp::Position::new(0, 14),
2312                );
2313
2314                Some(lsp::CompletionResponse::Array(vec![
2315                    lsp::CompletionItem {
2316                        label: "first_method(…)".into(),
2317                        detail: Some("fn(&mut self, B) -> C".into()),
2318                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2319                            new_text: "first_method($1)".to_string(),
2320                            range: lsp::Range::new(
2321                                lsp::Position::new(0, 14),
2322                                lsp::Position::new(0, 14),
2323                            ),
2324                        })),
2325                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2326                        ..Default::default()
2327                    },
2328                    lsp::CompletionItem {
2329                        label: "second_method(…)".into(),
2330                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2331                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2332                            new_text: "second_method()".to_string(),
2333                            range: lsp::Range::new(
2334                                lsp::Position::new(0, 14),
2335                                lsp::Position::new(0, 14),
2336                            ),
2337                        })),
2338                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2339                        ..Default::default()
2340                    },
2341                ]))
2342            })
2343            .next()
2344            .await
2345            .unwrap();
2346        cx_a.foreground().finish_waiting();
2347
2348        // Open the buffer on the host.
2349        let buffer_a = project_a
2350            .update(&mut cx_a, |p, cx| {
2351                p.open_buffer((worktree_id, "main.rs"), cx)
2352            })
2353            .await
2354            .unwrap();
2355        buffer_a
2356            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2357            .await;
2358
2359        // Confirm a completion on the guest.
2360        editor_b
2361            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2362            .await;
2363        editor_b.update(&mut cx_b, |editor, cx| {
2364            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2365            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2366        });
2367
2368        // Return a resolved completion from the host's language server.
2369        // The resolved completion has an additional text edit.
2370        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2371            assert_eq!(params.label, "first_method(…)");
2372            lsp::CompletionItem {
2373                label: "first_method(…)".into(),
2374                detail: Some("fn(&mut self, B) -> C".into()),
2375                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2376                    new_text: "first_method($1)".to_string(),
2377                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2378                })),
2379                additional_text_edits: Some(vec![lsp::TextEdit {
2380                    new_text: "use d::SomeTrait;\n".to_string(),
2381                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2382                }]),
2383                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2384                ..Default::default()
2385            }
2386        });
2387
2388        // The additional edit is applied.
2389        buffer_a
2390            .condition(&cx_a, |buffer, _| {
2391                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2392            })
2393            .await;
2394        buffer_b
2395            .condition(&cx_b, |buffer, _| {
2396                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2397            })
2398            .await;
2399    }
2400
2401    #[gpui::test(iterations = 10)]
        // Checks that a guest (client B) can format a buffer belonging to a project shared by
        // client A. The fake language server answers the `Formatting` request with two
        // insertions, and the guest's copy of `a.rs` must end up as "let honey = two".
2402    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
            // NOTE(review): presumably makes the test fail rather than block if the foreground
            // executor would have to park — confirm against gpui.
2403        cx_a.foreground().forbid_parking();
2404        let mut lang_registry = Arc::new(LanguageRegistry::new());
2405        let fs = Arc::new(FakeFs::new(cx_a.background()));
2406
2407        // Set up a fake language server.
2408        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned into either project.
2409        Arc::get_mut(&mut lang_registry)
2410            .unwrap()
2411            .add(Arc::new(Language::new(
2412                LanguageConfig {
2413                    name: "Rust".to_string(),
2414                    path_suffixes: vec!["rs".to_string()],
2415                    language_server: Some(language_server_config),
2416                    ..Default::default()
2417                },
2418                Some(tree_sitter_rust::language()),
2419            )));
2420
2421        // Connect to a server as 2 clients.
2422        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2423        let client_a = server.create_client(&mut cx_a, "user_a").await;
2424        let client_b = server.create_client(&mut cx_b, "user_b").await;
2425
2426        // Share a project as client A
            // The `.zed.toml` fixture lists "user_b" as a collaborator.
2427        fs.insert_tree(
2428            "/a",
2429            json!({
2430                ".zed.toml": r#"collaborators = ["user_b"]"#,
2431                "a.rs": "let one = two",
2432            }),
2433        )
2434        .await;
2435        let project_a = cx_a.update(|cx| {
2436            Project::local(
2437                client_a.clone(),
2438                client_a.user_store.clone(),
2439                lang_registry.clone(),
2440                fs.clone(),
2441                cx,
2442            )
2443        });
2444        let (worktree_a, _) = project_a
2445            .update(&mut cx_a, |p, cx| {
2446                p.find_or_create_local_worktree("/a", false, cx)
2447            })
2448            .await
2449            .unwrap();
            // Wait for the initial scan of the worktree before sharing it.
2450        worktree_a
2451            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2452            .await;
            // Capture the ids client B needs to join the project and to address the worktree.
2453        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2454        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2455        project_a
2456            .update(&mut cx_a, |p, cx| p.share(cx))
2457            .await
2458            .unwrap();
2459
2460        // Join the worktree as client B.
2461        let project_b = Project::remote(
2462            project_id,
2463            client_b.clone(),
2464            client_b.user_store.clone(),
2465            lang_registry.clone(),
2466            fs.clone(),
2467            &mut cx_b.to_async(),
2468        )
2469        .await
2470        .unwrap();
2471
            // Open `a.rs` through the guest's project.
2472        let buffer_b = cx_b
2473            .background()
2474            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2475            .await
2476            .unwrap();
2477
            // Kick off formatting from the guest. The returned future is only awaited further
            // down, after the fake server's handler has been installed.
            // NOTE(review): the meaning of the `true` argument is not visible from this test.
2478        let format = project_b.update(&mut cx_b, |project, cx| {
2479            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2480        });
2481
            // Answer the formatting request with two insertions on line 0: "h" at column 4 and
            // "y" at column 7.
2482        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2483        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2484            Some(vec![
2485                lsp::TextEdit {
2486                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2487                    new_text: "h".to_string(),
2488                },
2489                lsp::TextEdit {
2490                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2491                    new_text: "y".to_string(),
2492                },
2493            ])
2494        });
2495
            // Applying both insertions to "let one = two" yields "let honey = two" on the guest.
2496        format.await.unwrap();
2497        assert_eq!(
2498            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2499            "let honey = two"
2500        );
2501    }
2502
2503    #[gpui::test(iterations = 10)]
        // Checks go-to-definition from a guest. Client B asks for definitions inside `a.rs`
        // (in the shared `/root-1` worktree) and the fake language server replies with
        // locations in `/root-2/b.rs`, which is not part of the worktree created by client A.
        // The test verifies the target buffer's text and range, that two lookups reuse the
        // same target buffer, and that the guest's worktree count goes 2 -> 1 once the
        // results are dropped.
2504    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2505        cx_a.foreground().forbid_parking();
2506        let mut lang_registry = Arc::new(LanguageRegistry::new());
2507        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // Two separate roots: only `/root-1` is added as a worktree below, while `/root-2`
            // holds the file that the definitions point to.
2508        fs.insert_tree(
2509            "/root-1",
2510            json!({
2511                ".zed.toml": r#"collaborators = ["user_b"]"#,
2512                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2513            }),
2514        )
2515        .await;
2516        fs.insert_tree(
2517            "/root-2",
2518            json!({
2519                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2520            }),
2521        )
2522        .await;
2523
2524        // Set up a fake language server.
2525        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2526        Arc::get_mut(&mut lang_registry)
2527            .unwrap()
2528            .add(Arc::new(Language::new(
2529                LanguageConfig {
2530                    name: "Rust".to_string(),
2531                    path_suffixes: vec!["rs".to_string()],
2532                    language_server: Some(language_server_config),
2533                    ..Default::default()
2534                },
2535                Some(tree_sitter_rust::language()),
2536            )));
2537
2538        // Connect to a server as 2 clients.
2539        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2540        let client_a = server.create_client(&mut cx_a, "user_a").await;
2541        let client_b = server.create_client(&mut cx_b, "user_b").await;
2542
2543        // Share a project as client A
2544        let project_a = cx_a.update(|cx| {
2545            Project::local(
2546                client_a.clone(),
2547                client_a.user_store.clone(),
2548                lang_registry.clone(),
2549                fs.clone(),
2550                cx,
2551            )
2552        });
2553        let (worktree_a, _) = project_a
2554            .update(&mut cx_a, |p, cx| {
2555                p.find_or_create_local_worktree("/root-1", false, cx)
2556            })
2557            .await
2558            .unwrap();
            // Wait for the initial scan of the worktree before sharing it.
2559        worktree_a
2560            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2561            .await;
2562        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2563        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2564        project_a
2565            .update(&mut cx_a, |p, cx| p.share(cx))
2566            .await
2567            .unwrap();
2568
2569        // Join the worktree as client B.
2570        let project_b = Project::remote(
2571            project_id,
2572            client_b.clone(),
2573            client_b.user_store.clone(),
2574            lang_registry.clone(),
2575            fs.clone(),
2576            &mut cx_b.to_async(),
2577        )
2578        .await
2579        .unwrap();
2580
2581        // Open the file on client B.
2582        let buffer_b = cx_b
2583            .background()
2584            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2585            .await
2586            .unwrap();
2587
2588        // Request the definition of a symbol as the guest.
            // NOTE(review): 23 is presumably a byte offset into `a.rs`, which would land inside
            // `TWO` — confirm against `Project::definition`.
2589        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2590
            // The handler is installed after the request future was created but before it is
            // awaited. It points at columns 6..9 of line 0 in `b.rs`, i.e. `TWO`.
2591        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2592        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2593            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2594                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2595                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2596            )))
2597        });
2598
2599        let definitions_1 = definitions_1.await.unwrap();
2600        cx_b.read(|cx| {
2601            assert_eq!(definitions_1.len(), 1);
                // The guest now sees a second worktree (presumably created for `/root-2/b.rs`).
2602            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2603            let target_buffer = definitions_1[0].target_buffer.read(cx);
2604            assert_eq!(
2605                target_buffer.text(),
2606                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2607            );
2608            assert_eq!(
2609                definitions_1[0].target_range.to_point(target_buffer),
2610                Point::new(0, 6)..Point::new(0, 9)
2611            );
2612        });
2613
2614        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2615        // the previous call to `definition`.
            // NOTE(review): 33 presumably lands inside `THREE`; the new handler replaces the
            // earlier response with columns 6..11 of line 1 in `b.rs`.
2616        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2617        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2618            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2619                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2620                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2621            )))
2622        });
2623
2624        let definitions_2 = definitions_2.await.unwrap();
2625        cx_b.read(|cx| {
2626            assert_eq!(definitions_2.len(), 1);
                // Still two worktrees: the second lookup did not add another one.
2627            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2628            let target_buffer = definitions_2[0].target_buffer.read(cx);
2629            assert_eq!(
2630                target_buffer.text(),
2631                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2632            );
2633            assert_eq!(
2634                definitions_2[0].target_range.to_point(target_buffer),
2635                Point::new(1, 6)..Point::new(1, 11)
2636            );
2637        });
            // Both lookups must resolve to the very same buffer handle.
2638        assert_eq!(
2639            definitions_1[0].target_buffer,
2640            definitions_2[0].target_buffer
2641        );
2642
            // Once both results are dropped, the guest is expected to return to a single
            // worktree.
2643        cx_b.update(|_| {
2644            drop(definitions_1);
2645            drop(definitions_2);
2646        });
2647        project_b
2648            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2649            .await;
2650    }
2651
2652    #[gpui::test(iterations = 10)]
        // Runs two guest operations concurrently: opening `b.rs` and requesting a definition
        // whose result points at `b.rs`. `rng` picks which one is started first. Either way,
        // the definition's target buffer must be the same buffer that `open_buffer` returns.
2653    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2654        mut cx_a: TestAppContext,
2655        mut cx_b: TestAppContext,
2656        mut rng: StdRng,
2657    ) {
2658        cx_a.foreground().forbid_parking();
2659        let mut lang_registry = Arc::new(LanguageRegistry::new());
2660        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // Both files live in the same root, so the definition target is inside the shared
            // worktree.
2661        fs.insert_tree(
2662            "/root",
2663            json!({
2664                ".zed.toml": r#"collaborators = ["user_b"]"#,
2665                "a.rs": "const ONE: usize = b::TWO;",
2666                "b.rs": "const TWO: usize = 2",
2667            }),
2668        )
2669        .await;
2670
2671        // Set up a fake language server.
2672        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2673
2674        Arc::get_mut(&mut lang_registry)
2675            .unwrap()
2676            .add(Arc::new(Language::new(
2677                LanguageConfig {
2678                    name: "Rust".to_string(),
2679                    path_suffixes: vec!["rs".to_string()],
2680                    language_server: Some(language_server_config),
2681                    ..Default::default()
2682                },
2683                Some(tree_sitter_rust::language()),
2684            )));
2685
2686        // Connect to a server as 2 clients.
2687        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2688        let client_a = server.create_client(&mut cx_a, "user_a").await;
2689        let client_b = server.create_client(&mut cx_b, "user_b").await;
2690
2691        // Share a project as client A
2692        let project_a = cx_a.update(|cx| {
2693            Project::local(
2694                client_a.clone(),
2695                client_a.user_store.clone(),
2696                lang_registry.clone(),
2697                fs.clone(),
2698                cx,
2699            )
2700        });
2701
2702        let (worktree_a, _) = project_a
2703            .update(&mut cx_a, |p, cx| {
2704                p.find_or_create_local_worktree("/root", false, cx)
2705            })
2706            .await
2707            .unwrap();
            // Wait for the initial scan of the worktree before sharing it.
2708        worktree_a
2709            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2710            .await;
2711        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2712        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2713        project_a
2714            .update(&mut cx_a, |p, cx| p.share(cx))
2715            .await
2716            .unwrap();
2717
2718        // Join the worktree as client B.
2719        let project_b = Project::remote(
2720            project_id,
2721            client_b.clone(),
2722            client_b.user_store.clone(),
2723            lang_registry.clone(),
2724            fs.clone(),
2725            &mut cx_b.to_async(),
2726        )
2727        .await
2728        .unwrap();
2729
            // `a.rs` is opened and awaited up front; it is the buffer the definition is
            // requested from.
2730        let buffer_b1 = cx_b
2731            .background()
2732            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2733            .await
2734            .unwrap();
2735
            // Start both operations without awaiting either of them. The random bool only
            // decides which of the two is started first.
2736        let definitions;
2737        let buffer_b2;
2738        if rng.gen() {
2739            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2740            buffer_b2 =
2741                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2742        } else {
2743            buffer_b2 =
2744                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2745            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2746        }
2747
            // The fake server resolves the definition to columns 6..9 of line 0 in
            // `/root/b.rs`, i.e. `TWO`.
2748        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2749        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2750            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2751                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2752                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2753            )))
2754        });
2755
            // Regardless of start order, the definition must point at the buffer handle that
            // `open_buffer` produced for `b.rs`.
2756        let buffer_b2 = buffer_b2.await.unwrap();
2757        let definitions = definitions.await.unwrap();
2758        assert_eq!(definitions.len(), 1);
2759        assert_eq!(definitions[0].target_buffer, buffer_b2);
2760    }
2761
2762    #[gpui::test(iterations = 10)]
        // Checks that a guest can list and apply a code action in a shared project. Client B
        // opens `main.rs` in a workspace editor, the fake language server offers the action
        // "Inline into all callers", and confirming it must open an editor whose text reflects
        // the edits to both `main.rs` and `other.rs`. Undo and redo are checked on that editor.
2763    async fn test_collaborating_with_code_actions(
2764        mut cx_a: TestAppContext,
2765        mut cx_b: TestAppContext,
2766    ) {
2767        cx_a.foreground().forbid_parking();
2768        let mut lang_registry = Arc::new(LanguageRegistry::new());
2769        let fs = Arc::new(FakeFs::new(cx_a.background()));
            // Initialise the editor crate on client B and collect the path openers it
            // registers; they are handed to the workspace params further down.
2770        let mut path_openers_b = Vec::new();
2771        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2772
2773        // Set up a fake language server.
2774        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2775        Arc::get_mut(&mut lang_registry)
2776            .unwrap()
2777            .add(Arc::new(Language::new(
2778                LanguageConfig {
2779                    name: "Rust".to_string(),
2780                    path_suffixes: vec!["rs".to_string()],
2781                    language_server: Some(language_server_config),
2782                    ..Default::default()
2783                },
2784                Some(tree_sitter_rust::language()),
2785            )));
2786
2787        // Connect to a server as 2 clients.
2788        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2789        let client_a = server.create_client(&mut cx_a, "user_a").await;
2790        let client_b = server.create_client(&mut cx_b, "user_b").await;
2791
2792        // Share a project as client A
2793        fs.insert_tree(
2794            "/a",
2795            json!({
2796                ".zed.toml": r#"collaborators = ["user_b"]"#,
2797                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2798                "other.rs": "pub fn foo() -> usize { 4 }",
2799            }),
2800        )
2801        .await;
2802        let project_a = cx_a.update(|cx| {
2803            Project::local(
2804                client_a.clone(),
2805                client_a.user_store.clone(),
2806                lang_registry.clone(),
2807                fs.clone(),
2808                cx,
2809            )
2810        });
2811        let (worktree_a, _) = project_a
2812            .update(&mut cx_a, |p, cx| {
2813                p.find_or_create_local_worktree("/a", false, cx)
2814            })
2815            .await
2816            .unwrap();
            // Wait for the initial scan of the worktree before sharing it.
2817        worktree_a
2818            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2819            .await;
2820        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2821        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2822        project_a
2823            .update(&mut cx_a, |p, cx| p.share(cx))
2824            .await
2825            .unwrap();
2826
2827        // Join the worktree as client B.
2828        let project_b = Project::remote(
2829            project_id,
2830            client_b.clone(),
2831            client_b.user_store.clone(),
2832            lang_registry.clone(),
2833            fs.clone(),
2834            &mut cx_b.to_async(),
2835        )
2836        .await
2837        .unwrap();
            // Build a workspace for client B on top of the remote project, starting from the
            // test defaults and overriding the pieces this test created itself.
2838        let mut params = cx_b.update(WorkspaceParams::test);
2839        params.languages = lang_registry.clone();
2840        params.client = client_b.client.clone();
2841        params.user_store = client_b.user_store.clone();
2842        params.project = project_b;
2843        params.path_openers = path_openers_b.into();
2844
            // Open `main.rs` in the guest's workspace and take the resulting item as an `Editor`.
2845        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2846        let editor_b = workspace_b
2847            .update(&mut cx_b, |workspace, cx| {
2848                workspace.open_path((worktree_id, "main.rs").into(), cx)
2849            })
2850            .await
2851            .unwrap()
2852            .downcast::<Editor>()
2853            .unwrap();
2854
            // The first code-action request is expected for the range (0, 0)..(0, 0) and is
            // answered with no actions.
            // NOTE(review): `.next().await` presumably waits until the handler has served one
            // request — confirm against the fake server's `handle_request` return type.
2855        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2856        fake_language_server
2857            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2858                assert_eq!(
2859                    params.text_document.uri,
2860                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2861                );
2862                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2863                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2864                None
2865            })
2866            .next()
2867            .await;
2868
2869        // Move cursor to a location that contains code actions.
2870        editor_b.update(&mut cx_b, |editor, cx| {
2871            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2872            cx.focus(&editor_b);
2873        });
2874
            // After the cursor move the request must carry the range (1, 31)..(1, 31). Reply
            // with a single action whose workspace edit touches both files.
2875        fake_language_server
2876            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2877                assert_eq!(
2878                    params.text_document.uri,
2879                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2880                );
2881                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2882                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2883
2884                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2885                    lsp::CodeAction {
2886                        title: "Inline into all callers".to_string(),
2887                        edit: Some(lsp::WorkspaceEdit {
2888                            changes: Some(
2889                                [
                                        // Replace `other::foo()` (columns 22..34 of line 1 in
                                        // `main.rs`) with `4`.
2890                                    (
2891                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2892                                        vec![lsp::TextEdit::new(
2893                                            lsp::Range::new(
2894                                                lsp::Position::new(1, 22),
2895                                                lsp::Position::new(1, 34),
2896                                            ),
2897                                            "4".to_string(),
2898                                        )],
2899                                    ),
                                        // Delete the whole 27-character content of `other.rs`.
2900                                    (
2901                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2902                                        vec![lsp::TextEdit::new(
2903                                            lsp::Range::new(
2904                                                lsp::Position::new(0, 0),
2905                                                lsp::Position::new(0, 27),
2906                                            ),
2907                                            "".to_string(),
2908                                        )],
2909                                    ),
2910                                ]
2911                                .into_iter()
2912                                .collect(),
2913                            ),
2914                            ..Default::default()
2915                        }),
                            // Extra payload attached to the action. Nothing in this test reads
                            // it back: the resolve handler below ignores its argument.
2916                        data: Some(json!({
2917                            "codeActionParams": {
2918                                "range": {
2919                                    "start": {"line": 1, "column": 31},
2920                                    "end": {"line": 1, "column": 31},
2921                                }
2922                            }
2923                        })),
2924                        ..Default::default()
2925                    },
2926                )])
2927            })
2928            .next()
2929            .await;
2930
2931        // Toggle code actions and wait for them to display.
2932        editor_b.update(&mut cx_b, |editor, cx| {
2933            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2934        });
2935        editor_b
2936            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2937            .await;
2938
            // Unregister the code-action handler before confirming the action.
2939        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2940
2941        // Confirming the code action will trigger a resolve request.
            // `confirm_action` is only awaited after the resolve handler below is registered.
2942        let confirm_action = workspace_b
2943            .update(&mut cx_b, |workspace, cx| {
2944                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2945            })
2946            .unwrap();
            // The resolved action carries the same title and the same two-file edit as the
            // action offered above, but no `data`.
2947        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2948            lsp::CodeAction {
2949                title: "Inline into all callers".to_string(),
2950                edit: Some(lsp::WorkspaceEdit {
2951                    changes: Some(
2952                        [
2953                            (
2954                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2955                                vec![lsp::TextEdit::new(
2956                                    lsp::Range::new(
2957                                        lsp::Position::new(1, 22),
2958                                        lsp::Position::new(1, 34),
2959                                    ),
2960                                    "4".to_string(),
2961                                )],
2962                            ),
2963                            (
2964                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2965                                vec![lsp::TextEdit::new(
2966                                    lsp::Range::new(
2967                                        lsp::Position::new(0, 0),
2968                                        lsp::Position::new(0, 27),
2969                                    ),
2970                                    "".to_string(),
2971                                )],
2972                            ),
2973                        ]
2974                        .into_iter()
2975                        .collect(),
2976                    ),
2977                    ..Default::default()
2978                }),
2979                ..Default::default()
2980            }
2981        });
2982
2983        // After the action is confirmed, an editor containing both modified files is opened.
2984        confirm_action.await.unwrap();
2985        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2986            workspace
2987                .active_item(cx)
2988                .unwrap()
2989                .downcast::<Editor>()
2990                .unwrap()
2991        });
            // The editor text starts with the (now empty) content of `other.rs`, followed by the
            // edited `main.rs`; the undo assertion shows the same layout with the original text.
            // A single undo/redo reverts/reapplies the edits to both files at once.
2992        code_action_editor.update(&mut cx_b, |editor, cx| {
2993            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2994            editor.undo(&Undo, cx);
2995            assert_eq!(
2996                editor.text(cx),
2997                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2998            );
2999            editor.redo(&Redo, cx);
3000            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3001        });
3002    }
3003
3004    #[gpui::test(iterations = 10)]
        // End-to-end chat test with two clients in one channel. A message written directly to
        // the database must be loaded by both clients, messages sent by client A must show up
        // on both sides, and the server's channel state must follow the clients' channel
        // handles as they are dropped.
3005    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3006        cx_a.foreground().forbid_parking();
3007
3008        // Connect to a server as 2 clients.
3009        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3010        let client_a = server.create_client(&mut cx_a, "user_a").await;
3011        let client_b = server.create_client(&mut cx_b, "user_b").await;
3012
3013        // Create an org that includes these 2 users.
3014        let db = &server.app_state.db;
3015        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3016        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3017            .await
3018            .unwrap();
3019        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3020            .await
3021            .unwrap();
3022
3023        // Create a channel that includes all the users.
3024        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3025        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3026            .await
3027            .unwrap();
3028        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3029            .await
3030            .unwrap();
            // Seed the channel with one message from user B, written straight to the database.
            // NOTE(review): the trailing `1` is not explained here (possibly a nonce) — confirm
            // against `create_channel_message`.
3031        db.create_channel_message(
3032            channel_id,
3033            client_b.current_user_id(&cx_b),
3034            "hello A, it's B.",
3035            OffsetDateTime::now_utc(),
3036            1,
3037        )
3038        .await
3039        .unwrap();
3040
            // Client A: wait for the channel list to load, then check it contains exactly the
            // test channel.
3041        let channels_a = cx_a
3042            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3043        channels_a
3044            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3045            .await;
3046        channels_a.read_with(&cx_a, |list, _| {
3047            assert_eq!(
3048                list.available_channels().unwrap(),
3049                &[ChannelDetails {
3050                    id: channel_id.to_proto(),
3051                    name: "test-channel".to_string()
3052                }]
3053            )
3054        });
3055        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3056            this.get_channel(channel_id.to_proto(), cx).unwrap()
3057        });
            // The channel handle starts out with no messages; the seeded message is expected
            // to arrive asynchronously.
            // NOTE(review): the third tuple element from `channel_messages` is presumably a
            // "pending" flag — the helper is not visible here.
3058        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3059        channel_a
3060            .condition(&cx_a, |channel, _| {
3061                channel_messages(channel)
3062                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3063            })
3064            .await;
3065
            // Client B: same sequence as for client A above.
3066        let channels_b = cx_b
3067            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3068        channels_b
3069            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3070            .await;
3071        channels_b.read_with(&cx_b, |list, _| {
3072            assert_eq!(
3073                list.available_channels().unwrap(),
3074                &[ChannelDetails {
3075                    id: channel_id.to_proto(),
3076                    name: "test-channel".to_string()
3077                }]
3078            )
3079        });
3080
3081        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3082            this.get_channel(channel_id.to_proto(), cx).unwrap()
3083        });
3084        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3085        channel_b
3086            .condition(&cx_b, |channel, _| {
3087                channel_messages(channel)
3088                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3089            })
3090            .await;
3091
            // Send two messages from A. The first send is detached; the second send's task is
            // returned from the closure and awaited. Before either send is awaited, both
            // messages already appear in A's local list with the third tuple element `true`.
3092        channel_a
3093            .update(&mut cx_a, |channel, cx| {
3094                channel
3095                    .send_message("oh, hi B.".to_string(), cx)
3096                    .unwrap()
3097                    .detach();
3098                let task = channel.send_message("sup".to_string(), cx).unwrap();
3099                assert_eq!(
3100                    channel_messages(channel),
3101                    &[
3102                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3103                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3104                        ("user_a".to_string(), "sup".to_string(), true)
3105                    ]
3106                );
3107                task
3108            })
3109            .await
3110            .unwrap();
3111
            // On B's side, both of A's messages must arrive in order, with the flag `false`.
3112        channel_b
3113            .condition(&cx_b, |channel, _| {
3114                channel_messages(channel)
3115                    == [
3116                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3117                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3118                        ("user_a".to_string(), "sup".to_string(), false),
3119                    ]
3120            })
3121            .await;
3122
            // While both clients hold a channel handle, the server tracks two connection ids
            // for the channel.
3123        assert_eq!(
3124            server
3125                .state()
3126                .await
3127                .channel(channel_id)
3128                .unwrap()
3129                .connection_ids
3130                .len(),
3131            2
3132        );
            // Dropping B's handle must leave a single connection on the channel.
3133        cx_b.update(|_| drop(channel_b));
3134        server
3135            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3136            .await;
3137
            // Dropping A's handle as well must remove the channel from the server state.
3138        cx_a.update(|_| drop(channel_a));
3139        server
3140            .condition(|state| state.channel(channel_id).is_none())
3141            .await;
3142    }
3143
3144    #[gpui::test(iterations = 10)]
3145    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3146        cx_a.foreground().forbid_parking();
3147
3148        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3149        let client_a = server.create_client(&mut cx_a, "user_a").await;
3150
3151        let db = &server.app_state.db;
3152        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3153        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3154        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3155            .await
3156            .unwrap();
3157        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3158            .await
3159            .unwrap();
3160
3161        let channels_a = cx_a
3162            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3163        channels_a
3164            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3165            .await;
3166        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3167            this.get_channel(channel_id.to_proto(), cx).unwrap()
3168        });
3169
3170        // Messages aren't allowed to be too long.
3171        channel_a
3172            .update(&mut cx_a, |channel, cx| {
3173                let long_body = "this is long.\n".repeat(1024);
3174                channel.send_message(long_body, cx).unwrap()
3175            })
3176            .await
3177            .unwrap_err();
3178
3179        // Messages aren't allowed to be blank.
3180        channel_a.update(&mut cx_a, |channel, cx| {
3181            channel.send_message(String::new(), cx).unwrap_err()
3182        });
3183
3184        // Leading and trailing whitespace are trimmed.
3185        channel_a
3186            .update(&mut cx_a, |channel, cx| {
3187                channel
3188                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3189                    .unwrap()
3190            })
3191            .await
3192            .unwrap();
3193        assert_eq!(
3194            db.get_channel_messages(channel_id, 10, None)
3195                .await
3196                .unwrap()
3197                .iter()
3198                .map(|m| &m.body)
3199                .collect::<Vec<_>>(),
3200            &["surrounded by whitespace"]
3201        );
3202    }
3203
    // Exercises chat across a dropped connection: client B is disconnected while the server
    // refuses new connections, both clients send messages in the meantime, and once connections
    // are allowed again B's channel is expected to converge on the complete message history.
    //
    // `channel_messages` yields `(sender login, body, bool)` tuples; the bool is `true` right
    // after a local send and `false` once the message has round-tripped — presumably a
    // "pending" flag, TODO confirm against the helper's definition.
    #[gpui::test(iterations = 10)]
    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        // Status stream for client B, used below to observe the failed reconnection attempt.
        let mut status_b = client_b.status();

        // Create an org that includes these 2 users.
        let db = &server.app_state.db;
        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();

        // Create a channel that includes all the users.
        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
            .await
            .unwrap();
        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
            .await
            .unwrap();
        // Seed the channel with one message from B, written directly to the database.
        db.create_channel_message(
            channel_id,
            client_b.current_user_id(&cx_b),
            "hello A, it's B.",
            OffsetDateTime::now_utc(),
            2,
        )
        .await
        .unwrap();

        // Client A: wait for the channel list, check it, then open the channel.
        let channels_a = cx_a
            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
        channels_a
            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
            .await;

        channels_a.read_with(&cx_a, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        // A freshly opened channel starts out empty; the seeded message arrives asynchronously.
        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Client B: same sequence as for client A.
        let channels_b = cx_b
            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
        channels_b
            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
            .await;
        channels_b.read_with(&cx_b, |list, _| {
            assert_eq!(
                list.available_channels().unwrap(),
                &[ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });

        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
            this.get_channel(channel_id.to_proto(), cx).unwrap()
        });
        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            })
            .await;

        // Disconnect client B, ensuring we can still access its cached channel data.
        // Connections are forbidden first so that B's reconnection attempt fails.
        server.forbid_connections();
        server.disconnect_client(client_b.current_user_id(&cx_b));
        // Drain status updates until B reports that reconnecting failed.
        while !matches!(
            status_b.next().await,
            Some(client::Status::ReconnectionError { .. })
        ) {}

        channels_b.read_with(&cx_b, |channels, _| {
            assert_eq!(
                channels.available_channels().unwrap(),
                [ChannelDetails {
                    id: channel_id.to_proto(),
                    name: "test-channel".to_string()
                }]
            )
        });
        channel_b.read_with(&cx_b, |channel, _| {
            assert_eq!(
                channel_messages(channel),
                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
            )
        });

        // Send a message from client B while it is disconnected.
        // The message shows up locally right away, but the send task itself fails.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                let task = channel
                    .send_message("can you see this?".to_string(), cx)
                    .unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap_err();

        // Send a message from client A while B is disconnected.
        // The first send is detached; only the second send's task is awaited.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel
                    .send_message("oh, hi B.".to_string(), cx)
                    .unwrap()
                    .detach();
                let task = channel.send_message("sup".to_string(), cx).unwrap();
                assert_eq!(
                    channel_messages(channel),
                    &[
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
                        ("user_a".to_string(), "sup".to_string(), true)
                    ]
                );
                task
            })
            .await
            .unwrap();

        // Give client B a chance to reconnect.
        // The clock is advanced manually; presumably this lets B's reconnect timer fire —
        // TODO confirm against the client's retry logic.
        server.allow_connections();
        cx_b.foreground().advance_clock(Duration::from_secs(10));

        // Verify that B sees the new messages upon reconnection, as well as the message client B
        // sent while offline.
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                    ]
            })
            .await;

        // Ensure client A and B can communicate normally after reconnection.
        channel_a
            .update(&mut cx_a, |channel, cx| {
                channel.send_message("you online?".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_b
            .condition(&cx_b, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                    ]
            })
            .await;

        // And in the other direction: B sends, A must eventually see the full history.
        channel_b
            .update(&mut cx_b, |channel, cx| {
                channel.send_message("yep".to_string(), cx).unwrap()
            })
            .await
            .unwrap();
        channel_a
            .condition(&cx_a, |channel, _| {
                channel_messages(channel)
                    == [
                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
                        ("user_a".to_string(), "sup".to_string(), false),
                        ("user_b".to_string(), "can you see this?".to_string(), false),
                        ("user_a".to_string(), "you online?".to_string(), false),
                        ("user_b".to_string(), "yep".to_string(), false),
                    ]
            })
            .await;
    }
3415
    // Checks that every client's contact list tracks a host's project through its lifecycle:
    // a local worktree with no guests, the same worktree with one guest after it is shared and
    // joined, and finally an empty contact list once the host drops the project.
    #[gpui::test(iterations = 10)]
    async fn test_contacts(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        // The worktree's `.zed.toml` names user_b and user_c as collaborators.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Before sharing: all three clients should see user_a with worktree "a" and no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Share the project and have client B join it as a remote project.
        let project_id = project_a
            .update(&mut cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // After B joins: all three clients should see user_b listed as a guest of worktree "a".
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Wait until the host itself has registered B as a collaborator before tearing down.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Dropping the host's project should empty every client's contact list.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens the user store's contacts into
        // `(contact login, [(first worktree root name, [guest logins])])` for easy comparison.
        // Indexes `worktree_root_names[0]`, so it panics on a project without root names.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
3557
3558    #[gpui::test(iterations = 100)]
3559    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3560        cx.foreground().forbid_parking();
3561        let max_peers = env::var("MAX_PEERS")
3562            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3563            .unwrap_or(5);
3564        let max_operations = env::var("OPERATIONS")
3565            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3566            .unwrap_or(10);
3567
3568        let rng = Rc::new(RefCell::new(rng));
3569
3570        let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3571        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3572
3573        // Set up a fake language server.
3574        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3575        language_server_config.set_fake_initializer(|fake_server| {
3576            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3577                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3578                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3579                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3580                        new_text: "the-new-text".to_string(),
3581                    })),
3582                    ..Default::default()
3583                }]))
3584            });
3585
3586            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3587                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3588                    lsp::CodeAction {
3589                        title: "the-code-action".to_string(),
3590                        ..Default::default()
3591                    },
3592                )])
3593            });
3594        });
3595
3596        Arc::get_mut(&mut host_lang_registry)
3597            .unwrap()
3598            .add(Arc::new(Language::new(
3599                LanguageConfig {
3600                    name: "Rust".to_string(),
3601                    path_suffixes: vec!["rs".to_string()],
3602                    language_server: Some(language_server_config),
3603                    ..Default::default()
3604                },
3605                None,
3606            )));
3607
3608        let fs = Arc::new(FakeFs::new(cx.background()));
3609        fs.insert_tree(
3610            "/_collab",
3611            json!({
3612                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3613            }),
3614        )
3615        .await;
3616
3617        let operations = Rc::new(Cell::new(0));
3618        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
3619        let mut clients = Vec::new();
3620
3621        let mut next_entity_id = 100000;
3622        let mut host_cx = TestAppContext::new(
3623            cx.foreground_platform(),
3624            cx.platform(),
3625            cx.foreground(),
3626            cx.background(),
3627            cx.font_cache(),
3628            next_entity_id,
3629        );
3630        let host = server.create_client(&mut host_cx, "host").await;
3631        let host_project = host_cx.update(|cx| {
3632            Project::local(
3633                host.client.clone(),
3634                host.user_store.clone(),
3635                host_lang_registry.clone(),
3636                fs.clone(),
3637                cx,
3638            )
3639        });
3640        let host_project_id = host_project
3641            .update(&mut host_cx, |p, _| p.next_remote_id())
3642            .await;
3643
3644        let (collab_worktree, _) = host_project
3645            .update(&mut host_cx, |project, cx| {
3646                project.find_or_create_local_worktree("/_collab", false, cx)
3647            })
3648            .await
3649            .unwrap();
3650        collab_worktree
3651            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3652            .await;
3653        host_project
3654            .update(&mut host_cx, |project, cx| project.share(cx))
3655            .await
3656            .unwrap();
3657
3658        clients.push(cx.foreground().spawn(host.simulate_host(
3659            host_project.clone(),
3660            operations.clone(),
3661            max_operations,
3662            rng.clone(),
3663            host_cx.clone(),
3664        )));
3665
3666        while operations.get() < max_operations {
3667            cx.background().simulate_random_delay().await;
3668            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3669                operations.set(operations.get() + 1);
3670
3671                let guest_id = clients.len();
3672                log::info!("Adding guest {}", guest_id);
3673                next_entity_id += 100000;
3674                let mut guest_cx = TestAppContext::new(
3675                    cx.foreground_platform(),
3676                    cx.platform(),
3677                    cx.foreground(),
3678                    cx.background(),
3679                    cx.font_cache(),
3680                    next_entity_id,
3681                );
3682                let guest = server
3683                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3684                    .await;
3685                let guest_project = Project::remote(
3686                    host_project_id,
3687                    guest.client.clone(),
3688                    guest.user_store.clone(),
3689                    guest_lang_registry.clone(),
3690                    fs.clone(),
3691                    &mut guest_cx.to_async(),
3692                )
3693                .await
3694                .unwrap();
3695                clients.push(cx.foreground().spawn(guest.simulate_guest(
3696                    guest_id,
3697                    guest_project,
3698                    operations.clone(),
3699                    max_operations,
3700                    rng.clone(),
3701                    guest_cx,
3702                )));
3703
3704                log::info!("Guest {} added", guest_id);
3705            }
3706        }
3707
3708        let clients = futures::future::join_all(clients).await;
3709        cx.foreground().run_until_parked();
3710
3711        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3712            project
3713                .worktrees(cx)
3714                .map(|worktree| {
3715                    let snapshot = worktree.read(cx).snapshot();
3716                    (snapshot.id(), snapshot)
3717                })
3718                .collect::<BTreeMap<_, _>>()
3719        });
3720
3721        for (guest_client, guest_cx) in clients.iter().skip(1) {
3722            let guest_id = guest_client.client.id();
3723            let worktree_snapshots =
3724                guest_client
3725                    .project
3726                    .as_ref()
3727                    .unwrap()
3728                    .read_with(guest_cx, |project, cx| {
3729                        project
3730                            .worktrees(cx)
3731                            .map(|worktree| {
3732                                let worktree = worktree.read(cx);
3733                                assert!(
3734                                    !worktree.as_remote().unwrap().has_pending_updates(),
3735                                    "Guest {} worktree {:?} contains deferred updates",
3736                                    guest_id,
3737                                    worktree.id()
3738                                );
3739                                (worktree.id(), worktree.snapshot())
3740                            })
3741                            .collect::<BTreeMap<_, _>>()
3742                    });
3743
3744            assert_eq!(
3745                worktree_snapshots.keys().collect::<Vec<_>>(),
3746                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3747                "guest {} has different worktrees than the host",
3748                guest_id
3749            );
3750            for (id, host_snapshot) in &host_worktree_snapshots {
3751                let guest_snapshot = &worktree_snapshots[id];
3752                assert_eq!(
3753                    guest_snapshot.root_name(),
3754                    host_snapshot.root_name(),
3755                    "guest {} has different root name than the host for worktree {}",
3756                    guest_id,
3757                    id
3758                );
3759                assert_eq!(
3760                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3761                    host_snapshot.entries(false).collect::<Vec<_>>(),
3762                    "guest {} has different snapshot than the host for worktree {}",
3763                    guest_id,
3764                    id
3765                );
3766            }
3767
3768            guest_client
3769                .project
3770                .as_ref()
3771                .unwrap()
3772                .read_with(guest_cx, |project, _| {
3773                    assert!(
3774                        !project.has_buffered_operations(),
3775                        "guest {} has buffered operations ",
3776                        guest_id,
3777                    );
3778                });
3779
3780            for guest_buffer in &guest_client.buffers {
3781                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3782                let host_buffer = host_project.read_with(&host_cx, |project, _| {
3783                    project
3784                        .shared_buffer(guest_client.peer_id, buffer_id)
3785                        .expect(&format!(
3786                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
3787                            guest_id, guest_client.peer_id, buffer_id
3788                        ))
3789                });
3790                assert_eq!(
3791                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3792                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3793                    "guest {} buffer {} differs from the host's buffer",
3794                    guest_id,
3795                    buffer_id,
3796                );
3797            }
3798        }
3799    }
3800
    /// Test harness that runs a `Server` in-process and lets tests create clients, sever
    /// their connections, and wait on server-side state.
    struct TestServer {
        /// RPC peer handed to `Server::new` in `start`.
        peer: Arc<Peer>,
        /// Application state (database handle, config, ...) built by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor; `condition` marks it as waiting while polling notifications.
        foreground: Rc<executor::Foreground>,
        /// Receiving end of the notification channel whose sender is given to the server.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user kill switches for the in-memory connections created in `create_client`.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// When set, new connection attempts from test clients are rejected.
        forbid_connections: Arc<AtomicBool>,
        /// Never read; presumably held so the test database outlives the server — TODO confirm.
        _test_db: TestDb,
    }
3811
3812    impl TestServer {
3813        async fn start(
3814            foreground: Rc<executor::Foreground>,
3815            background: Arc<executor::Background>,
3816        ) -> Self {
3817            let test_db = TestDb::fake(background);
3818            let app_state = Self::build_app_state(&test_db).await;
3819            let peer = Peer::new();
3820            let notifications = mpsc::unbounded();
3821            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3822            Self {
3823                peer,
3824                app_state,
3825                server,
3826                foreground,
3827                notifications: notifications.1,
3828                connection_killers: Default::default(),
3829                forbid_connections: Default::default(),
3830                _test_db: test_db,
3831            }
3832        }
3833
        /// Creates a database user called `name` and a client that is connected to this
        /// server over an in-memory connection.
        ///
        /// Authentication and connection establishment are overridden on the client, so no
        /// real network or credential store is involved. Returns once the server has reported
        /// the connection id and the client's user store knows the current user.
        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
            let http = FakeHttpClient::with_404_response();
            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
            let client_name = name.to_string();
            let mut client = Client::new(http.clone());
            let server = self.server.clone();
            let connection_killers = self.connection_killers.clone();
            let forbid_connections = self.forbid_connections.clone();
            // The server side reports the id of each connection it accepts over this channel.
            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);

            // `Arc::get_mut` only succeeds while `client` is the sole reference, so the
            // overrides have to be installed before the client is cloned anywhere.
            Arc::get_mut(&mut client)
                .unwrap()
                // Authentication always yields this user's id and a fixed token.
                .override_authenticate(move |cx| {
                    cx.spawn(|_| async move {
                        let access_token = "the-token".to_string();
                        Ok(Credentials {
                            user_id: user_id.0 as u64,
                            access_token,
                        })
                    })
                })
                // Runs for every connection attempt, including reconnections.
                .override_establish_connection(move |credentials, cx| {
                    assert_eq!(credentials.user_id, user_id.0 as u64);
                    assert_eq!(credentials.access_token, "the-token");

                    let server = server.clone();
                    let connection_killers = connection_killers.clone();
                    let forbid_connections = forbid_connections.clone();
                    let client_name = client_name.clone();
                    let connection_id_tx = connection_id_tx.clone();
                    cx.spawn(move |cx| async move {
                        if forbid_connections.load(SeqCst) {
                            Err(EstablishConnectionError::other(anyhow!(
                                "server is forbidding connections"
                            )))
                        } else {
                            let (client_conn, server_conn, kill_conn) =
                                Connection::in_memory(cx.background());
                            // Remember the kill switch so `disconnect_client` can use it later;
                            // a newer connection for the same user replaces the previous entry.
                            connection_killers.lock().insert(user_id, kill_conn);
                            // Serve the server end of the connection on the background executor.
                            cx.background()
                                .spawn(server.handle_connection(
                                    server_conn,
                                    client_name,
                                    user_id,
                                    Some(connection_id_tx),
                                    cx.background(),
                                ))
                                .detach();
                            Ok(client_conn)
                        }
                    })
                });

            client
                .authenticate_and_connect(&cx.to_async())
                .await
                .unwrap();

            Channel::init(&client);
            Project::init(&client);

            // The first connection id sent by the server becomes this client's peer id.
            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
            // Wait until the user store reports a current user.
            let mut authed_user =
                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
            while authed_user.next().await.unwrap().is_none() {}

            TestClient {
                client,
                peer_id,
                user_store,
                project: Default::default(),
                buffers: Default::default(),
            }
        }
3909
3910        fn disconnect_client(&self, user_id: UserId) {
3911            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3912                let _ = kill_conn.try_send(Some(()));
3913            }
3914        }
3915
3916        fn forbid_connections(&self) {
3917            self.forbid_connections.store(true, SeqCst);
3918        }
3919
3920        fn allow_connections(&self) {
3921            self.forbid_connections.store(false, SeqCst);
3922        }
3923
3924        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3925            let mut config = Config::default();
3926            config.session_secret = "a".repeat(32);
3927            config.database_url = test_db.url.clone();
3928            let github_client = github::AppClient::test();
3929            Arc::new(AppState {
3930                db: test_db.db().clone(),
3931                handlebars: Default::default(),
3932                auth_client: auth::build_client("", ""),
3933                repo_client: github::RepoClient::test(&github_client),
3934                github_client,
3935                config,
3936            })
3937        }
3938
3939        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3940            self.server.store.read()
3941        }
3942
3943        async fn condition<F>(&mut self, mut predicate: F)
3944        where
3945            F: FnMut(&Store) -> bool,
3946        {
3947            async_std::future::timeout(Duration::from_millis(500), async {
3948                while !(predicate)(&*self.server.store.read()) {
3949                    self.foreground.start_waiting();
3950                    self.notifications.next().await;
3951                    self.foreground.finish_waiting();
3952                }
3953            })
3954            .await
3955            .expect("condition timed out");
3956        }
3957    }
3958
3959    impl Drop for TestServer {
3960        fn drop(&mut self) {
3961            self.peer.reset();
3962        }
3963    }
3964
3965    struct TestClient {
3966        client: Arc<Client>,
3967        pub peer_id: PeerId,
3968        pub user_store: ModelHandle<UserStore>,
3969        project: Option<ModelHandle<Project>>,
3970        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3971    }
3972
3973    impl Deref for TestClient {
3974        type Target = Arc<Client>;
3975
3976        fn deref(&self) -> &Self::Target {
3977            &self.client
3978        }
3979    }
3980
3981    impl TestClient {
3982        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3983            UserId::from_proto(
3984                self.user_store
3985                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3986            )
3987        }
3988
3989        async fn simulate_host(
3990            mut self,
3991            project: ModelHandle<Project>,
3992            operations: Rc<Cell<usize>>,
3993            max_operations: usize,
3994            rng: Rc<RefCell<StdRng>>,
3995            mut cx: TestAppContext,
3996        ) -> (Self, TestAppContext) {
3997            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3998            let mut files: Vec<PathBuf> = Default::default();
3999            while operations.get() < max_operations {
4000                operations.set(operations.get() + 1);
4001
4002                let distribution = rng.borrow_mut().gen_range(0..100);
4003                match distribution {
4004                    0..=20 if !files.is_empty() => {
4005                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4006                        while let Some(parent_path) = path.parent() {
4007                            path = parent_path;
4008                            if rng.borrow_mut().gen() {
4009                                break;
4010                            }
4011                        }
4012
4013                        log::info!("Host: find/create local worktree {:?}", path);
4014                        project
4015                            .update(&mut cx, |project, cx| {
4016                                project.find_or_create_local_worktree(path, false, cx)
4017                            })
4018                            .await
4019                            .unwrap();
4020                    }
4021                    10..=80 if !files.is_empty() => {
4022                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4023                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4024                            let (worktree, path) = project
4025                                .update(&mut cx, |project, cx| {
4026                                    project.find_or_create_local_worktree(file, false, cx)
4027                                })
4028                                .await
4029                                .unwrap();
4030                            let project_path =
4031                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4032                            log::info!("Host: opening path {:?}", project_path);
4033                            let buffer = project
4034                                .update(&mut cx, |project, cx| {
4035                                    project.open_buffer(project_path, cx)
4036                                })
4037                                .await
4038                                .unwrap();
4039                            self.buffers.insert(buffer.clone());
4040                            buffer
4041                        } else {
4042                            self.buffers
4043                                .iter()
4044                                .choose(&mut *rng.borrow_mut())
4045                                .unwrap()
4046                                .clone()
4047                        };
4048
4049                        if rng.borrow_mut().gen_bool(0.1) {
4050                            cx.update(|cx| {
4051                                log::info!(
4052                                    "Host: dropping buffer {:?}",
4053                                    buffer.read(cx).file().unwrap().full_path(cx)
4054                                );
4055                                self.buffers.remove(&buffer);
4056                                drop(buffer);
4057                            });
4058                        } else {
4059                            buffer.update(&mut cx, |buffer, cx| {
4060                                log::info!(
4061                                    "Host: updating buffer {:?}",
4062                                    buffer.file().unwrap().full_path(cx)
4063                                );
4064                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4065                            });
4066                        }
4067                    }
4068                    _ => loop {
4069                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4070                        let mut path = PathBuf::new();
4071                        path.push("/");
4072                        for _ in 0..path_component_count {
4073                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4074                            path.push(std::str::from_utf8(&[letter]).unwrap());
4075                        }
4076                        path.set_extension("rs");
4077                        let parent_path = path.parent().unwrap();
4078
4079                        log::info!("Host: creating file {:?}", path);
4080                        if fs.create_dir(&parent_path).await.is_ok()
4081                            && fs.create_file(&path, Default::default()).await.is_ok()
4082                        {
4083                            files.push(path);
4084                            break;
4085                        } else {
4086                            log::info!("Host: cannot create file");
4087                        }
4088                    },
4089                }
4090
4091                cx.background().simulate_random_delay().await;
4092            }
4093
4094            self.project = Some(project);
4095            (self, cx)
4096        }
4097
4098        pub async fn simulate_guest(
4099            mut self,
4100            guest_id: usize,
4101            project: ModelHandle<Project>,
4102            operations: Rc<Cell<usize>>,
4103            max_operations: usize,
4104            rng: Rc<RefCell<StdRng>>,
4105            mut cx: TestAppContext,
4106        ) -> (Self, TestAppContext) {
4107            while operations.get() < max_operations {
4108                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4109                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4110                        project
4111                            .worktrees(&cx)
4112                            .filter(|worktree| {
4113                                worktree.read(cx).entries(false).any(|e| e.is_file())
4114                            })
4115                            .choose(&mut *rng.borrow_mut())
4116                    }) {
4117                        worktree
4118                    } else {
4119                        cx.background().simulate_random_delay().await;
4120                        continue;
4121                    };
4122
4123                    operations.set(operations.get() + 1);
4124                    let project_path = worktree.read_with(&cx, |worktree, _| {
4125                        let entry = worktree
4126                            .entries(false)
4127                            .filter(|e| e.is_file())
4128                            .choose(&mut *rng.borrow_mut())
4129                            .unwrap();
4130                        (worktree.id(), entry.path.clone())
4131                    });
4132                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4133                    let buffer = project
4134                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4135                        .await
4136                        .unwrap();
4137                    self.buffers.insert(buffer.clone());
4138                    buffer
4139                } else {
4140                    operations.set(operations.get() + 1);
4141
4142                    self.buffers
4143                        .iter()
4144                        .choose(&mut *rng.borrow_mut())
4145                        .unwrap()
4146                        .clone()
4147                };
4148
4149                let choice = rng.borrow_mut().gen_range(0..100);
4150                match choice {
4151                    0..=9 => {
4152                        cx.update(|cx| {
4153                            log::info!(
4154                                "Guest {}: dropping buffer {:?}",
4155                                guest_id,
4156                                buffer.read(cx).file().unwrap().full_path(cx)
4157                            );
4158                            self.buffers.remove(&buffer);
4159                            drop(buffer);
4160                        });
4161                    }
4162                    10..=19 => {
4163                        let completions = project.update(&mut cx, |project, cx| {
4164                            log::info!(
4165                                "Guest {}: requesting completions for buffer {:?}",
4166                                guest_id,
4167                                buffer.read(cx).file().unwrap().full_path(cx)
4168                            );
4169                            let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4170                            project.completions(&buffer, offset, cx)
4171                        });
4172                        let completions = cx.background().spawn(async move {
4173                            completions.await.expect("completions request failed");
4174                        });
4175                        if rng.borrow_mut().gen_bool(0.3) {
4176                            log::info!("Guest {}: detaching completions request", guest_id);
4177                            completions.detach();
4178                        } else {
4179                            completions.await;
4180                        }
4181                    }
4182                    20..=29 => {
4183                        let code_actions = project.update(&mut cx, |project, cx| {
4184                            log::info!(
4185                                "Guest {}: requesting code actions for buffer {:?}",
4186                                guest_id,
4187                                buffer.read(cx).file().unwrap().full_path(cx)
4188                            );
4189                            let range =
4190                                buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut());
4191                            project.code_actions(&buffer, range, cx)
4192                        });
4193                        let code_actions = cx.background().spawn(async move {
4194                            code_actions.await.expect("code actions request failed");
4195                        });
4196                        if rng.borrow_mut().gen_bool(0.3) {
4197                            log::info!("Guest {}: detaching code actions request", guest_id);
4198                            code_actions.detach();
4199                        } else {
4200                            code_actions.await;
4201                        }
4202                    }
4203                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4204                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4205                            log::info!(
4206                                "Guest {}: saving buffer {:?}",
4207                                guest_id,
4208                                buffer.file().unwrap().full_path(cx)
4209                            );
4210                            (buffer.version(), buffer.save(cx))
4211                        });
4212                        let save = cx.spawn(|cx| async move {
4213                            let (saved_version, _) = save.await.expect("save request failed");
4214                            buffer.read_with(&cx, |buffer, _| {
4215                                assert!(buffer.version().observed_all(&saved_version));
4216                                assert!(saved_version.observed_all(&requested_version));
4217                            });
4218                        });
4219                        if rng.borrow_mut().gen_bool(0.3) {
4220                            log::info!("Guest {}: detaching save request", guest_id);
4221                            save.detach();
4222                        } else {
4223                            save.await;
4224                        }
4225                    }
4226                    _ => {
4227                        buffer.update(&mut cx, |buffer, cx| {
4228                            log::info!(
4229                                "Guest {}: updating buffer {:?}",
4230                                guest_id,
4231                                buffer.file().unwrap().full_path(cx)
4232                            );
4233                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4234                        });
4235                    }
4236                }
4237                cx.background().simulate_random_delay().await;
4238            }
4239
4240            self.project = Some(project);
4241            (self, cx)
4242        }
4243    }
4244
4245    impl Executor for Arc<gpui::executor::Background> {
4246        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4247            self.spawn(future).detach();
4248        }
4249    }
4250
4251    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4252        channel
4253            .messages()
4254            .cursor::<()>()
4255            .map(|m| {
4256                (
4257                    m.sender.github_login.clone(),
4258                    m.body.clone(),
4259                    m.is_pending(),
4260                )
4261            })
4262            .collect()
4263    }
4264
4265    struct EmptyView;
4266
4267    impl gpui::Entity for EmptyView {
4268        type Event = ();
4269    }
4270
4271    impl gpui::View for EmptyView {
4272        fn ui_name() -> &'static str {
4273            "empty view"
4274        }
4275
4276        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4277            gpui::Element::boxed(gpui::elements::Empty)
4278        }
4279    }
4280}