rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
/// RPC server state shared (behind an `Arc`) by every connection task.
pub struct Server {
    /// RPC peer used to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// In-memory store of connections, projects and worktrees, guarded by a read/write lock.
    store: RwLock<Store>,
    /// Application-wide state; the handlers in this file reach the database through it.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
/// Unit struct; presumably the production `Executor` implementation (its `impl` is not
/// visible in this part of the file — TODO confirm).
pub struct RealExecutor;
  48
// NOTE(review): neither constant is used in the visible part of the file; the meanings
// below are inferred from the names and should be confirmed against their usages.

/// Presumably the number of channel messages returned per page.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Presumably the maximum accepted length of a channel message body.
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::open_buffer)
  83            .add_message_handler(Server::close_buffer)
  84            .add_request_handler(Server::update_buffer)
  85            .add_message_handler(Server::update_buffer_file)
  86            .add_message_handler(Server::buffer_reloaded)
  87            .add_message_handler(Server::buffer_saved)
  88            .add_request_handler(Server::save_buffer)
  89            .add_request_handler(Server::format_buffers)
  90            .add_request_handler(Server::get_completions)
  91            .add_request_handler(Server::apply_additional_edits_for_completion)
  92            .add_request_handler(Server::get_code_actions)
  93            .add_request_handler(Server::apply_code_action)
  94            .add_request_handler(Server::get_channels)
  95            .add_request_handler(Server::get_users)
  96            .add_request_handler(Server::join_channel)
  97            .add_message_handler(Server::leave_channel)
  98            .add_request_handler(Server::send_channel_message)
  99            .add_request_handler(Server::get_channel_messages);
 100
 101        Arc::new(server)
 102    }
 103
 104    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 105    where
 106        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 107        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 108        M: EnvelopedMessage,
 109    {
 110        let prev_handler = self.handlers.insert(
 111            TypeId::of::<M>(),
 112            Box::new(move |server, envelope| {
 113                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 114                (handler)(server, *envelope).boxed()
 115            }),
 116        );
 117        if prev_handler.is_some() {
 118            panic!("registered a handler for the same message twice");
 119        }
 120        self
 121    }
 122
 123    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 124    where
 125        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 126        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 127        M: RequestMessage,
 128    {
 129        self.add_message_handler(move |server, envelope| {
 130            let receipt = envelope.receipt();
 131            let response = (handler)(server.clone(), envelope);
 132            async move {
 133                match response.await {
 134                    Ok(response) => {
 135                        server.peer.respond(receipt, response)?;
 136                        Ok(())
 137                    }
 138                    Err(error) => {
 139                        server.peer.respond_with_error(
 140                            receipt,
 141                            proto::Error {
 142                                message: error.to_string(),
 143                            },
 144                        )?;
 145                        Err(error)
 146                    }
 147                }
 148            }
 149        })
 150    }
 151
    /// Returns a future that drives one client connection from registration to sign-out.
    ///
    /// The future registers `connection` with the peer, records it in the store for
    /// `user_id`, then dispatches incoming messages to the registered handlers until either
    /// the connection's I/O future finishes or the incoming stream ends. Finally it signs
    /// the connection out.
    ///
    /// * `addr` is only used in log output.
    /// * `send_connection_id`, when present, receives the id assigned to the connection;
    ///   a failure to send it is ignored.
    /// * `executor` runs handlers for messages that report `is_background()`.
    pub fn handle_connection<E: Executor>(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
        executor: E,
    ) -> impl Future<Output = ()> {
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            // A failed contact update is logged but does not abort the connection.
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` polls its branches in declaration order, so completion
                // of the I/O future is checked before the next message is taken.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                let notifications = this.notifications.clone();
                                let is_background = message.is_background();
                                let handle_message = (handler)(this.clone(), message);
                                // Wrap the handler so that the outcome is logged and a
                                // notification is emitted whether it succeeded or failed.
                                let handle_message = async move {
                                    if let Err(err) = handle_message.await {
                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                    } else {
                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                    }
                                    if let Some(mut notifications) = notifications {
                                        let _ = notifications.send(()).await;
                                    }
                                };
                                if is_background {
                                    executor.spawn_detached(handle_message);
                                } else {
                                    // Awaited inline: the loop neither polls `handle_io` nor
                                    // reads the next message until this handler finishes.
                                    handle_message.await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 226
 227    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 228        self.peer.disconnect(connection_id);
 229        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 230
 231        for (project_id, project) in removed_connection.hosted_projects {
 232            if let Some(share) = project.share {
 233                broadcast(
 234                    connection_id,
 235                    share.guests.keys().copied().collect(),
 236                    |conn_id| {
 237                        self.peer
 238                            .send(conn_id, proto::UnshareProject { project_id })
 239                    },
 240                )?;
 241            }
 242        }
 243
 244        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 245            broadcast(connection_id, peer_ids, |conn_id| {
 246                self.peer.send(
 247                    conn_id,
 248                    proto::RemoveProjectCollaborator {
 249                        project_id,
 250                        peer_id: connection_id.0,
 251                    },
 252                )
 253            })?;
 254        }
 255
 256        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 257        Ok(())
 258    }
 259
 260    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 261        Ok(proto::Ack {})
 262    }
 263
 264    async fn register_project(
 265        mut self: Arc<Server>,
 266        request: TypedEnvelope<proto::RegisterProject>,
 267    ) -> tide::Result<proto::RegisterProjectResponse> {
 268        let project_id = {
 269            let mut state = self.state_mut();
 270            let user_id = state.user_id_for_connection(request.sender_id)?;
 271            state.register_project(request.sender_id, user_id)
 272        };
 273        Ok(proto::RegisterProjectResponse { project_id })
 274    }
 275
 276    async fn unregister_project(
 277        mut self: Arc<Server>,
 278        request: TypedEnvelope<proto::UnregisterProject>,
 279    ) -> tide::Result<()> {
 280        let project = self
 281            .state_mut()
 282            .unregister_project(request.payload.project_id, request.sender_id)?;
 283        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 284        Ok(())
 285    }
 286
 287    async fn share_project(
 288        mut self: Arc<Server>,
 289        request: TypedEnvelope<proto::ShareProject>,
 290    ) -> tide::Result<proto::Ack> {
 291        self.state_mut()
 292            .share_project(request.payload.project_id, request.sender_id);
 293        Ok(proto::Ack {})
 294    }
 295
 296    async fn unshare_project(
 297        mut self: Arc<Server>,
 298        request: TypedEnvelope<proto::UnshareProject>,
 299    ) -> tide::Result<()> {
 300        let project_id = request.payload.project_id;
 301        let project = self
 302            .state_mut()
 303            .unshare_project(project_id, request.sender_id)?;
 304
 305        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 306            self.peer
 307                .send(conn_id, proto::UnshareProject { project_id })
 308        })?;
 309        self.update_contacts_for_users(&project.authorized_user_ids)?;
 310        Ok(())
 311    }
 312
 313    async fn join_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::JoinProject>,
 316    ) -> tide::Result<proto::JoinProjectResponse> {
 317        let project_id = request.payload.project_id;
 318
 319        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 320        let (response, connection_ids, contact_user_ids) = self
 321            .state_mut()
 322            .join_project(request.sender_id, user_id, project_id)
 323            .and_then(|joined| {
 324                let share = joined.project.share()?;
 325                let peer_count = share.guests.len();
 326                let mut collaborators = Vec::with_capacity(peer_count);
 327                collaborators.push(proto::Collaborator {
 328                    peer_id: joined.project.host_connection_id.0,
 329                    replica_id: 0,
 330                    user_id: joined.project.host_user_id.to_proto(),
 331                });
 332                let worktrees = joined
 333                    .project
 334                    .worktrees
 335                    .iter()
 336                    .filter_map(|(id, worktree)| {
 337                        worktree.share.as_ref().map(|share| proto::Worktree {
 338                            id: *id,
 339                            root_name: worktree.root_name.clone(),
 340                            entries: share.entries.values().cloned().collect(),
 341                            diagnostic_summaries: share
 342                                .diagnostic_summaries
 343                                .values()
 344                                .cloned()
 345                                .collect(),
 346                            weak: worktree.weak,
 347                            next_update_id: share.next_update_id as u64,
 348                        })
 349                    })
 350                    .collect();
 351                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 352                    if *peer_conn_id != request.sender_id {
 353                        collaborators.push(proto::Collaborator {
 354                            peer_id: peer_conn_id.0,
 355                            replica_id: *peer_replica_id as u32,
 356                            user_id: peer_user_id.to_proto(),
 357                        });
 358                    }
 359                }
 360                let response = proto::JoinProjectResponse {
 361                    worktrees,
 362                    replica_id: joined.replica_id as u32,
 363                    collaborators,
 364                };
 365                let connection_ids = joined.project.connection_ids();
 366                let contact_user_ids = joined.project.authorized_user_ids();
 367                Ok((response, connection_ids, contact_user_ids))
 368            })?;
 369
 370        broadcast(request.sender_id, connection_ids, |conn_id| {
 371            self.peer.send(
 372                conn_id,
 373                proto::AddProjectCollaborator {
 374                    project_id,
 375                    collaborator: Some(proto::Collaborator {
 376                        peer_id: request.sender_id.0,
 377                        replica_id: response.replica_id,
 378                        user_id: user_id.to_proto(),
 379                    }),
 380                },
 381            )
 382        })?;
 383        self.update_contacts_for_users(&contact_user_ids)?;
 384        Ok(response)
 385    }
 386
 387    async fn leave_project(
 388        mut self: Arc<Server>,
 389        request: TypedEnvelope<proto::LeaveProject>,
 390    ) -> tide::Result<()> {
 391        let sender_id = request.sender_id;
 392        let project_id = request.payload.project_id;
 393        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 394
 395        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 396            self.peer.send(
 397                conn_id,
 398                proto::RemoveProjectCollaborator {
 399                    project_id,
 400                    peer_id: sender_id.0,
 401                },
 402            )
 403        })?;
 404        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 405
 406        Ok(())
 407    }
 408
 409    async fn register_worktree(
 410        mut self: Arc<Server>,
 411        request: TypedEnvelope<proto::RegisterWorktree>,
 412    ) -> tide::Result<proto::Ack> {
 413        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 414
 415        let mut contact_user_ids = HashSet::default();
 416        contact_user_ids.insert(host_user_id);
 417        for github_login in request.payload.authorized_logins {
 418            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 419            contact_user_ids.insert(contact_user_id);
 420        }
 421
 422        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 423        self.state_mut().register_worktree(
 424            request.payload.project_id,
 425            request.payload.worktree_id,
 426            request.sender_id,
 427            Worktree {
 428                authorized_user_ids: contact_user_ids.clone(),
 429                root_name: request.payload.root_name,
 430                share: None,
 431                weak: false,
 432            },
 433        )?;
 434        self.update_contacts_for_users(&contact_user_ids)?;
 435        Ok(proto::Ack {})
 436    }
 437
 438    async fn unregister_worktree(
 439        mut self: Arc<Server>,
 440        request: TypedEnvelope<proto::UnregisterWorktree>,
 441    ) -> tide::Result<()> {
 442        let project_id = request.payload.project_id;
 443        let worktree_id = request.payload.worktree_id;
 444        let (worktree, guest_connection_ids) =
 445            self.state_mut()
 446                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 447        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 448            self.peer.send(
 449                conn_id,
 450                proto::UnregisterWorktree {
 451                    project_id,
 452                    worktree_id,
 453                },
 454            )
 455        })?;
 456        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 457        Ok(())
 458    }
 459
 460    async fn share_worktree(
 461        mut self: Arc<Server>,
 462        mut request: TypedEnvelope<proto::ShareWorktree>,
 463    ) -> tide::Result<proto::Ack> {
 464        let worktree = request
 465            .payload
 466            .worktree
 467            .as_mut()
 468            .ok_or_else(|| anyhow!("missing worktree"))?;
 469        let entries = worktree
 470            .entries
 471            .iter()
 472            .map(|entry| (entry.id, entry.clone()))
 473            .collect();
 474        let diagnostic_summaries = worktree
 475            .diagnostic_summaries
 476            .iter()
 477            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 478            .collect();
 479
 480        let shared_worktree = self.state_mut().share_worktree(
 481            request.payload.project_id,
 482            worktree.id,
 483            request.sender_id,
 484            entries,
 485            diagnostic_summaries,
 486            worktree.next_update_id,
 487        )?;
 488
 489        broadcast(
 490            request.sender_id,
 491            shared_worktree.connection_ids,
 492            |connection_id| {
 493                self.peer
 494                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 495            },
 496        )?;
 497        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 498
 499        Ok(proto::Ack {})
 500    }
 501
 502    async fn update_worktree(
 503        mut self: Arc<Server>,
 504        request: TypedEnvelope<proto::UpdateWorktree>,
 505    ) -> tide::Result<proto::Ack> {
 506        let connection_ids = self.state_mut().update_worktree(
 507            request.sender_id,
 508            request.payload.project_id,
 509            request.payload.worktree_id,
 510            request.payload.id,
 511            &request.payload.removed_entries,
 512            &request.payload.updated_entries,
 513        )?;
 514
 515        broadcast(request.sender_id, connection_ids, |connection_id| {
 516            self.peer
 517                .forward_send(request.sender_id, connection_id, request.payload.clone())
 518        })?;
 519
 520        Ok(proto::Ack {})
 521    }
 522
 523    async fn update_diagnostic_summary(
 524        mut self: Arc<Server>,
 525        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 526    ) -> tide::Result<()> {
 527        let summary = request
 528            .payload
 529            .summary
 530            .clone()
 531            .ok_or_else(|| anyhow!("invalid summary"))?;
 532        let receiver_ids = self.state_mut().update_diagnostic_summary(
 533            request.payload.project_id,
 534            request.payload.worktree_id,
 535            request.sender_id,
 536            summary,
 537        )?;
 538
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })?;
 543        Ok(())
 544    }
 545
 546    async fn disk_based_diagnostics_updating(
 547        self: Arc<Server>,
 548        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 549    ) -> tide::Result<()> {
 550        let receiver_ids = self
 551            .state()
 552            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 553        broadcast(request.sender_id, receiver_ids, |connection_id| {
 554            self.peer
 555                .forward_send(request.sender_id, connection_id, request.payload.clone())
 556        })?;
 557        Ok(())
 558    }
 559
 560    async fn disk_based_diagnostics_updated(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 563    ) -> tide::Result<()> {
 564        let receiver_ids = self
 565            .state()
 566            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })?;
 571        Ok(())
 572    }
 573
 574    async fn get_definition(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<proto::GetDefinition>,
 577    ) -> tide::Result<proto::GetDefinitionResponse> {
 578        let host_connection_id = self
 579            .state()
 580            .read_project(request.payload.project_id, request.sender_id)?
 581            .host_connection_id;
 582        Ok(self
 583            .peer
 584            .forward_request(request.sender_id, host_connection_id, request.payload)
 585            .await?)
 586    }
 587
 588    async fn open_buffer(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::OpenBuffer>,
 591    ) -> tide::Result<proto::OpenBufferResponse> {
 592        let host_connection_id = self
 593            .state()
 594            .read_project(request.payload.project_id, request.sender_id)?
 595            .host_connection_id;
 596        Ok(self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?)
 600    }
 601
 602    async fn close_buffer(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::CloseBuffer>,
 605    ) -> tide::Result<()> {
 606        let host_connection_id = self
 607            .state()
 608            .read_project(request.payload.project_id, request.sender_id)?
 609            .host_connection_id;
 610        self.peer
 611            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 612        Ok(())
 613    }
 614
 615    async fn save_buffer(
 616        self: Arc<Server>,
 617        request: TypedEnvelope<proto::SaveBuffer>,
 618    ) -> tide::Result<proto::BufferSaved> {
 619        let host;
 620        let mut guests;
 621        {
 622            let state = self.state();
 623            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 624            host = project.host_connection_id;
 625            guests = project.guest_connection_ids()
 626        }
 627
 628        let response = self
 629            .peer
 630            .forward_request(request.sender_id, host, request.payload.clone())
 631            .await?;
 632
 633        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 634        broadcast(host, guests, |conn_id| {
 635            self.peer.forward_send(host, conn_id, response.clone())
 636        })?;
 637
 638        Ok(response)
 639    }
 640
 641    async fn format_buffers(
 642        self: Arc<Server>,
 643        request: TypedEnvelope<proto::FormatBuffers>,
 644    ) -> tide::Result<proto::FormatBuffersResponse> {
 645        let host = self
 646            .state()
 647            .read_project(request.payload.project_id, request.sender_id)?
 648            .host_connection_id;
 649        Ok(self
 650            .peer
 651            .forward_request(request.sender_id, host, request.payload.clone())
 652            .await?)
 653    }
 654
 655    async fn get_completions(
 656        self: Arc<Server>,
 657        request: TypedEnvelope<proto::GetCompletions>,
 658    ) -> tide::Result<proto::GetCompletionsResponse> {
 659        let host = self
 660            .state()
 661            .read_project(request.payload.project_id, request.sender_id)?
 662            .host_connection_id;
 663        Ok(self
 664            .peer
 665            .forward_request(request.sender_id, host, request.payload.clone())
 666            .await?)
 667    }
 668
 669    async fn apply_additional_edits_for_completion(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 672    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 673        let host = self
 674            .state()
 675            .read_project(request.payload.project_id, request.sender_id)?
 676            .host_connection_id;
 677        Ok(self
 678            .peer
 679            .forward_request(request.sender_id, host, request.payload.clone())
 680            .await?)
 681    }
 682
 683    async fn get_code_actions(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::GetCodeActions>,
 686    ) -> tide::Result<proto::GetCodeActionsResponse> {
 687        let host = self
 688            .state()
 689            .read_project(request.payload.project_id, request.sender_id)?
 690            .host_connection_id;
 691        Ok(self
 692            .peer
 693            .forward_request(request.sender_id, host, request.payload.clone())
 694            .await?)
 695    }
 696
 697    async fn apply_code_action(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::ApplyCodeAction>,
 700    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 701        let host = self
 702            .state()
 703            .read_project(request.payload.project_id, request.sender_id)?
 704            .host_connection_id;
 705        Ok(self
 706            .peer
 707            .forward_request(request.sender_id, host, request.payload.clone())
 708            .await?)
 709    }
 710
 711    async fn update_buffer(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::UpdateBuffer>,
 714    ) -> tide::Result<proto::Ack> {
 715        let receiver_ids = self
 716            .state()
 717            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 718        broadcast(request.sender_id, receiver_ids, |connection_id| {
 719            self.peer
 720                .forward_send(request.sender_id, connection_id, request.payload.clone())
 721        })?;
 722        Ok(proto::Ack {})
 723    }
 724
 725    async fn update_buffer_file(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::UpdateBufferFile>,
 728    ) -> tide::Result<()> {
 729        let receiver_ids = self
 730            .state()
 731            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 732        broadcast(request.sender_id, receiver_ids, |connection_id| {
 733            self.peer
 734                .forward_send(request.sender_id, connection_id, request.payload.clone())
 735        })?;
 736        Ok(())
 737    }
 738
 739    async fn buffer_reloaded(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::BufferReloaded>,
 742    ) -> tide::Result<()> {
 743        let receiver_ids = self
 744            .state()
 745            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 746        broadcast(request.sender_id, receiver_ids, |connection_id| {
 747            self.peer
 748                .forward_send(request.sender_id, connection_id, request.payload.clone())
 749        })?;
 750        Ok(())
 751    }
 752
 753    async fn buffer_saved(
 754        self: Arc<Server>,
 755        request: TypedEnvelope<proto::BufferSaved>,
 756    ) -> tide::Result<()> {
 757        let receiver_ids = self
 758            .state()
 759            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 760        broadcast(request.sender_id, receiver_ids, |connection_id| {
 761            self.peer
 762                .forward_send(request.sender_id, connection_id, request.payload.clone())
 763        })?;
 764        Ok(())
 765    }
 766
 767    async fn get_channels(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::GetChannels>,
 770    ) -> tide::Result<proto::GetChannelsResponse> {
 771        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 772        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 773        Ok(proto::GetChannelsResponse {
 774            channels: channels
 775                .into_iter()
 776                .map(|chan| proto::Channel {
 777                    id: chan.id.to_proto(),
 778                    name: chan.name,
 779                })
 780                .collect(),
 781        })
 782    }
 783
 784    async fn get_users(
 785        self: Arc<Server>,
 786        request: TypedEnvelope<proto::GetUsers>,
 787    ) -> tide::Result<proto::GetUsersResponse> {
 788        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 789        let users = self
 790            .app_state
 791            .db
 792            .get_users_by_ids(user_ids)
 793            .await?
 794            .into_iter()
 795            .map(|user| proto::User {
 796                id: user.id.to_proto(),
 797                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 798                github_login: user.github_login,
 799            })
 800            .collect();
 801        Ok(proto::GetUsersResponse { users })
 802    }
 803
 804    fn update_contacts_for_users<'a>(
 805        self: &Arc<Server>,
 806        user_ids: impl IntoIterator<Item = &'a UserId>,
 807    ) -> anyhow::Result<()> {
 808        let mut result = Ok(());
 809        let state = self.state();
 810        for user_id in user_ids {
 811            let contacts = state.contacts_for_user(*user_id);
 812            for connection_id in state.connection_ids_for_user(*user_id) {
 813                if let Err(error) = self.peer.send(
 814                    connection_id,
 815                    proto::UpdateContacts {
 816                        contacts: contacts.clone(),
 817                    },
 818                ) {
 819                    result = Err(error);
 820                }
 821            }
 822        }
 823        result
 824    }
 825
 826    async fn join_channel(
 827        mut self: Arc<Self>,
 828        request: TypedEnvelope<proto::JoinChannel>,
 829    ) -> tide::Result<proto::JoinChannelResponse> {
 830        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 831        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 832        if !self
 833            .app_state
 834            .db
 835            .can_user_access_channel(user_id, channel_id)
 836            .await?
 837        {
 838            Err(anyhow!("access denied"))?;
 839        }
 840
 841        self.state_mut().join_channel(request.sender_id, channel_id);
 842        let messages = self
 843            .app_state
 844            .db
 845            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 846            .await?
 847            .into_iter()
 848            .map(|msg| proto::ChannelMessage {
 849                id: msg.id.to_proto(),
 850                body: msg.body,
 851                timestamp: msg.sent_at.unix_timestamp() as u64,
 852                sender_id: msg.sender_id.to_proto(),
 853                nonce: Some(msg.nonce.as_u128().into()),
 854            })
 855            .collect::<Vec<_>>();
 856        Ok(proto::JoinChannelResponse {
 857            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 858            messages,
 859        })
 860    }
 861
 862    async fn leave_channel(
 863        mut self: Arc<Self>,
 864        request: TypedEnvelope<proto::LeaveChannel>,
 865    ) -> tide::Result<()> {
 866        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 867        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 868        if !self
 869            .app_state
 870            .db
 871            .can_user_access_channel(user_id, channel_id)
 872            .await?
 873        {
 874            Err(anyhow!("access denied"))?;
 875        }
 876
 877        self.state_mut()
 878            .leave_channel(request.sender_id, channel_id);
 879
 880        Ok(())
 881    }
 882
 883    async fn send_channel_message(
 884        self: Arc<Self>,
 885        request: TypedEnvelope<proto::SendChannelMessage>,
 886    ) -> tide::Result<proto::SendChannelMessageResponse> {
 887        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 888        let user_id;
 889        let connection_ids;
 890        {
 891            let state = self.state();
 892            user_id = state.user_id_for_connection(request.sender_id)?;
 893            connection_ids = state.channel_connection_ids(channel_id)?;
 894        }
 895
 896        // Validate the message body.
 897        let body = request.payload.body.trim().to_string();
 898        if body.len() > MAX_MESSAGE_LEN {
 899            return Err(anyhow!("message is too long"))?;
 900        }
 901        if body.is_empty() {
 902            return Err(anyhow!("message can't be blank"))?;
 903        }
 904
 905        let timestamp = OffsetDateTime::now_utc();
 906        let nonce = request
 907            .payload
 908            .nonce
 909            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 910
 911        let message_id = self
 912            .app_state
 913            .db
 914            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 915            .await?
 916            .to_proto();
 917        let message = proto::ChannelMessage {
 918            sender_id: user_id.to_proto(),
 919            id: message_id,
 920            body,
 921            timestamp: timestamp.unix_timestamp() as u64,
 922            nonce: Some(nonce),
 923        };
 924        broadcast(request.sender_id, connection_ids, |conn_id| {
 925            self.peer.send(
 926                conn_id,
 927                proto::ChannelMessageSent {
 928                    channel_id: channel_id.to_proto(),
 929                    message: Some(message.clone()),
 930                },
 931            )
 932        })?;
 933        Ok(proto::SendChannelMessageResponse {
 934            message: Some(message),
 935        })
 936    }
 937
 938    async fn get_channel_messages(
 939        self: Arc<Self>,
 940        request: TypedEnvelope<proto::GetChannelMessages>,
 941    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 942        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 943        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 944        if !self
 945            .app_state
 946            .db
 947            .can_user_access_channel(user_id, channel_id)
 948            .await?
 949        {
 950            Err(anyhow!("access denied"))?;
 951        }
 952
 953        let messages = self
 954            .app_state
 955            .db
 956            .get_channel_messages(
 957                channel_id,
 958                MESSAGE_COUNT_PER_PAGE,
 959                Some(MessageId::from_proto(request.payload.before_message_id)),
 960            )
 961            .await?
 962            .into_iter()
 963            .map(|msg| proto::ChannelMessage {
 964                id: msg.id.to_proto(),
 965                body: msg.body,
 966                timestamp: msg.sent_at.unix_timestamp() as u64,
 967                sender_id: msg.sender_id.to_proto(),
 968                nonce: Some(msg.nonce.as_u128().into()),
 969            })
 970            .collect::<Vec<_>>();
 971
 972        Ok(proto::GetChannelMessagesResponse {
 973            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 974            messages,
 975        })
 976    }
 977
 978    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 979        self.store.read()
 980    }
 981
 982    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 983        self.store.write()
 984    }
 985}
 986
 987impl Executor for RealExecutor {
 988    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 989        task::spawn(future);
 990    }
 991}
 992
 993fn broadcast<F>(
 994    sender_id: ConnectionId,
 995    receiver_ids: Vec<ConnectionId>,
 996    mut f: F,
 997) -> anyhow::Result<()>
 998where
 999    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1000{
1001    let mut result = Ok(());
1002    for receiver_id in receiver_ids {
1003        if receiver_id != sender_id {
1004            if let Err(error) = f(receiver_id) {
1005                if result.is_ok() {
1006                    result = Err(error);
1007                }
1008            }
1009        }
1010    }
1011    result
1012}
1013
1014pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1015    let server = Server::new(app.state().clone(), rpc.clone(), None);
1016    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1017        let server = server.clone();
1018        async move {
1019            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1020
1021            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1022            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1023            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1024            let client_protocol_version: Option<u32> = request
1025                .header("X-Zed-Protocol-Version")
1026                .and_then(|v| v.as_str().parse().ok());
1027
1028            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1029                return Ok(Response::new(StatusCode::UpgradeRequired));
1030            }
1031
1032            let header = match request.header("Sec-Websocket-Key") {
1033                Some(h) => h.as_str(),
1034                None => return Err(anyhow!("expected sec-websocket-key"))?,
1035            };
1036
1037            let user_id = process_auth_header(&request).await?;
1038
1039            let mut response = Response::new(StatusCode::SwitchingProtocols);
1040            response.insert_header(UPGRADE, "websocket");
1041            response.insert_header(CONNECTION, "Upgrade");
1042            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1043            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1044            response.insert_header("Sec-Websocket-Version", "13");
1045
1046            let http_res: &mut tide::http::Response = response.as_mut();
1047            let upgrade_receiver = http_res.recv_upgrade().await;
1048            let addr = request.remote().unwrap_or("unknown").to_string();
1049            task::spawn(async move {
1050                if let Some(stream) = upgrade_receiver.await {
1051                    server
1052                        .handle_connection(
1053                            Connection::new(
1054                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1055                            ),
1056                            addr,
1057                            user_id,
1058                            None,
1059                            RealExecutor,
1060                        )
1061                        .await;
1062                }
1063            });
1064
1065            Ok(response)
1066        }
1067    });
1068}
1069
1070fn header_contains_ignore_case<T>(
1071    request: &tide::Request<T>,
1072    header_name: HeaderName,
1073    value: &str,
1074) -> bool {
1075    request
1076        .header(header_name)
1077        .map(|h| {
1078            h.as_str()
1079                .split(',')
1080                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1081        })
1082        .unwrap_or(false)
1083}
1084
1085#[cfg(test)]
1086mod tests {
1087    use super::*;
1088    use crate::{
1089        auth,
1090        db::{tests::TestDb, UserId},
1091        github, AppState, Config,
1092    };
1093    use ::rpc::Peer;
1094    use collections::BTreeMap;
1095    use gpui::{executor, ModelHandle, TestAppContext};
1096    use parking_lot::Mutex;
1097    use postage::{sink::Sink, watch};
1098    use rand::prelude::*;
1099    use rpc::PeerId;
1100    use serde_json::json;
1101    use sqlx::types::time::OffsetDateTime;
1102    use std::{
1103        cell::{Cell, RefCell},
1104        env,
1105        ops::Deref,
1106        path::Path,
1107        rc::Rc,
1108        sync::{
1109            atomic::{AtomicBool, Ordering::SeqCst},
1110            Arc,
1111        },
1112        time::Duration,
1113    };
1114    use zed::{
1115        client::{
1116            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1117            EstablishConnectionError, UserStore,
1118        },
1119        editor::{
1120            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1121            Redo, ToggleCodeActions, Undo,
1122        },
1123        fs::{FakeFs, Fs as _},
1124        language::{
1125            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1126            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1127        },
1128        lsp,
1129        project::{DiagnosticSummary, Project, ProjectPath},
1130        workspace::{Workspace, WorkspaceParams},
1131    };
1132
1133    #[cfg(test)]
1134    #[ctor::ctor]
1135    fn init_logger() {
1136        if std::env::var("RUST_LOG").is_ok() {
1137            env_logger::init();
1138        }
1139    }
1140
1141    #[gpui::test(iterations = 10)]
        // End-to-end check of project sharing between two clients: client A shares a
        // local project, client B joins it remotely, an edit made through B's editor
        // shows up in A's buffer, and dropping B's project removes B from A's
        // collaborators.
1142    async fn test_share_project(
1143        mut cx_a: TestAppContext,
1144        mut cx_b: TestAppContext,
1145        last_iteration: bool,
1146    ) {
1147        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1148        let lang_registry = Arc::new(LanguageRegistry::new());
1149        let fs = Arc::new(FakeFs::new(cx_a.background()));
1150        cx_a.foreground().forbid_parking();
1151
1152        // Connect to a server as 2 clients.
1153        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1154        let client_a = server.create_client(&mut cx_a, "user_a").await;
1155        let client_b = server.create_client(&mut cx_b, "user_b").await;
1156
1157        // Share a project as client A
1158        fs.insert_tree(
1159            "/a",
1160            json!({
1161                ".zed.toml": r#"collaborators = ["user_b"]"#,
1162                "a.txt": "a-contents",
1163                "b.txt": "b-contents",
1164            }),
1165        )
1166        .await;
1167        let project_a = cx_a.update(|cx| {
1168            Project::local(
1169                client_a.clone(),
1170                client_a.user_store.clone(),
1171                lang_registry.clone(),
1172                fs.clone(),
1173                cx,
1174            )
1175        });
1176        let (worktree_a, _) = project_a
1177            .update(&mut cx_a, |p, cx| {
1178                p.find_or_create_local_worktree("/a", false, cx)
1179            })
1180            .await
1181            .unwrap();
1182        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
            // Wait for the worktree scan to complete before sharing the project.
1183        worktree_a
1184            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1185            .await;
1186        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1187        project_a
1188            .update(&mut cx_a, |p, cx| p.share(cx))
1189            .await
1190            .unwrap();
1191
1192        // Join that project as client B
1193        let project_b = Project::remote(
1194            project_id,
1195            client_b.clone(),
1196            client_b.user_store.clone(),
1197            lang_registry.clone(),
1198            fs.clone(),
1199            &mut cx_b.to_async(),
1200        )
1201        .await
1202        .unwrap();
1203
            // Client B sees client A among the project's collaborators.
1204        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1205            assert_eq!(
1206                project
1207                    .collaborators()
1208                    .get(&client_a.peer_id)
1209                    .unwrap()
1210                    .user
1211                    .github_login,
1212                "user_a"
1213            );
1214            project.replica_id()
1215        });
            // Client A eventually sees client B as a collaborator with B's replica id.
1216        project_a
1217            .condition(&cx_a, |tree, _| {
1218                tree.collaborators()
1219                    .get(&client_b.peer_id)
1220                    .map_or(false, |collaborator| {
1221                        collaborator.replica_id == replica_id_b
1222                            && collaborator.user.github_login == "user_b"
1223                    })
1224            })
1225            .await;
1226
1227        // Open the same file as client B and client A.
1228        let buffer_b = project_b
1229            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1230            .await
1231            .unwrap();
1232        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1233        buffer_b.read_with(&cx_b, |buf, cx| {
1234            assert_eq!(buf.read(cx).text(), "b-contents")
1235        });
            // The buffer is already open on host A's project once guest B has opened it.
1236        project_a.read_with(&cx_a, |project, cx| {
1237            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1238        });
1239        let buffer_a = project_a
1240            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1241            .await
1242            .unwrap();
1243
1244        let editor_b = cx_b.add_view(window_b, |cx| {
1245            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1246        });
1247
1248        // TODO
1249        // // Create a selection set as client B and see that selection set as client A.
1250        // buffer_a
1251        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1252        //     .await;
1253
1254        // Edit the buffer as client B and see that edit as client A.
1255        editor_b.update(&mut cx_b, |editor, cx| {
1256            editor.handle_input(&Input("ok, ".into()), cx)
1257        });
1258        buffer_a
1259            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1260            .await;
1261
1262        // TODO
1263        // // Remove the selection set as client B, see those selections disappear as client A.
1264        cx_b.update(move |_| drop(editor_b));
1265        // buffer_a
1266        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1267        //     .await;
1268
1269        // Close the buffer as client A, see that the buffer is closed.
1270        cx_a.update(move |_| drop(buffer_a));
1271        project_a
1272            .condition(&cx_a, |project, cx| {
1273                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1274            })
1275            .await;
1276
1277        // Dropping the client B's project removes client B from client A's collaborators.
1278        cx_b.update(move |_| drop(project_b));
1279        project_a
1280            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1281            .await;
1282    }
1283
1284    #[gpui::test(iterations = 10)]
        // Checks unsharing: once host A unshares, guest B's project becomes read-only
        // and A's worktree is no longer marked as shared. Sharing the project again
        // lets client B join and open a buffer once more.
1285    async fn test_unshare_project(
1286        mut cx_a: TestAppContext,
1287        mut cx_b: TestAppContext,
1288        last_iteration: bool,
1289    ) {
1290        let lang_registry = Arc::new(LanguageRegistry::new());
1291        let fs = Arc::new(FakeFs::new(cx_a.background()));
1292        cx_a.foreground().forbid_parking();
1293
1294        // Connect to a server as 2 clients.
1295        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1296        let client_a = server.create_client(&mut cx_a, "user_a").await;
1297        let client_b = server.create_client(&mut cx_b, "user_b").await;
1298
1299        // Share a project as client A
1300        fs.insert_tree(
1301            "/a",
1302            json!({
1303                ".zed.toml": r#"collaborators = ["user_b"]"#,
1304                "a.txt": "a-contents",
1305                "b.txt": "b-contents",
1306            }),
1307        )
1308        .await;
1309        let project_a = cx_a.update(|cx| {
1310            Project::local(
1311                client_a.clone(),
1312                client_a.user_store.clone(),
1313                lang_registry.clone(),
1314                fs.clone(),
1315                cx,
1316            )
1317        });
1318        let (worktree_a, _) = project_a
1319            .update(&mut cx_a, |p, cx| {
1320                p.find_or_create_local_worktree("/a", false, cx)
1321            })
1322            .await
1323            .unwrap();
1324        worktree_a
1325            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1326            .await;
1327        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1328        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1329        project_a
1330            .update(&mut cx_a, |p, cx| p.share(cx))
1331            .await
1332            .unwrap();
1333        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1334
1335        // Join that project as client B
1336        let project_b = Project::remote(
1337            project_id,
1338            client_b.clone(),
1339            client_b.user_store.clone(),
1340            lang_registry.clone(),
1341            fs.clone(),
1342            &mut cx_b.to_async(),
1343        )
1344        .await
1345        .unwrap();
            // The guest can open a buffer while the project is shared.
1346        project_b
1347            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1348            .await
1349            .unwrap();
1350
1351        // Unshare the project as client A
1352        project_a
1353            .update(&mut cx_a, |project, cx| project.unshare(cx))
1354            .await
1355            .unwrap();
            // The guest's project turns read-only and the host's worktree stops being shared.
1356        project_b
1357            .condition(&mut cx_b, |project, _| project.is_read_only())
1358            .await;
1359        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1360        drop(project_b);
1361
1362        // Share the project again and ensure guests can still join.
1363        project_a
1364            .update(&mut cx_a, |project, cx| project.share(cx))
1365            .await
1366            .unwrap();
1367        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1368
1369        let project_c = Project::remote(
1370            project_id,
1371            client_b.clone(),
1372            client_b.user_store.clone(),
1373            lang_registry.clone(),
1374            fs.clone(),
1375            &mut cx_b.to_async(),
1376        )
1377        .await
1378        .unwrap();
            // After rejoining, opening a buffer succeeds again.
1379        project_c
1380            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1381            .await
1382            .unwrap();
1383    }
1384
1385    #[gpui::test(iterations = 10)]
        // Three-client scenario (host A, guests B and C) covering: convergence of
        // concurrent buffer edits, a guest-initiated save that lands on the host's
        // file system, and propagation of host file-system changes (renames and a new
        // file) to the guests' worktrees and open buffers.
1386    async fn test_propagate_saves_and_fs_changes(
1387        mut cx_a: TestAppContext,
1388        mut cx_b: TestAppContext,
1389        mut cx_c: TestAppContext,
1390        last_iteration: bool,
1391    ) {
1392        let lang_registry = Arc::new(LanguageRegistry::new());
1393        let fs = Arc::new(FakeFs::new(cx_a.background()));
1394        cx_a.foreground().forbid_parking();
1395
1396        // Connect to a server as 3 clients.
1397        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1398        let client_a = server.create_client(&mut cx_a, "user_a").await;
1399        let client_b = server.create_client(&mut cx_b, "user_b").await;
1400        let client_c = server.create_client(&mut cx_c, "user_c").await;
1401
1402        // Share a worktree as client A.
1403        fs.insert_tree(
1404            "/a",
1405            json!({
1406                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1407                "file1": "",
1408                "file2": ""
1409            }),
1410        )
1411        .await;
1412        let project_a = cx_a.update(|cx| {
1413            Project::local(
1414                client_a.clone(),
1415                client_a.user_store.clone(),
1416                lang_registry.clone(),
1417                fs.clone(),
1418                cx,
1419            )
1420        });
1421        let (worktree_a, _) = project_a
1422            .update(&mut cx_a, |p, cx| {
1423                p.find_or_create_local_worktree("/a", false, cx)
1424            })
1425            .await
1426            .unwrap();
1427        worktree_a
1428            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1429            .await;
1430        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1431        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1432        project_a
1433            .update(&mut cx_a, |p, cx| p.share(cx))
1434            .await
1435            .unwrap();
1436
1437        // Join that worktree as clients B and C.
1438        let project_b = Project::remote(
1439            project_id,
1440            client_b.clone(),
1441            client_b.user_store.clone(),
1442            lang_registry.clone(),
1443            fs.clone(),
1444            &mut cx_b.to_async(),
1445        )
1446        .await
1447        .unwrap();
1448        let project_c = Project::remote(
1449            project_id,
1450            client_c.clone(),
1451            client_c.user_store.clone(),
1452            lang_registry.clone(),
1453            fs.clone(),
1454            &mut cx_c.to_async(),
1455        )
1456        .await
1457        .unwrap();
1458        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1459        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1460
1461        // Open and edit a buffer as both guests B and C.
1462        let buffer_b = project_b
1463            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1464            .await
1465            .unwrap();
1466        let buffer_c = project_c
1467            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1468            .await
1469            .unwrap();
1470        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1471        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1472
1473        // Open and edit that buffer as the host.
1474        let buffer_a = project_a
1475            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1476            .await
1477            .unwrap();
1478
            // Wait until the host's buffer contains both guests' edits, then append the host's own.
1479        buffer_a
1480            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1481            .await;
1482        buffer_a.update(&mut cx_a, |buf, cx| {
1483            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1484        });
1485
1486        // Wait for edits to propagate
1487        buffer_a
1488            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1489            .await;
1490        buffer_b
1491            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1492            .await;
1493        buffer_c
1494            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1495            .await;
1496
1497        // Edit the buffer as the host and concurrently save as guest B.
1498        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1499        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1500        save_b.await.unwrap();
            // The contents written to the file system include the host's concurrent edit.
1501        assert_eq!(
1502            fs.load("/a/file1".as_ref()).await.unwrap(),
1503            "hi-a, i-am-c, i-am-b, i-am-a"
1504        );
            // After the save, none of the three buffers should be dirty.
1505        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1506        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1507        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1508
1509        // Make changes on host's file system, see those changes on guest worktrees.
1510        fs.rename(
1511            "/a/file1".as_ref(),
1512            "/a/file1-renamed".as_ref(),
1513            Default::default(),
1514        )
1515        .await
1516        .unwrap();
1517
1518        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1519            .await
1520            .unwrap();
1521        fs.insert_file(Path::new("/a/file4"), "4".into())
1522            .await
1523            .unwrap();
1524
1525        worktree_a
1526            .condition(&cx_a, |tree, _| {
1527                tree.paths()
1528                    .map(|p| p.to_string_lossy())
1529                    .collect::<Vec<_>>()
1530                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1531            })
1532            .await;
1533        worktree_b
1534            .condition(&cx_b, |tree, _| {
1535                tree.paths()
1536                    .map(|p| p.to_string_lossy())
1537                    .collect::<Vec<_>>()
1538                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1539            })
1540            .await;
1541        worktree_c
1542            .condition(&cx_c, |tree, _| {
1543                tree.paths()
1544                    .map(|p| p.to_string_lossy())
1545                    .collect::<Vec<_>>()
1546                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1547            })
1548            .await;
1549
1550        // Ensure buffer files are updated as well.
1551        buffer_a
1552            .condition(&cx_a, |buf, _| {
1553                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1554            })
1555            .await;
1556        buffer_b
1557            .condition(&cx_b, |buf, _| {
1558                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1559            })
1560            .await;
1561        buffer_c
1562            .condition(&cx_c, |buf, _| {
1563                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1564            })
1565            .await;
1566    }
1567
1568    #[gpui::test(iterations = 10)]
        // Checks that a guest's buffer is never reported as conflicted across an
        // edit / save / edit sequence: it is dirty after each edit, clean after the
        // save, and `has_conflict()` stays false throughout.
1569    async fn test_buffer_conflict_after_save(
1570        mut cx_a: TestAppContext,
1571        mut cx_b: TestAppContext,
1572        last_iteration: bool,
1573    ) {
1574        cx_a.foreground().forbid_parking();
1575        let lang_registry = Arc::new(LanguageRegistry::new());
1576        let fs = Arc::new(FakeFs::new(cx_a.background()));
1577
1578        // Connect to a server as 2 clients.
1579        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1580        let client_a = server.create_client(&mut cx_a, "user_a").await;
1581        let client_b = server.create_client(&mut cx_b, "user_b").await;
1582
1583        // Share a project as client A
1584        fs.insert_tree(
1585            "/dir",
1586            json!({
1587                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1588                "a.txt": "a-contents",
1589            }),
1590        )
1591        .await;
1592
1593        let project_a = cx_a.update(|cx| {
1594            Project::local(
1595                client_a.clone(),
1596                client_a.user_store.clone(),
1597                lang_registry.clone(),
1598                fs.clone(),
1599                cx,
1600            )
1601        });
1602        let (worktree_a, _) = project_a
1603            .update(&mut cx_a, |p, cx| {
1604                p.find_or_create_local_worktree("/dir", false, cx)
1605            })
1606            .await
1607            .unwrap();
1608        worktree_a
1609            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1610            .await;
1611        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1612        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1613        project_a
1614            .update(&mut cx_a, |p, cx| p.share(cx))
1615            .await
1616            .unwrap();
1617
1618        // Join that project as client B
1619        let project_b = Project::remote(
1620            project_id,
1621            client_b.clone(),
1622            client_b.user_store.clone(),
1623            lang_registry.clone(),
1624            fs.clone(),
1625            &mut cx_b.to_async(),
1626        )
1627        .await
1628        .unwrap();
1629
1630        // Open a buffer as client B
1631        let buffer_b = project_b
1632            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1633            .await
1634            .unwrap();
1635
            // An edit makes the guest's buffer dirty, but not conflicted.
1636        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1637        buffer_b.read_with(&cx_b, |buf, _| {
1638            assert!(buf.is_dirty());
1639            assert!(!buf.has_conflict());
1640        });
1641
            // Saving eventually clears the dirty flag without introducing a conflict.
1642        buffer_b
1643            .update(&mut cx_b, |buf, cx| buf.save(cx))
1644            .await
1645            .unwrap();
1646        buffer_b
1647            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1648            .await;
1649        buffer_b.read_with(&cx_b, |buf, _| {
1650            assert!(!buf.has_conflict());
1651        });
1652
            // A further edit after the save dirties the buffer again, still without a conflict.
1653        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1654        buffer_b.read_with(&cx_b, |buf, _| {
1655            assert!(buf.is_dirty());
1656            assert!(!buf.has_conflict());
1657        });
1658    }
1659
1660    #[gpui::test(iterations = 10)]
1661    async fn test_buffer_reloading(
1662        mut cx_a: TestAppContext,
1663        mut cx_b: TestAppContext,
1664        last_iteration: bool,
1665    ) {
1666        cx_a.foreground().forbid_parking();
1667        let lang_registry = Arc::new(LanguageRegistry::new());
1668        let fs = Arc::new(FakeFs::new(cx_a.background()));
1669
1670        // Connect to a server as 2 clients.
1671        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1672        let client_a = server.create_client(&mut cx_a, "user_a").await;
1673        let client_b = server.create_client(&mut cx_b, "user_b").await;
1674
1675        // Share a project as client A
1676        fs.insert_tree(
1677            "/dir",
1678            json!({
1679                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1680                "a.txt": "a-contents",
1681            }),
1682        )
1683        .await;
1684
1685        let project_a = cx_a.update(|cx| {
1686            Project::local(
1687                client_a.clone(),
1688                client_a.user_store.clone(),
1689                lang_registry.clone(),
1690                fs.clone(),
1691                cx,
1692            )
1693        });
1694        let (worktree_a, _) = project_a
1695            .update(&mut cx_a, |p, cx| {
1696                p.find_or_create_local_worktree("/dir", false, cx)
1697            })
1698            .await
1699            .unwrap();
1700        worktree_a
1701            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1702            .await;
1703        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1704        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1705        project_a
1706            .update(&mut cx_a, |p, cx| p.share(cx))
1707            .await
1708            .unwrap();
1709
1710        // Join that project as client B
1711        let project_b = Project::remote(
1712            project_id,
1713            client_b.clone(),
1714            client_b.user_store.clone(),
1715            lang_registry.clone(),
1716            fs.clone(),
1717            &mut cx_b.to_async(),
1718        )
1719        .await
1720        .unwrap();
1721        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1722
1723        // Open a buffer as client B
1724        let buffer_b = project_b
1725            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1726            .await
1727            .unwrap();
1728        buffer_b.read_with(&cx_b, |buf, _| {
1729            assert!(!buf.is_dirty());
1730            assert!(!buf.has_conflict());
1731        });
1732
1733        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1734            .await
1735            .unwrap();
1736        buffer_b
1737            .condition(&cx_b, |buf, _| {
1738                buf.text() == "new contents" && !buf.is_dirty()
1739            })
1740            .await;
1741        buffer_b.read_with(&cx_b, |buf, _| {
1742            assert!(!buf.has_conflict());
1743        });
1744    }
1745
    // Verifies that edits made by the host while a guest is still in the middle
    // of opening the same buffer are not lost: once the guest's open completes,
    // its buffer text must converge to the host's text.
    //
    // The test is declared with `iterations = 10`; `last_iteration` is forwarded
    // to `TestServer::start`.
    #[gpui::test(iterations = 10)]
    async fn test_editing_while_guest_opens_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client A
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Start opening the same buffer as client B. The open is spawned on
        // client B's background executor and deliberately NOT awaited yet, so
        // that the host edits below can run while it is in flight.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));

        // Edit the buffer as client A while client B is still opening it.
        // Each edit is preceded by a simulated random delay on client B's
        // background executor.
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
        cx_b.background().simulate_random_delay().await;
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));

        // Snapshot the host's text, finish the guest's open, and wait until the
        // guest's buffer shows the same text.
        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
        let buffer_b = buffer_b.await.unwrap();
        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
    }
1828
    // Verifies that the host sees the guest leave when the guest drops its
    // project handle while a buffer open is still pending: the host's
    // collaborator count must go from 1 back to 0.
    //
    // The test is declared with `iterations = 10`; `last_iteration` is forwarded
    // to `TestServer::start`.
    #[gpui::test(iterations = 10)]
    async fn test_leaving_worktree_while_opening_buffer(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // See that a guest has joined as client A.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
            .await;

        // Begin opening a buffer as client B, but leave the project before the open completes.
        // The open is spawned and never awaited; the project handle is dropped
        // inside `cx_b.update`, and the pending task handle is dropped right after.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
        cx_b.update(|_| drop(project_b));
        drop(buffer_b);

        // See that the guest has left.
        project_a
            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
            .await;
    }
1907
1908    #[gpui::test(iterations = 10)]
1909    async fn test_peer_disconnection(
1910        mut cx_a: TestAppContext,
1911        mut cx_b: TestAppContext,
1912        last_iteration: bool,
1913    ) {
1914        cx_a.foreground().forbid_parking();
1915        let lang_registry = Arc::new(LanguageRegistry::new());
1916        let fs = Arc::new(FakeFs::new(cx_a.background()));
1917
1918        // Connect to a server as 2 clients.
1919        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1920        let client_a = server.create_client(&mut cx_a, "user_a").await;
1921        let client_b = server.create_client(&mut cx_b, "user_b").await;
1922
1923        // Share a project as client A
1924        fs.insert_tree(
1925            "/a",
1926            json!({
1927                ".zed.toml": r#"collaborators = ["user_b"]"#,
1928                "a.txt": "a-contents",
1929                "b.txt": "b-contents",
1930            }),
1931        )
1932        .await;
1933        let project_a = cx_a.update(|cx| {
1934            Project::local(
1935                client_a.clone(),
1936                client_a.user_store.clone(),
1937                lang_registry.clone(),
1938                fs.clone(),
1939                cx,
1940            )
1941        });
1942        let (worktree_a, _) = project_a
1943            .update(&mut cx_a, |p, cx| {
1944                p.find_or_create_local_worktree("/a", false, cx)
1945            })
1946            .await
1947            .unwrap();
1948        worktree_a
1949            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1950            .await;
1951        let project_id = project_a
1952            .update(&mut cx_a, |project, _| project.next_remote_id())
1953            .await;
1954        project_a
1955            .update(&mut cx_a, |project, cx| project.share(cx))
1956            .await
1957            .unwrap();
1958
1959        // Join that project as client B
1960        let _project_b = Project::remote(
1961            project_id,
1962            client_b.clone(),
1963            client_b.user_store.clone(),
1964            lang_registry.clone(),
1965            fs.clone(),
1966            &mut cx_b.to_async(),
1967        )
1968        .await
1969        .unwrap();
1970
1971        // See that a guest has joined as client A.
1972        project_a
1973            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1974            .await;
1975
1976        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1977        client_b.disconnect(&cx_b.to_async()).unwrap();
1978        project_a
1979            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1980            .await;
1981    }
1982
1983    #[gpui::test(iterations = 10)]
1984    async fn test_collaborating_with_diagnostics(
1985        mut cx_a: TestAppContext,
1986        mut cx_b: TestAppContext,
1987        last_iteration: bool,
1988    ) {
1989        cx_a.foreground().forbid_parking();
1990        let mut lang_registry = Arc::new(LanguageRegistry::new());
1991        let fs = Arc::new(FakeFs::new(cx_a.background()));
1992
1993        // Set up a fake language server.
1994        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1995        Arc::get_mut(&mut lang_registry)
1996            .unwrap()
1997            .add(Arc::new(Language::new(
1998                LanguageConfig {
1999                    name: "Rust".to_string(),
2000                    path_suffixes: vec!["rs".to_string()],
2001                    language_server: Some(language_server_config),
2002                    ..Default::default()
2003                },
2004                Some(tree_sitter_rust::language()),
2005            )));
2006
2007        // Connect to a server as 2 clients.
2008        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2009        let client_a = server.create_client(&mut cx_a, "user_a").await;
2010        let client_b = server.create_client(&mut cx_b, "user_b").await;
2011
2012        // Share a project as client A
2013        fs.insert_tree(
2014            "/a",
2015            json!({
2016                ".zed.toml": r#"collaborators = ["user_b"]"#,
2017                "a.rs": "let one = two",
2018                "other.rs": "",
2019            }),
2020        )
2021        .await;
2022        let project_a = cx_a.update(|cx| {
2023            Project::local(
2024                client_a.clone(),
2025                client_a.user_store.clone(),
2026                lang_registry.clone(),
2027                fs.clone(),
2028                cx,
2029            )
2030        });
2031        let (worktree_a, _) = project_a
2032            .update(&mut cx_a, |p, cx| {
2033                p.find_or_create_local_worktree("/a", false, cx)
2034            })
2035            .await
2036            .unwrap();
2037        worktree_a
2038            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2039            .await;
2040        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2041        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2042        project_a
2043            .update(&mut cx_a, |p, cx| p.share(cx))
2044            .await
2045            .unwrap();
2046
2047        // Cause the language server to start.
2048        let _ = cx_a
2049            .background()
2050            .spawn(project_a.update(&mut cx_a, |project, cx| {
2051                project.open_buffer(
2052                    ProjectPath {
2053                        worktree_id,
2054                        path: Path::new("other.rs").into(),
2055                    },
2056                    cx,
2057                )
2058            }))
2059            .await
2060            .unwrap();
2061
2062        // Simulate a language server reporting errors for a file.
2063        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2064        fake_language_server
2065            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2066                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2067                version: None,
2068                diagnostics: vec![lsp::Diagnostic {
2069                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2070                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2071                    message: "message 1".to_string(),
2072                    ..Default::default()
2073                }],
2074            })
2075            .await;
2076
2077        // Wait for server to see the diagnostics update.
2078        server
2079            .condition(|store| {
2080                let worktree = store
2081                    .project(project_id)
2082                    .unwrap()
2083                    .worktrees
2084                    .get(&worktree_id.to_proto())
2085                    .unwrap();
2086
2087                !worktree
2088                    .share
2089                    .as_ref()
2090                    .unwrap()
2091                    .diagnostic_summaries
2092                    .is_empty()
2093            })
2094            .await;
2095
2096        // Join the worktree as client B.
2097        let project_b = Project::remote(
2098            project_id,
2099            client_b.clone(),
2100            client_b.user_store.clone(),
2101            lang_registry.clone(),
2102            fs.clone(),
2103            &mut cx_b.to_async(),
2104        )
2105        .await
2106        .unwrap();
2107
2108        project_b.read_with(&cx_b, |project, cx| {
2109            assert_eq!(
2110                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2111                &[(
2112                    ProjectPath {
2113                        worktree_id,
2114                        path: Arc::from(Path::new("a.rs")),
2115                    },
2116                    DiagnosticSummary {
2117                        error_count: 1,
2118                        warning_count: 0,
2119                        ..Default::default()
2120                    },
2121                )]
2122            )
2123        });
2124
2125        // Simulate a language server reporting more errors for a file.
2126        fake_language_server
2127            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2128                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2129                version: None,
2130                diagnostics: vec![
2131                    lsp::Diagnostic {
2132                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2133                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2134                        message: "message 1".to_string(),
2135                        ..Default::default()
2136                    },
2137                    lsp::Diagnostic {
2138                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2139                        range: lsp::Range::new(
2140                            lsp::Position::new(0, 10),
2141                            lsp::Position::new(0, 13),
2142                        ),
2143                        message: "message 2".to_string(),
2144                        ..Default::default()
2145                    },
2146                ],
2147            })
2148            .await;
2149
2150        // Client b gets the updated summaries
2151        project_b
2152            .condition(&cx_b, |project, cx| {
2153                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2154                    == &[(
2155                        ProjectPath {
2156                            worktree_id,
2157                            path: Arc::from(Path::new("a.rs")),
2158                        },
2159                        DiagnosticSummary {
2160                            error_count: 1,
2161                            warning_count: 1,
2162                            ..Default::default()
2163                        },
2164                    )]
2165            })
2166            .await;
2167
2168        // Open the file with the errors on client B. They should be present.
2169        let buffer_b = cx_b
2170            .background()
2171            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2172            .await
2173            .unwrap();
2174
2175        buffer_b.read_with(&cx_b, |buffer, _| {
2176            assert_eq!(
2177                buffer
2178                    .snapshot()
2179                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2180                    .map(|entry| entry)
2181                    .collect::<Vec<_>>(),
2182                &[
2183                    DiagnosticEntry {
2184                        range: Point::new(0, 4)..Point::new(0, 7),
2185                        diagnostic: Diagnostic {
2186                            group_id: 0,
2187                            message: "message 1".to_string(),
2188                            severity: lsp::DiagnosticSeverity::ERROR,
2189                            is_primary: true,
2190                            ..Default::default()
2191                        }
2192                    },
2193                    DiagnosticEntry {
2194                        range: Point::new(0, 10)..Point::new(0, 13),
2195                        diagnostic: Diagnostic {
2196                            group_id: 1,
2197                            severity: lsp::DiagnosticSeverity::WARNING,
2198                            message: "message 2".to_string(),
2199                            is_primary: true,
2200                            ..Default::default()
2201                        }
2202                    }
2203                ]
2204            );
2205        });
2206    }
2207
    // Verifies that a guest can request and confirm completions served by the
    // host's (fake) language server: the guest types a trigger character, the
    // host's language server answers the completion request, the guest confirms
    // the first item, and the additional text edit from the resolved completion
    // ends up in both the host's and the guest's buffer.
    //
    // The test is declared with `iterations = 10`; `last_iteration` is forwarded
    // to `TestServer::start`.
    #[gpui::test(iterations = 10)]
    async fn test_collaborating_with_completion(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        last_iteration: bool,
    ) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Set up a fake language server.
        // It advertises "." as its only completion trigger character.
        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
            completion_provider: Some(lsp::CompletionOptions {
                trigger_characters: Some(vec![".".to_string()]),
                ..Default::default()
            }),
            ..Default::default()
        });
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".to_string(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "main.rs": "fn main() { a }",
                "other.rs": "",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree's scan to complete before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a file in an editor as the guest.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| {
                p.open_buffer((worktree_id, "main.rs"), cx)
            })
            .await
            .unwrap();
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(
                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
                Arc::new(|cx| EditorSettings::test(cx)),
                Some(project_b.clone()),
                cx,
            )
        });

        // Take the next fake language server instance, then wait until the
        // guest's buffer reports a non-empty set of completion triggers.
        let mut fake_language_server = fake_language_servers.next().await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
            .await;

        // Type a completion trigger character as the guest.
        // Offset 13 is directly after the `a` in "fn main() { a }", so the text
        // becomes "fn main() { a. }" (asserted on the host's buffer below).
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.select_ranges([13..13], None, cx);
            editor.handle_input(&Input(".".into()), cx);
            cx.focus(&editor_b);
        });

        // Receive a completion request as the host's language server.
        // Return some completions from the host's language server.
        // The handler checks that the request targets /a/main.rs at line 0,
        // column 14, i.e. just after the inserted ".".
        // NOTE(review): `start_waiting`/`finish_waiting` bracket this await on
        // client A's foreground executor; their exact semantics are defined by
        // the executor and are not visible here.
        cx_a.foreground().start_waiting();
        fake_language_server
            .handle_request::<lsp::request::Completion, _>(|params| {
                assert_eq!(
                    params.text_document_position.text_document.uri,
                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                );
                assert_eq!(
                    params.text_document_position.position,
                    lsp::Position::new(0, 14),
                );

                Some(lsp::CompletionResponse::Array(vec![
                    lsp::CompletionItem {
                        label: "first_method(…)".into(),
                        detail: Some("fn(&mut self, B) -> C".into()),
                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                            new_text: "first_method($1)".to_string(),
                            range: lsp::Range::new(
                                lsp::Position::new(0, 14),
                                lsp::Position::new(0, 14),
                            ),
                        })),
                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                        ..Default::default()
                    },
                    lsp::CompletionItem {
                        label: "second_method(…)".into(),
                        detail: Some("fn(&mut self, C) -> D<E>".into()),
                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                            new_text: "second_method()".to_string(),
                            range: lsp::Range::new(
                                lsp::Position::new(0, 14),
                                lsp::Position::new(0, 14),
                            ),
                        })),
                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                        ..Default::default()
                    },
                ]))
            })
            .next()
            .await
            .unwrap();
        cx_a.foreground().finish_waiting();

        // Open the buffer on the host.
        // Wait until it contains the "." typed by the guest.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| {
                p.open_buffer((worktree_id, "main.rs"), cx)
            })
            .await
            .unwrap();
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
            .await;

        // Confirm a completion on the guest.
        // Index 0 selects the first item ("first_method(…)"); the asserted text
        // shows the snippet "first_method($1)" inserted as "first_method()".
        editor_b
            .condition(&cx_b, |editor, _| editor.context_menu_visible())
            .await;
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
        });

        // Return a resolved completion from the host's language server.
        // The resolved completion has an additional text edit.
        // That edit inserts "use d::SomeTrait;\n" at the very start of the file.
        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
            assert_eq!(params.label, "first_method(…)");
            lsp::CompletionItem {
                label: "first_method(…)".into(),
                detail: Some("fn(&mut self, B) -> C".into()),
                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                    new_text: "first_method($1)".to_string(),
                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
                })),
                additional_text_edits: Some(vec![lsp::TextEdit {
                    new_text: "use d::SomeTrait;\n".to_string(),
                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
                }]),
                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
                ..Default::default()
            }
        });

        // The additional edit is applied.
        // Both the host's and the guest's buffer must converge to the same text.
        buffer_a
            .condition(&cx_a, |buffer, _| {
                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
            })
            .await;
        buffer_b
            .condition(&cx_b, |buffer, _| {
                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
            })
            .await;
    }
2420
2421    #[gpui::test(iterations = 10)]
2422    async fn test_formatting_buffer(
2423        mut cx_a: TestAppContext,
2424        mut cx_b: TestAppContext,
2425        last_iteration: bool,
2426    ) {
2427        cx_a.foreground().forbid_parking();
2428        let mut lang_registry = Arc::new(LanguageRegistry::new());
2429        let fs = Arc::new(FakeFs::new(cx_a.background()));
2430
2431        // Set up a fake language server.
2432        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2433        Arc::get_mut(&mut lang_registry)
2434            .unwrap()
2435            .add(Arc::new(Language::new(
2436                LanguageConfig {
2437                    name: "Rust".to_string(),
2438                    path_suffixes: vec!["rs".to_string()],
2439                    language_server: Some(language_server_config),
2440                    ..Default::default()
2441                },
2442                Some(tree_sitter_rust::language()),
2443            )));
2444
2445        // Connect to a server as 2 clients.
2446        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2447        let client_a = server.create_client(&mut cx_a, "user_a").await;
2448        let client_b = server.create_client(&mut cx_b, "user_b").await;
2449
2450        // Share a project as client A
2451        fs.insert_tree(
2452            "/a",
2453            json!({
2454                ".zed.toml": r#"collaborators = ["user_b"]"#,
2455                "a.rs": "let one = two",
2456            }),
2457        )
2458        .await;
2459        let project_a = cx_a.update(|cx| {
2460            Project::local(
2461                client_a.clone(),
2462                client_a.user_store.clone(),
2463                lang_registry.clone(),
2464                fs.clone(),
2465                cx,
2466            )
2467        });
2468        let (worktree_a, _) = project_a
2469            .update(&mut cx_a, |p, cx| {
2470                p.find_or_create_local_worktree("/a", false, cx)
2471            })
2472            .await
2473            .unwrap();
2474        worktree_a
2475            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2476            .await;
2477        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2478        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2479        project_a
2480            .update(&mut cx_a, |p, cx| p.share(cx))
2481            .await
2482            .unwrap();
2483
2484        // Join the worktree as client B.
2485        let project_b = Project::remote(
2486            project_id,
2487            client_b.clone(),
2488            client_b.user_store.clone(),
2489            lang_registry.clone(),
2490            fs.clone(),
2491            &mut cx_b.to_async(),
2492        )
2493        .await
2494        .unwrap();
2495
2496        let buffer_b = cx_b
2497            .background()
2498            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2499            .await
2500            .unwrap();
2501
2502        let format = project_b.update(&mut cx_b, |project, cx| {
2503            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2504        });
2505
2506        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2507        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2508            Some(vec![
2509                lsp::TextEdit {
2510                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2511                    new_text: "h".to_string(),
2512                },
2513                lsp::TextEdit {
2514                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2515                    new_text: "y".to_string(),
2516                },
2517            ])
2518        });
2519
2520        format.await.unwrap();
2521        assert_eq!(
2522            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2523            "let honey = two"
2524        );
2525    }
2526
2527    #[gpui::test(iterations = 10)]
2528    async fn test_definition(
2529        mut cx_a: TestAppContext,
2530        mut cx_b: TestAppContext,
2531        last_iteration: bool,
2532    ) {
2533        cx_a.foreground().forbid_parking();
2534        let mut lang_registry = Arc::new(LanguageRegistry::new());
2535        let fs = Arc::new(FakeFs::new(cx_a.background()));
2536        fs.insert_tree(
2537            "/root-1",
2538            json!({
2539                ".zed.toml": r#"collaborators = ["user_b"]"#,
2540                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2541            }),
2542        )
2543        .await;
2544        fs.insert_tree(
2545            "/root-2",
2546            json!({
2547                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2548            }),
2549        )
2550        .await;
2551
2552        // Set up a fake language server.
2553        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2554        Arc::get_mut(&mut lang_registry)
2555            .unwrap()
2556            .add(Arc::new(Language::new(
2557                LanguageConfig {
2558                    name: "Rust".to_string(),
2559                    path_suffixes: vec!["rs".to_string()],
2560                    language_server: Some(language_server_config),
2561                    ..Default::default()
2562                },
2563                Some(tree_sitter_rust::language()),
2564            )));
2565
2566        // Connect to a server as 2 clients.
2567        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2568        let client_a = server.create_client(&mut cx_a, "user_a").await;
2569        let client_b = server.create_client(&mut cx_b, "user_b").await;
2570
2571        // Share a project as client A
2572        let project_a = cx_a.update(|cx| {
2573            Project::local(
2574                client_a.clone(),
2575                client_a.user_store.clone(),
2576                lang_registry.clone(),
2577                fs.clone(),
2578                cx,
2579            )
2580        });
2581        let (worktree_a, _) = project_a
2582            .update(&mut cx_a, |p, cx| {
2583                p.find_or_create_local_worktree("/root-1", false, cx)
2584            })
2585            .await
2586            .unwrap();
2587        worktree_a
2588            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2589            .await;
2590        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2591        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2592        project_a
2593            .update(&mut cx_a, |p, cx| p.share(cx))
2594            .await
2595            .unwrap();
2596
2597        // Join the worktree as client B.
2598        let project_b = Project::remote(
2599            project_id,
2600            client_b.clone(),
2601            client_b.user_store.clone(),
2602            lang_registry.clone(),
2603            fs.clone(),
2604            &mut cx_b.to_async(),
2605        )
2606        .await
2607        .unwrap();
2608
2609        // Open the file on client B.
2610        let buffer_b = cx_b
2611            .background()
2612            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2613            .await
2614            .unwrap();
2615
2616        // Request the definition of a symbol as the guest.
2617        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2618
2619        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2620        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2621            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2622                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2623                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2624            )))
2625        });
2626
2627        let definitions_1 = definitions_1.await.unwrap();
2628        cx_b.read(|cx| {
2629            assert_eq!(definitions_1.len(), 1);
2630            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2631            let target_buffer = definitions_1[0].target_buffer.read(cx);
2632            assert_eq!(
2633                target_buffer.text(),
2634                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2635            );
2636            assert_eq!(
2637                definitions_1[0].target_range.to_point(target_buffer),
2638                Point::new(0, 6)..Point::new(0, 9)
2639            );
2640        });
2641
2642        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2643        // the previous call to `definition`.
2644        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2645        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2646            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2647                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2648                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2649            )))
2650        });
2651
2652        let definitions_2 = definitions_2.await.unwrap();
2653        cx_b.read(|cx| {
2654            assert_eq!(definitions_2.len(), 1);
2655            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2656            let target_buffer = definitions_2[0].target_buffer.read(cx);
2657            assert_eq!(
2658                target_buffer.text(),
2659                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2660            );
2661            assert_eq!(
2662                definitions_2[0].target_range.to_point(target_buffer),
2663                Point::new(1, 6)..Point::new(1, 11)
2664            );
2665        });
2666        assert_eq!(
2667            definitions_1[0].target_buffer,
2668            definitions_2[0].target_buffer
2669        );
2670
2671        cx_b.update(|_| {
2672            drop(definitions_1);
2673            drop(definitions_2);
2674        });
2675        project_b
2676            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2677            .await;
2678    }
2679
2680    #[gpui::test(iterations = 10)]
2681    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2682        mut cx_a: TestAppContext,
2683        mut cx_b: TestAppContext,
2684        mut rng: StdRng,
2685        last_iteration: bool,
2686    ) {
2687        cx_a.foreground().forbid_parking();
2688        let mut lang_registry = Arc::new(LanguageRegistry::new());
2689        let fs = Arc::new(FakeFs::new(cx_a.background()));
2690        fs.insert_tree(
2691            "/root",
2692            json!({
2693                ".zed.toml": r#"collaborators = ["user_b"]"#,
2694                "a.rs": "const ONE: usize = b::TWO;",
2695                "b.rs": "const TWO: usize = 2",
2696            }),
2697        )
2698        .await;
2699
2700        // Set up a fake language server.
2701        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2702
2703        Arc::get_mut(&mut lang_registry)
2704            .unwrap()
2705            .add(Arc::new(Language::new(
2706                LanguageConfig {
2707                    name: "Rust".to_string(),
2708                    path_suffixes: vec!["rs".to_string()],
2709                    language_server: Some(language_server_config),
2710                    ..Default::default()
2711                },
2712                Some(tree_sitter_rust::language()),
2713            )));
2714
2715        // Connect to a server as 2 clients.
2716        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2717        let client_a = server.create_client(&mut cx_a, "user_a").await;
2718        let client_b = server.create_client(&mut cx_b, "user_b").await;
2719
2720        // Share a project as client A
2721        let project_a = cx_a.update(|cx| {
2722            Project::local(
2723                client_a.clone(),
2724                client_a.user_store.clone(),
2725                lang_registry.clone(),
2726                fs.clone(),
2727                cx,
2728            )
2729        });
2730
2731        let (worktree_a, _) = project_a
2732            .update(&mut cx_a, |p, cx| {
2733                p.find_or_create_local_worktree("/root", false, cx)
2734            })
2735            .await
2736            .unwrap();
2737        worktree_a
2738            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2739            .await;
2740        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2741        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2742        project_a
2743            .update(&mut cx_a, |p, cx| p.share(cx))
2744            .await
2745            .unwrap();
2746
2747        // Join the worktree as client B.
2748        let project_b = Project::remote(
2749            project_id,
2750            client_b.clone(),
2751            client_b.user_store.clone(),
2752            lang_registry.clone(),
2753            fs.clone(),
2754            &mut cx_b.to_async(),
2755        )
2756        .await
2757        .unwrap();
2758
2759        let buffer_b1 = cx_b
2760            .background()
2761            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2762            .await
2763            .unwrap();
2764
2765        let definitions;
2766        let buffer_b2;
2767        if rng.gen() {
2768            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2769            buffer_b2 =
2770                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2771        } else {
2772            buffer_b2 =
2773                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2774            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2775        }
2776
2777        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2778        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2779            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2780                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2781                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2782            )))
2783        });
2784
2785        let buffer_b2 = buffer_b2.await.unwrap();
2786        let definitions = definitions.await.unwrap();
2787        assert_eq!(definitions.len(), 1);
2788        assert_eq!(definitions[0].target_buffer, buffer_b2);
2789    }
2790
2791    #[gpui::test(iterations = 10)]
2792    async fn test_collaborating_with_code_actions(
2793        mut cx_a: TestAppContext,
2794        mut cx_b: TestAppContext,
2795        last_iteration: bool,
2796    ) {
2797        cx_a.foreground().forbid_parking();
2798        let mut lang_registry = Arc::new(LanguageRegistry::new());
2799        let fs = Arc::new(FakeFs::new(cx_a.background()));
2800        let mut path_openers_b = Vec::new();
2801        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2802
2803        // Set up a fake language server.
2804        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2805        Arc::get_mut(&mut lang_registry)
2806            .unwrap()
2807            .add(Arc::new(Language::new(
2808                LanguageConfig {
2809                    name: "Rust".to_string(),
2810                    path_suffixes: vec!["rs".to_string()],
2811                    language_server: Some(language_server_config),
2812                    ..Default::default()
2813                },
2814                Some(tree_sitter_rust::language()),
2815            )));
2816
2817        // Connect to a server as 2 clients.
2818        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2819        let client_a = server.create_client(&mut cx_a, "user_a").await;
2820        let client_b = server.create_client(&mut cx_b, "user_b").await;
2821
2822        // Share a project as client A
2823        fs.insert_tree(
2824            "/a",
2825            json!({
2826                ".zed.toml": r#"collaborators = ["user_b"]"#,
2827                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2828                "other.rs": "pub fn foo() -> usize { 4 }",
2829            }),
2830        )
2831        .await;
2832        let project_a = cx_a.update(|cx| {
2833            Project::local(
2834                client_a.clone(),
2835                client_a.user_store.clone(),
2836                lang_registry.clone(),
2837                fs.clone(),
2838                cx,
2839            )
2840        });
2841        let (worktree_a, _) = project_a
2842            .update(&mut cx_a, |p, cx| {
2843                p.find_or_create_local_worktree("/a", false, cx)
2844            })
2845            .await
2846            .unwrap();
2847        worktree_a
2848            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2849            .await;
2850        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2851        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2852        project_a
2853            .update(&mut cx_a, |p, cx| p.share(cx))
2854            .await
2855            .unwrap();
2856
2857        // Join the worktree as client B.
2858        let project_b = Project::remote(
2859            project_id,
2860            client_b.clone(),
2861            client_b.user_store.clone(),
2862            lang_registry.clone(),
2863            fs.clone(),
2864            &mut cx_b.to_async(),
2865        )
2866        .await
2867        .unwrap();
2868        let mut params = cx_b.update(WorkspaceParams::test);
2869        params.languages = lang_registry.clone();
2870        params.client = client_b.client.clone();
2871        params.user_store = client_b.user_store.clone();
2872        params.project = project_b;
2873        params.path_openers = path_openers_b.into();
2874
2875        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2876        let editor_b = workspace_b
2877            .update(&mut cx_b, |workspace, cx| {
2878                workspace.open_path((worktree_id, "main.rs").into(), cx)
2879            })
2880            .await
2881            .unwrap()
2882            .downcast::<Editor>()
2883            .unwrap();
2884
2885        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2886        fake_language_server
2887            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2888                assert_eq!(
2889                    params.text_document.uri,
2890                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2891                );
2892                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2893                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2894                None
2895            })
2896            .next()
2897            .await;
2898
2899        // Move cursor to a location that contains code actions.
2900        editor_b.update(&mut cx_b, |editor, cx| {
2901            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2902            cx.focus(&editor_b);
2903        });
2904
2905        fake_language_server
2906            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2907                assert_eq!(
2908                    params.text_document.uri,
2909                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2910                );
2911                assert_eq!(params.range.start, lsp::Position::new(1, 31));
2912                assert_eq!(params.range.end, lsp::Position::new(1, 31));
2913
2914                Some(vec![lsp::CodeActionOrCommand::CodeAction(
2915                    lsp::CodeAction {
2916                        title: "Inline into all callers".to_string(),
2917                        edit: Some(lsp::WorkspaceEdit {
2918                            changes: Some(
2919                                [
2920                                    (
2921                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
2922                                        vec![lsp::TextEdit::new(
2923                                            lsp::Range::new(
2924                                                lsp::Position::new(1, 22),
2925                                                lsp::Position::new(1, 34),
2926                                            ),
2927                                            "4".to_string(),
2928                                        )],
2929                                    ),
2930                                    (
2931                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
2932                                        vec![lsp::TextEdit::new(
2933                                            lsp::Range::new(
2934                                                lsp::Position::new(0, 0),
2935                                                lsp::Position::new(0, 27),
2936                                            ),
2937                                            "".to_string(),
2938                                        )],
2939                                    ),
2940                                ]
2941                                .into_iter()
2942                                .collect(),
2943                            ),
2944                            ..Default::default()
2945                        }),
2946                        data: Some(json!({
2947                            "codeActionParams": {
2948                                "range": {
2949                                    "start": {"line": 1, "column": 31},
2950                                    "end": {"line": 1, "column": 31},
2951                                }
2952                            }
2953                        })),
2954                        ..Default::default()
2955                    },
2956                )])
2957            })
2958            .next()
2959            .await;
2960
2961        // Toggle code actions and wait for them to display.
2962        editor_b.update(&mut cx_b, |editor, cx| {
2963            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2964        });
2965        editor_b
2966            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2967            .await;
2968
2969        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
2970
2971        // Confirming the code action will trigger a resolve request.
2972        let confirm_action = workspace_b
2973            .update(&mut cx_b, |workspace, cx| {
2974                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2975            })
2976            .unwrap();
2977        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2978            lsp::CodeAction {
2979                title: "Inline into all callers".to_string(),
2980                edit: Some(lsp::WorkspaceEdit {
2981                    changes: Some(
2982                        [
2983                            (
2984                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2985                                vec![lsp::TextEdit::new(
2986                                    lsp::Range::new(
2987                                        lsp::Position::new(1, 22),
2988                                        lsp::Position::new(1, 34),
2989                                    ),
2990                                    "4".to_string(),
2991                                )],
2992                            ),
2993                            (
2994                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2995                                vec![lsp::TextEdit::new(
2996                                    lsp::Range::new(
2997                                        lsp::Position::new(0, 0),
2998                                        lsp::Position::new(0, 27),
2999                                    ),
3000                                    "".to_string(),
3001                                )],
3002                            ),
3003                        ]
3004                        .into_iter()
3005                        .collect(),
3006                    ),
3007                    ..Default::default()
3008                }),
3009                ..Default::default()
3010            }
3011        });
3012
3013        // After the action is confirmed, an editor containing both modified files is opened.
3014        confirm_action.await.unwrap();
3015        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3016            workspace
3017                .active_item(cx)
3018                .unwrap()
3019                .downcast::<Editor>()
3020                .unwrap()
3021        });
3022        code_action_editor.update(&mut cx_b, |editor, cx| {
3023            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3024            editor.undo(&Undo, cx);
3025            assert_eq!(
3026                editor.text(cx),
3027                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3028            );
3029            editor.redo(&Redo, cx);
3030            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3031        });
3032    }
3033
3034    #[gpui::test(iterations = 10)]
3035    async fn test_basic_chat(
3036        mut cx_a: TestAppContext,
3037        mut cx_b: TestAppContext,
3038        last_iteration: bool,
3039    ) {
3040        cx_a.foreground().forbid_parking();
3041
3042        // Connect to a server as 2 clients.
3043        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3044        let client_a = server.create_client(&mut cx_a, "user_a").await;
3045        let client_b = server.create_client(&mut cx_b, "user_b").await;
3046
3047        // Create an org that includes these 2 users.
3048        let db = &server.app_state.db;
3049        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3050        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3051            .await
3052            .unwrap();
3053        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3054            .await
3055            .unwrap();
3056
3057        // Create a channel that includes all the users.
3058        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3059        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3060            .await
3061            .unwrap();
3062        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3063            .await
3064            .unwrap();
3065        db.create_channel_message(
3066            channel_id,
3067            client_b.current_user_id(&cx_b),
3068            "hello A, it's B.",
3069            OffsetDateTime::now_utc(),
3070            1,
3071        )
3072        .await
3073        .unwrap();
3074
3075        let channels_a = cx_a
3076            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3077        channels_a
3078            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3079            .await;
3080        channels_a.read_with(&cx_a, |list, _| {
3081            assert_eq!(
3082                list.available_channels().unwrap(),
3083                &[ChannelDetails {
3084                    id: channel_id.to_proto(),
3085                    name: "test-channel".to_string()
3086                }]
3087            )
3088        });
3089        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3090            this.get_channel(channel_id.to_proto(), cx).unwrap()
3091        });
3092        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3093        channel_a
3094            .condition(&cx_a, |channel, _| {
3095                channel_messages(channel)
3096                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3097            })
3098            .await;
3099
3100        let channels_b = cx_b
3101            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3102        channels_b
3103            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3104            .await;
3105        channels_b.read_with(&cx_b, |list, _| {
3106            assert_eq!(
3107                list.available_channels().unwrap(),
3108                &[ChannelDetails {
3109                    id: channel_id.to_proto(),
3110                    name: "test-channel".to_string()
3111                }]
3112            )
3113        });
3114
3115        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3116            this.get_channel(channel_id.to_proto(), cx).unwrap()
3117        });
3118        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3119        channel_b
3120            .condition(&cx_b, |channel, _| {
3121                channel_messages(channel)
3122                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3123            })
3124            .await;
3125
3126        channel_a
3127            .update(&mut cx_a, |channel, cx| {
3128                channel
3129                    .send_message("oh, hi B.".to_string(), cx)
3130                    .unwrap()
3131                    .detach();
3132                let task = channel.send_message("sup".to_string(), cx).unwrap();
3133                assert_eq!(
3134                    channel_messages(channel),
3135                    &[
3136                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3137                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3138                        ("user_a".to_string(), "sup".to_string(), true)
3139                    ]
3140                );
3141                task
3142            })
3143            .await
3144            .unwrap();
3145
3146        channel_b
3147            .condition(&cx_b, |channel, _| {
3148                channel_messages(channel)
3149                    == [
3150                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3151                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3152                        ("user_a".to_string(), "sup".to_string(), false),
3153                    ]
3154            })
3155            .await;
3156
3157        assert_eq!(
3158            server
3159                .state()
3160                .await
3161                .channel(channel_id)
3162                .unwrap()
3163                .connection_ids
3164                .len(),
3165            2
3166        );
3167        cx_b.update(|_| drop(channel_b));
3168        server
3169            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3170            .await;
3171
3172        cx_a.update(|_| drop(channel_a));
3173        server
3174            .condition(|state| state.channel(channel_id).is_none())
3175            .await;
3176    }
3177
3178    #[gpui::test(iterations = 10)]
3179    async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3180        cx_a.foreground().forbid_parking();
3181
3182        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3183        let client_a = server.create_client(&mut cx_a, "user_a").await;
3184
3185        let db = &server.app_state.db;
3186        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3187        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3188        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3189            .await
3190            .unwrap();
3191        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3192            .await
3193            .unwrap();
3194
3195        let channels_a = cx_a
3196            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3197        channels_a
3198            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3199            .await;
3200        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3201            this.get_channel(channel_id.to_proto(), cx).unwrap()
3202        });
3203
3204        // Messages aren't allowed to be too long.
3205        channel_a
3206            .update(&mut cx_a, |channel, cx| {
3207                let long_body = "this is long.\n".repeat(1024);
3208                channel.send_message(long_body, cx).unwrap()
3209            })
3210            .await
3211            .unwrap_err();
3212
3213        // Messages aren't allowed to be blank.
3214        channel_a.update(&mut cx_a, |channel, cx| {
3215            channel.send_message(String::new(), cx).unwrap_err()
3216        });
3217
3218        // Leading and trailing whitespace are trimmed.
3219        channel_a
3220            .update(&mut cx_a, |channel, cx| {
3221                channel
3222                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3223                    .unwrap()
3224            })
3225            .await
3226            .unwrap();
3227        assert_eq!(
3228            db.get_channel_messages(channel_id, 10, None)
3229                .await
3230                .unwrap()
3231                .iter()
3232                .map(|m| &m.body)
3233                .collect::<Vec<_>>(),
3234            &["surrounded by whitespace"]
3235        );
3236    }
3237
3238    #[gpui::test(iterations = 10)]
3239    async fn test_chat_reconnection(
3240        mut cx_a: TestAppContext,
3241        mut cx_b: TestAppContext,
3242        last_iteration: bool,
3243    ) {
3244        cx_a.foreground().forbid_parking();
3245
3246        // Connect to a server as 2 clients.
3247        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3248        let client_a = server.create_client(&mut cx_a, "user_a").await;
3249        let client_b = server.create_client(&mut cx_b, "user_b").await;
3250        let mut status_b = client_b.status();
3251
3252        // Create an org that includes these 2 users.
3253        let db = &server.app_state.db;
3254        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3255        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3256            .await
3257            .unwrap();
3258        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3259            .await
3260            .unwrap();
3261
3262        // Create a channel that includes all the users.
3263        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3264        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3265            .await
3266            .unwrap();
3267        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3268            .await
3269            .unwrap();
3270        db.create_channel_message(
3271            channel_id,
3272            client_b.current_user_id(&cx_b),
3273            "hello A, it's B.",
3274            OffsetDateTime::now_utc(),
3275            2,
3276        )
3277        .await
3278        .unwrap();
3279
3280        let channels_a = cx_a
3281            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3282        channels_a
3283            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3284            .await;
3285
3286        channels_a.read_with(&cx_a, |list, _| {
3287            assert_eq!(
3288                list.available_channels().unwrap(),
3289                &[ChannelDetails {
3290                    id: channel_id.to_proto(),
3291                    name: "test-channel".to_string()
3292                }]
3293            )
3294        });
3295        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3296            this.get_channel(channel_id.to_proto(), cx).unwrap()
3297        });
3298        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3299        channel_a
3300            .condition(&cx_a, |channel, _| {
3301                channel_messages(channel)
3302                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3303            })
3304            .await;
3305
3306        let channels_b = cx_b
3307            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3308        channels_b
3309            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3310            .await;
3311        channels_b.read_with(&cx_b, |list, _| {
3312            assert_eq!(
3313                list.available_channels().unwrap(),
3314                &[ChannelDetails {
3315                    id: channel_id.to_proto(),
3316                    name: "test-channel".to_string()
3317                }]
3318            )
3319        });
3320
3321        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3322            this.get_channel(channel_id.to_proto(), cx).unwrap()
3323        });
3324        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3325        channel_b
3326            .condition(&cx_b, |channel, _| {
3327                channel_messages(channel)
3328                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3329            })
3330            .await;
3331
3332        // Disconnect client B, ensuring we can still access its cached channel data.
3333        server.forbid_connections();
3334        server.disconnect_client(client_b.current_user_id(&cx_b));
3335        while !matches!(
3336            status_b.next().await,
3337            Some(client::Status::ReconnectionError { .. })
3338        ) {}
3339
3340        channels_b.read_with(&cx_b, |channels, _| {
3341            assert_eq!(
3342                channels.available_channels().unwrap(),
3343                [ChannelDetails {
3344                    id: channel_id.to_proto(),
3345                    name: "test-channel".to_string()
3346                }]
3347            )
3348        });
3349        channel_b.read_with(&cx_b, |channel, _| {
3350            assert_eq!(
3351                channel_messages(channel),
3352                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3353            )
3354        });
3355
3356        // Send a message from client B while it is disconnected.
3357        channel_b
3358            .update(&mut cx_b, |channel, cx| {
3359                let task = channel
3360                    .send_message("can you see this?".to_string(), cx)
3361                    .unwrap();
3362                assert_eq!(
3363                    channel_messages(channel),
3364                    &[
3365                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3366                        ("user_b".to_string(), "can you see this?".to_string(), true)
3367                    ]
3368                );
3369                task
3370            })
3371            .await
3372            .unwrap_err();
3373
3374        // Send a message from client A while B is disconnected.
3375        channel_a
3376            .update(&mut cx_a, |channel, cx| {
3377                channel
3378                    .send_message("oh, hi B.".to_string(), cx)
3379                    .unwrap()
3380                    .detach();
3381                let task = channel.send_message("sup".to_string(), cx).unwrap();
3382                assert_eq!(
3383                    channel_messages(channel),
3384                    &[
3385                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3386                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3387                        ("user_a".to_string(), "sup".to_string(), true)
3388                    ]
3389                );
3390                task
3391            })
3392            .await
3393            .unwrap();
3394
3395        // Give client B a chance to reconnect.
3396        server.allow_connections();
3397        cx_b.foreground().advance_clock(Duration::from_secs(10));
3398
3399        // Verify that B sees the new messages upon reconnection, as well as the message client B
3400        // sent while offline.
3401        channel_b
3402            .condition(&cx_b, |channel, _| {
3403                channel_messages(channel)
3404                    == [
3405                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3406                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3407                        ("user_a".to_string(), "sup".to_string(), false),
3408                        ("user_b".to_string(), "can you see this?".to_string(), false),
3409                    ]
3410            })
3411            .await;
3412
3413        // Ensure client A and B can communicate normally after reconnection.
3414        channel_a
3415            .update(&mut cx_a, |channel, cx| {
3416                channel.send_message("you online?".to_string(), cx).unwrap()
3417            })
3418            .await
3419            .unwrap();
3420        channel_b
3421            .condition(&cx_b, |channel, _| {
3422                channel_messages(channel)
3423                    == [
3424                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3425                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3426                        ("user_a".to_string(), "sup".to_string(), false),
3427                        ("user_b".to_string(), "can you see this?".to_string(), false),
3428                        ("user_a".to_string(), "you online?".to_string(), false),
3429                    ]
3430            })
3431            .await;
3432
3433        channel_b
3434            .update(&mut cx_b, |channel, cx| {
3435                channel.send_message("yep".to_string(), cx).unwrap()
3436            })
3437            .await
3438            .unwrap();
3439        channel_a
3440            .condition(&cx_a, |channel, _| {
3441                channel_messages(channel)
3442                    == [
3443                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3444                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3445                        ("user_a".to_string(), "sup".to_string(), false),
3446                        ("user_b".to_string(), "can you see this?".to_string(), false),
3447                        ("user_a".to_string(), "you online?".to_string(), false),
3448                        ("user_b".to_string(), "yep".to_string(), false),
3449                    ]
3450            })
3451            .await;
3452    }
3453
3454    #[gpui::test(iterations = 10)]
3455    async fn test_contacts(
3456        mut cx_a: TestAppContext,
3457        mut cx_b: TestAppContext,
3458        mut cx_c: TestAppContext,
3459        last_iteration: bool,
3460    ) {
3461        cx_a.foreground().forbid_parking();
3462        let lang_registry = Arc::new(LanguageRegistry::new());
3463        let fs = Arc::new(FakeFs::new(cx_a.background()));
3464
3465        // Connect to a server as 3 clients.
3466        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3467        let client_a = server.create_client(&mut cx_a, "user_a").await;
3468        let client_b = server.create_client(&mut cx_b, "user_b").await;
3469        let client_c = server.create_client(&mut cx_c, "user_c").await;
3470
3471        // Share a worktree as client A.
3472        fs.insert_tree(
3473            "/a",
3474            json!({
3475                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3476            }),
3477        )
3478        .await;
3479
3480        let project_a = cx_a.update(|cx| {
3481            Project::local(
3482                client_a.clone(),
3483                client_a.user_store.clone(),
3484                lang_registry.clone(),
3485                fs.clone(),
3486                cx,
3487            )
3488        });
3489        let (worktree_a, _) = project_a
3490            .update(&mut cx_a, |p, cx| {
3491                p.find_or_create_local_worktree("/a", false, cx)
3492            })
3493            .await
3494            .unwrap();
3495        worktree_a
3496            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3497            .await;
3498
3499        client_a
3500            .user_store
3501            .condition(&cx_a, |user_store, _| {
3502                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3503            })
3504            .await;
3505        client_b
3506            .user_store
3507            .condition(&cx_b, |user_store, _| {
3508                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3509            })
3510            .await;
3511        client_c
3512            .user_store
3513            .condition(&cx_c, |user_store, _| {
3514                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3515            })
3516            .await;
3517
3518        let project_id = project_a
3519            .update(&mut cx_a, |project, _| project.next_remote_id())
3520            .await;
3521        project_a
3522            .update(&mut cx_a, |project, cx| project.share(cx))
3523            .await
3524            .unwrap();
3525
3526        let _project_b = Project::remote(
3527            project_id,
3528            client_b.clone(),
3529            client_b.user_store.clone(),
3530            lang_registry.clone(),
3531            fs.clone(),
3532            &mut cx_b.to_async(),
3533        )
3534        .await
3535        .unwrap();
3536
3537        client_a
3538            .user_store
3539            .condition(&cx_a, |user_store, _| {
3540                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3541            })
3542            .await;
3543        client_b
3544            .user_store
3545            .condition(&cx_b, |user_store, _| {
3546                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3547            })
3548            .await;
3549        client_c
3550            .user_store
3551            .condition(&cx_c, |user_store, _| {
3552                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3553            })
3554            .await;
3555
3556        project_a
3557            .condition(&cx_a, |project, _| {
3558                project.collaborators().contains_key(&client_b.peer_id)
3559            })
3560            .await;
3561
3562        cx_a.update(move |_| drop(project_a));
3563        client_a
3564            .user_store
3565            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3566            .await;
3567        client_b
3568            .user_store
3569            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3570            .await;
3571        client_c
3572            .user_store
3573            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3574            .await;
3575
3576        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3577            user_store
3578                .contacts()
3579                .iter()
3580                .map(|contact| {
3581                    let worktrees = contact
3582                        .projects
3583                        .iter()
3584                        .map(|p| {
3585                            (
3586                                p.worktree_root_names[0].as_str(),
3587                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3588                            )
3589                        })
3590                        .collect();
3591                    (contact.user.github_login.as_str(), worktrees)
3592                })
3593                .collect()
3594        }
3595    }
3596
3597    #[gpui::test(iterations = 100)]
3598    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3599        cx.foreground().forbid_parking();
3600        let max_peers = env::var("MAX_PEERS")
3601            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3602            .unwrap_or(5);
3603        let max_operations = env::var("OPERATIONS")
3604            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3605            .unwrap_or(10);
3606
3607        let rng = Rc::new(RefCell::new(rng));
3608
3609        let mut host_lang_registry = Arc::new(LanguageRegistry::new());
3610        let guest_lang_registry = Arc::new(LanguageRegistry::new());
3611
3612        // Set up a fake language server.
3613        let (mut language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
3614        language_server_config.set_fake_initializer(|fake_server| {
3615            fake_server.handle_request::<lsp::request::Completion, _>(|_| {
3616                Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
3617                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
3618                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
3619                        new_text: "the-new-text".to_string(),
3620                    })),
3621                    ..Default::default()
3622                }]))
3623            });
3624
3625            fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
3626                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3627                    lsp::CodeAction {
3628                        title: "the-code-action".to_string(),
3629                        ..Default::default()
3630                    },
3631                )])
3632            });
3633        });
3634
3635        Arc::get_mut(&mut host_lang_registry)
3636            .unwrap()
3637            .add(Arc::new(Language::new(
3638                LanguageConfig {
3639                    name: "Rust".to_string(),
3640                    path_suffixes: vec!["rs".to_string()],
3641                    language_server: Some(language_server_config),
3642                    ..Default::default()
3643                },
3644                Some(tree_sitter_rust::language()),
3645            )));
3646
3647        let fs = Arc::new(FakeFs::new(cx.background()));
3648        fs.insert_tree(
3649            "/_collab",
3650            json!({
3651                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3652            }),
3653        )
3654        .await;
3655
3656        let operations = Rc::new(Cell::new(0));
3657        let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3658        let mut clients = Vec::new();
3659
3660        let mut next_entity_id = 100000;
3661        let mut host_cx = TestAppContext::new(
3662            cx.foreground_platform(),
3663            cx.platform(),
3664            cx.foreground(),
3665            cx.background(),
3666            cx.font_cache(),
3667            next_entity_id,
3668        );
3669        let host = server.create_client(&mut host_cx, "host").await;
3670        let host_project = host_cx.update(|cx| {
3671            Project::local(
3672                host.client.clone(),
3673                host.user_store.clone(),
3674                host_lang_registry.clone(),
3675                fs.clone(),
3676                cx,
3677            )
3678        });
3679        let host_project_id = host_project
3680            .update(&mut host_cx, |p, _| p.next_remote_id())
3681            .await;
3682
3683        let (collab_worktree, _) = host_project
3684            .update(&mut host_cx, |project, cx| {
3685                project.find_or_create_local_worktree("/_collab", false, cx)
3686            })
3687            .await
3688            .unwrap();
3689        collab_worktree
3690            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3691            .await;
3692        host_project
3693            .update(&mut host_cx, |project, cx| project.share(cx))
3694            .await
3695            .unwrap();
3696
3697        clients.push(cx.foreground().spawn(host.simulate_host(
3698            host_project.clone(),
3699            operations.clone(),
3700            max_operations,
3701            rng.clone(),
3702            host_cx.clone(),
3703        )));
3704
3705        while operations.get() < max_operations {
3706            cx.background().simulate_random_delay().await;
3707            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3708                operations.set(operations.get() + 1);
3709
3710                let guest_id = clients.len();
3711                log::info!("Adding guest {}", guest_id);
3712                next_entity_id += 100000;
3713                let mut guest_cx = TestAppContext::new(
3714                    cx.foreground_platform(),
3715                    cx.platform(),
3716                    cx.foreground(),
3717                    cx.background(),
3718                    cx.font_cache(),
3719                    next_entity_id,
3720                );
3721                let guest = server
3722                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3723                    .await;
3724                let guest_project = Project::remote(
3725                    host_project_id,
3726                    guest.client.clone(),
3727                    guest.user_store.clone(),
3728                    guest_lang_registry.clone(),
3729                    fs.clone(),
3730                    &mut guest_cx.to_async(),
3731                )
3732                .await
3733                .unwrap();
3734                clients.push(cx.foreground().spawn(guest.simulate_guest(
3735                    guest_id,
3736                    guest_project,
3737                    operations.clone(),
3738                    max_operations,
3739                    rng.clone(),
3740                    guest_cx,
3741                )));
3742
3743                log::info!("Guest {} added", guest_id);
3744            }
3745        }
3746
3747        let clients = futures::future::join_all(clients).await;
3748        cx.foreground().run_until_parked();
3749
3750        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3751            project
3752                .worktrees(cx)
3753                .map(|worktree| {
3754                    let snapshot = worktree.read(cx).snapshot();
3755                    (snapshot.id(), snapshot)
3756                })
3757                .collect::<BTreeMap<_, _>>()
3758        });
3759
3760        for (guest_client, guest_cx) in clients.iter().skip(1) {
3761            let guest_id = guest_client.client.id();
3762            let worktree_snapshots =
3763                guest_client
3764                    .project
3765                    .as_ref()
3766                    .unwrap()
3767                    .read_with(guest_cx, |project, cx| {
3768                        project
3769                            .worktrees(cx)
3770                            .map(|worktree| {
3771                                let worktree = worktree.read(cx);
3772                                assert!(
3773                                    !worktree.as_remote().unwrap().has_pending_updates(),
3774                                    "Guest {} worktree {:?} contains deferred updates",
3775                                    guest_id,
3776                                    worktree.id()
3777                                );
3778                                (worktree.id(), worktree.snapshot())
3779                            })
3780                            .collect::<BTreeMap<_, _>>()
3781                    });
3782
3783            assert_eq!(
3784                worktree_snapshots.keys().collect::<Vec<_>>(),
3785                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3786                "guest {} has different worktrees than the host",
3787                guest_id
3788            );
3789            for (id, host_snapshot) in &host_worktree_snapshots {
3790                let guest_snapshot = &worktree_snapshots[id];
3791                assert_eq!(
3792                    guest_snapshot.root_name(),
3793                    host_snapshot.root_name(),
3794                    "guest {} has different root name than the host for worktree {}",
3795                    guest_id,
3796                    id
3797                );
3798                assert_eq!(
3799                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3800                    host_snapshot.entries(false).collect::<Vec<_>>(),
3801                    "guest {} has different snapshot than the host for worktree {}",
3802                    guest_id,
3803                    id
3804                );
3805            }
3806
3807            guest_client
3808                .project
3809                .as_ref()
3810                .unwrap()
3811                .read_with(guest_cx, |project, _| {
3812                    assert!(
3813                        !project.has_buffered_operations(),
3814                        "guest {} has buffered operations ",
3815                        guest_id,
3816                    );
3817                });
3818
3819            for guest_buffer in &guest_client.buffers {
3820                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
3821                let host_buffer = host_project.read_with(&host_cx, |project, _| {
3822                    project
3823                        .shared_buffer(guest_client.peer_id, buffer_id)
3824                        .expect(&format!(
3825                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
3826                            guest_id, guest_client.peer_id, buffer_id
3827                        ))
3828                });
3829                assert_eq!(
3830                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
3831                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
3832                    "guest {} buffer {} differs from the host's buffer",
3833                    guest_id,
3834                    buffer_id,
3835                );
3836            }
3837        }
3838    }
3839
3840    struct TestServer {
3841        peer: Arc<Peer>,
3842        app_state: Arc<AppState>,
3843        server: Arc<Server>,
3844        foreground: Rc<executor::Foreground>,
3845        notifications: mpsc::UnboundedReceiver<()>,
3846        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3847        forbid_connections: Arc<AtomicBool>,
3848        _test_db: TestDb,
3849    }
3850
3851    impl TestServer {
3852        async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3853            let mut test_db = TestDb::new();
3854            test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3855            let app_state = Self::build_app_state(&test_db).await;
3856            let peer = Peer::new();
3857            let notifications = mpsc::unbounded();
3858            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3859            Self {
3860                peer,
3861                app_state,
3862                server,
3863                foreground,
3864                notifications: notifications.1,
3865                connection_killers: Default::default(),
3866                forbid_connections: Default::default(),
3867                _test_db: test_db,
3868            }
3869        }
3870
3871        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3872            let http = FakeHttpClient::with_404_response();
3873            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3874            let client_name = name.to_string();
3875            let mut client = Client::new(http.clone());
3876            let server = self.server.clone();
3877            let connection_killers = self.connection_killers.clone();
3878            let forbid_connections = self.forbid_connections.clone();
3879            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
3880
3881            Arc::get_mut(&mut client)
3882                .unwrap()
3883                .override_authenticate(move |cx| {
3884                    cx.spawn(|_| async move {
3885                        let access_token = "the-token".to_string();
3886                        Ok(Credentials {
3887                            user_id: user_id.0 as u64,
3888                            access_token,
3889                        })
3890                    })
3891                })
3892                .override_establish_connection(move |credentials, cx| {
3893                    assert_eq!(credentials.user_id, user_id.0 as u64);
3894                    assert_eq!(credentials.access_token, "the-token");
3895
3896                    let server = server.clone();
3897                    let connection_killers = connection_killers.clone();
3898                    let forbid_connections = forbid_connections.clone();
3899                    let client_name = client_name.clone();
3900                    let connection_id_tx = connection_id_tx.clone();
3901                    cx.spawn(move |cx| async move {
3902                        if forbid_connections.load(SeqCst) {
3903                            Err(EstablishConnectionError::other(anyhow!(
3904                                "server is forbidding connections"
3905                            )))
3906                        } else {
3907                            let (client_conn, server_conn, kill_conn) =
3908                                Connection::in_memory(cx.background());
3909                            connection_killers.lock().insert(user_id, kill_conn);
3910                            cx.background()
3911                                .spawn(server.handle_connection(
3912                                    server_conn,
3913                                    client_name,
3914                                    user_id,
3915                                    Some(connection_id_tx),
3916                                    cx.background(),
3917                                ))
3918                                .detach();
3919                            Ok(client_conn)
3920                        }
3921                    })
3922                });
3923
3924            client
3925                .authenticate_and_connect(&cx.to_async())
3926                .await
3927                .unwrap();
3928
3929            Channel::init(&client);
3930            Project::init(&client);
3931
3932            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3933            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3934            let mut authed_user =
3935                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3936            while authed_user.next().await.unwrap().is_none() {}
3937
3938            TestClient {
3939                client,
3940                peer_id,
3941                user_store,
3942                project: Default::default(),
3943                buffers: Default::default(),
3944            }
3945        }
3946
3947        fn disconnect_client(&self, user_id: UserId) {
3948            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3949                let _ = kill_conn.try_send(Some(()));
3950            }
3951        }
3952
3953        fn forbid_connections(&self) {
3954            self.forbid_connections.store(true, SeqCst);
3955        }
3956
3957        fn allow_connections(&self) {
3958            self.forbid_connections.store(false, SeqCst);
3959        }
3960
3961        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3962            let mut config = Config::default();
3963            config.session_secret = "a".repeat(32);
3964            config.database_url = test_db.url.clone();
3965            let github_client = github::AppClient::test();
3966            Arc::new(AppState {
3967                db: test_db.db().clone(),
3968                handlebars: Default::default(),
3969                auth_client: auth::build_client("", ""),
3970                repo_client: github::RepoClient::test(&github_client),
3971                github_client,
3972                config,
3973            })
3974        }
3975
3976        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3977            self.server.store.read()
3978        }
3979
3980        async fn condition<F>(&mut self, mut predicate: F)
3981        where
3982            F: FnMut(&Store) -> bool,
3983        {
3984            async_std::future::timeout(Duration::from_millis(500), async {
3985                while !(predicate)(&*self.server.store.read()) {
3986                    self.foreground.start_waiting();
3987                    self.notifications.next().await;
3988                    self.foreground.finish_waiting();
3989                }
3990            })
3991            .await
3992            .expect("condition timed out");
3993        }
3994    }
3995
3996    impl Drop for TestServer {
3997        fn drop(&mut self) {
3998            self.peer.reset();
3999        }
4000    }
4001
    /// A client connected to a `TestServer`, plus the state the randomized
    /// simulations (`simulate_host` / `simulate_guest`) accumulate for it.
    struct TestClient {
        /// The wrapped client; `TestClient` derefs to this.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// Model holding the user store created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// The project a simulation ran against; set when the simulation returns.
        project: Option<ModelHandle<Project>>,
        /// Buffers the simulation currently keeps open.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4009
4010    impl Deref for TestClient {
4011        type Target = Arc<Client>;
4012
4013        fn deref(&self) -> &Self::Target {
4014            &self.client
4015        }
4016    }
4017
4018    impl TestClient {
4019        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4020            UserId::from_proto(
4021                self.user_store
4022                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4023            )
4024        }
4025
4026        async fn simulate_host(
4027            mut self,
4028            project: ModelHandle<Project>,
4029            operations: Rc<Cell<usize>>,
4030            max_operations: usize,
4031            rng: Rc<RefCell<StdRng>>,
4032            mut cx: TestAppContext,
4033        ) -> (Self, TestAppContext) {
4034            let fs = project.read_with(&cx, |project, _| project.fs().clone());
4035            let mut files: Vec<PathBuf> = Default::default();
4036            while operations.get() < max_operations {
4037                operations.set(operations.get() + 1);
4038
4039                let distribution = rng.borrow_mut().gen_range(0..100);
4040                match distribution {
4041                    0..=20 if !files.is_empty() => {
4042                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
4043                        while let Some(parent_path) = path.parent() {
4044                            path = parent_path;
4045                            if rng.borrow_mut().gen() {
4046                                break;
4047                            }
4048                        }
4049
4050                        log::info!("Host: find/create local worktree {:?}", path);
4051                        project
4052                            .update(&mut cx, |project, cx| {
4053                                project.find_or_create_local_worktree(path, false, cx)
4054                            })
4055                            .await
4056                            .unwrap();
4057                    }
4058                    10..=80 if !files.is_empty() => {
4059                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4060                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
4061                            let (worktree, path) = project
4062                                .update(&mut cx, |project, cx| {
4063                                    project.find_or_create_local_worktree(file, false, cx)
4064                                })
4065                                .await
4066                                .unwrap();
4067                            let project_path =
4068                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4069                            log::info!("Host: opening path {:?}", project_path);
4070                            let buffer = project
4071                                .update(&mut cx, |project, cx| {
4072                                    project.open_buffer(project_path, cx)
4073                                })
4074                                .await
4075                                .unwrap();
4076                            self.buffers.insert(buffer.clone());
4077                            buffer
4078                        } else {
4079                            self.buffers
4080                                .iter()
4081                                .choose(&mut *rng.borrow_mut())
4082                                .unwrap()
4083                                .clone()
4084                        };
4085
4086                        if rng.borrow_mut().gen_bool(0.1) {
4087                            cx.update(|cx| {
4088                                log::info!(
4089                                    "Host: dropping buffer {:?}",
4090                                    buffer.read(cx).file().unwrap().full_path(cx)
4091                                );
4092                                self.buffers.remove(&buffer);
4093                                drop(buffer);
4094                            });
4095                        } else {
4096                            buffer.update(&mut cx, |buffer, cx| {
4097                                log::info!(
4098                                    "Host: updating buffer {:?}",
4099                                    buffer.file().unwrap().full_path(cx)
4100                                );
4101                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4102                            });
4103                        }
4104                    }
4105                    _ => loop {
4106                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4107                        let mut path = PathBuf::new();
4108                        path.push("/");
4109                        for _ in 0..path_component_count {
4110                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4111                            path.push(std::str::from_utf8(&[letter]).unwrap());
4112                        }
4113                        path.set_extension("rs");
4114                        let parent_path = path.parent().unwrap();
4115
4116                        log::info!("Host: creating file {:?}", path);
4117                        if fs.create_dir(&parent_path).await.is_ok()
4118                            && fs.create_file(&path, Default::default()).await.is_ok()
4119                        {
4120                            files.push(path);
4121                            break;
4122                        } else {
4123                            log::info!("Host: cannot create file");
4124                        }
4125                    },
4126                }
4127
4128                cx.background().simulate_random_delay().await;
4129            }
4130
4131            self.project = Some(project);
4132            (self, cx)
4133        }
4134
4135        pub async fn simulate_guest(
4136            mut self,
4137            guest_id: usize,
4138            project: ModelHandle<Project>,
4139            operations: Rc<Cell<usize>>,
4140            max_operations: usize,
4141            rng: Rc<RefCell<StdRng>>,
4142            mut cx: TestAppContext,
4143        ) -> (Self, TestAppContext) {
4144            while operations.get() < max_operations {
4145                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4146                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4147                        project
4148                            .worktrees(&cx)
4149                            .filter(|worktree| {
4150                                worktree.read(cx).entries(false).any(|e| e.is_file())
4151                            })
4152                            .choose(&mut *rng.borrow_mut())
4153                    }) {
4154                        worktree
4155                    } else {
4156                        cx.background().simulate_random_delay().await;
4157                        continue;
4158                    };
4159
4160                    operations.set(operations.get() + 1);
4161                    let project_path = worktree.read_with(&cx, |worktree, _| {
4162                        let entry = worktree
4163                            .entries(false)
4164                            .filter(|e| e.is_file())
4165                            .choose(&mut *rng.borrow_mut())
4166                            .unwrap();
4167                        (worktree.id(), entry.path.clone())
4168                    });
4169                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4170                    let buffer = project
4171                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4172                        .await
4173                        .unwrap();
4174                    self.buffers.insert(buffer.clone());
4175                    buffer
4176                } else {
4177                    operations.set(operations.get() + 1);
4178
4179                    self.buffers
4180                        .iter()
4181                        .choose(&mut *rng.borrow_mut())
4182                        .unwrap()
4183                        .clone()
4184                };
4185
4186                let choice = rng.borrow_mut().gen_range(0..100);
4187                match choice {
4188                    0..=9 => {
4189                        cx.update(|cx| {
4190                            log::info!(
4191                                "Guest {}: dropping buffer {:?}",
4192                                guest_id,
4193                                buffer.read(cx).file().unwrap().full_path(cx)
4194                            );
4195                            self.buffers.remove(&buffer);
4196                            drop(buffer);
4197                        });
4198                    }
4199                    10..=19 => {
4200                        let completions = project.update(&mut cx, |project, cx| {
4201                            log::info!(
4202                                "Guest {}: requesting completions for buffer {:?}",
4203                                guest_id,
4204                                buffer.read(cx).file().unwrap().full_path(cx)
4205                            );
4206                            let offset = rng.borrow_mut().gen_range(0..=buffer.read(cx).len());
4207                            project.completions(&buffer, offset, cx)
4208                        });
4209                        let completions = cx.background().spawn(async move {
4210                            completions.await.expect("completions request failed");
4211                        });
4212                        if rng.borrow_mut().gen_bool(0.3) {
4213                            log::info!("Guest {}: detaching completions request", guest_id);
4214                            completions.detach();
4215                        } else {
4216                            completions.await;
4217                        }
4218                    }
4219                    20..=29 => {
4220                        let code_actions = project.update(&mut cx, |project, cx| {
4221                            log::info!(
4222                                "Guest {}: requesting code actions for buffer {:?}",
4223                                guest_id,
4224                                buffer.read(cx).file().unwrap().full_path(cx)
4225                            );
4226                            let range =
4227                                buffer.read(cx).random_byte_range(0, &mut *rng.borrow_mut());
4228                            project.code_actions(&buffer, range, cx)
4229                        });
4230                        let code_actions = cx.background().spawn(async move {
4231                            code_actions.await.expect("code actions request failed");
4232                        });
4233                        if rng.borrow_mut().gen_bool(0.3) {
4234                            log::info!("Guest {}: detaching code actions request", guest_id);
4235                            code_actions.detach();
4236                        } else {
4237                            code_actions.await;
4238                        }
4239                    }
4240                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4241                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4242                            log::info!(
4243                                "Guest {}: saving buffer {:?}",
4244                                guest_id,
4245                                buffer.file().unwrap().full_path(cx)
4246                            );
4247                            (buffer.version(), buffer.save(cx))
4248                        });
4249                        let save = cx.spawn(|cx| async move {
4250                            let (saved_version, _) = save.await.expect("save request failed");
4251                            buffer.read_with(&cx, |buffer, _| {
4252                                assert!(buffer.version().observed_all(&saved_version));
4253                                assert!(saved_version.observed_all(&requested_version));
4254                            });
4255                        });
4256                        if rng.borrow_mut().gen_bool(0.3) {
4257                            log::info!("Guest {}: detaching save request", guest_id);
4258                            save.detach();
4259                        } else {
4260                            save.await;
4261                        }
4262                    }
4263                    _ => {
4264                        buffer.update(&mut cx, |buffer, cx| {
4265                            log::info!(
4266                                "Guest {}: updating buffer {:?}",
4267                                guest_id,
4268                                buffer.file().unwrap().full_path(cx)
4269                            );
4270                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4271                        });
4272                    }
4273                }
4274                cx.background().simulate_random_delay().await;
4275            }
4276
4277            self.project = Some(project);
4278            (self, cx)
4279        }
4280    }
4281
4282    impl Executor for Arc<gpui::executor::Background> {
4283        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4284            self.spawn(future).detach();
4285        }
4286    }
4287
4288    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4289        channel
4290            .messages()
4291            .cursor::<()>()
4292            .map(|m| {
4293                (
4294                    m.sender.github_login.clone(),
4295                    m.body.clone(),
4296                    m.is_pending(),
4297                )
4298            })
4299            .collect()
4300    }
4301
4302    struct EmptyView;
4303
4304    impl gpui::Entity for EmptyView {
4305        type Event = ();
4306    }
4307
4308    impl gpui::View for EmptyView {
4309        fn ui_name() -> &'static str {
4310            "empty view"
4311        }
4312
4313        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4314            gpui::Element::boxed(gpui::elements::Empty)
4315        }
4316    }
4317}