rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_message_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let handle_message = (handler)(this.clone(), message);
 194                                let notifications = this.notifications.clone();
 195                                executor.spawn_detached(async move {
 196                                    if let Err(err) = handle_message.await {
 197                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 198                                    } else {
 199                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 200                                    }
 201                                    if let Some(mut notifications) = notifications {
 202                                        let _ = notifications.send(()).await;
 203                                    }
 204                                });
 205                            } else {
 206                                log::warn!("unhandled message: {}", type_name);
 207                            }
 208                        } else {
 209                            log::info!("rpc connection closed {:?}", addr);
 210                            break;
 211                        }
 212                    }
 213                }
 214            }
 215
 216            if let Err(err) = this.sign_out(connection_id).await {
 217                log::error!("error signing out connection {:?} - {:?}", addr, err);
 218            }
 219        }
 220    }
 221
 222    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 223        self.peer.disconnect(connection_id);
 224        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 225
 226        for (project_id, project) in removed_connection.hosted_projects {
 227            if let Some(share) = project.share {
 228                broadcast(
 229                    connection_id,
 230                    share.guests.keys().copied().collect(),
 231                    |conn_id| {
 232                        self.peer
 233                            .send(conn_id, proto::UnshareProject { project_id })
 234                    },
 235                )?;
 236            }
 237        }
 238
 239        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 240            broadcast(connection_id, peer_ids, |conn_id| {
 241                self.peer.send(
 242                    conn_id,
 243                    proto::RemoveProjectCollaborator {
 244                        project_id,
 245                        peer_id: connection_id.0,
 246                    },
 247                )
 248            })?;
 249        }
 250
 251        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 252        Ok(())
 253    }
 254
 255    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 256        Ok(proto::Ack {})
 257    }
 258
 259    async fn register_project(
 260        mut self: Arc<Server>,
 261        request: TypedEnvelope<proto::RegisterProject>,
 262    ) -> tide::Result<proto::RegisterProjectResponse> {
 263        let project_id = {
 264            let mut state = self.state_mut();
 265            let user_id = state.user_id_for_connection(request.sender_id)?;
 266            state.register_project(request.sender_id, user_id)
 267        };
 268        Ok(proto::RegisterProjectResponse { project_id })
 269    }
 270
 271    async fn unregister_project(
 272        mut self: Arc<Server>,
 273        request: TypedEnvelope<proto::UnregisterProject>,
 274    ) -> tide::Result<()> {
 275        let project = self
 276            .state_mut()
 277            .unregister_project(request.payload.project_id, request.sender_id)?;
 278        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 279        Ok(())
 280    }
 281
 282    async fn share_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::ShareProject>,
 285    ) -> tide::Result<proto::Ack> {
 286        self.state_mut()
 287            .share_project(request.payload.project_id, request.sender_id);
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn unshare_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::UnshareProject>,
 294    ) -> tide::Result<()> {
 295        let project_id = request.payload.project_id;
 296        let project = self
 297            .state_mut()
 298            .unshare_project(project_id, request.sender_id)?;
 299
 300        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 301            self.peer
 302                .send(conn_id, proto::UnshareProject { project_id })
 303        })?;
 304        self.update_contacts_for_users(&project.authorized_user_ids)?;
 305        Ok(())
 306    }
 307
 308    async fn join_project(
 309        mut self: Arc<Server>,
 310        request: TypedEnvelope<proto::JoinProject>,
 311    ) -> tide::Result<proto::JoinProjectResponse> {
 312        let project_id = request.payload.project_id;
 313
 314        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 315        let (response, connection_ids, contact_user_ids) = self
 316            .state_mut()
 317            .join_project(request.sender_id, user_id, project_id)
 318            .and_then(|joined| {
 319                let share = joined.project.share()?;
 320                let peer_count = share.guests.len();
 321                let mut collaborators = Vec::with_capacity(peer_count);
 322                collaborators.push(proto::Collaborator {
 323                    peer_id: joined.project.host_connection_id.0,
 324                    replica_id: 0,
 325                    user_id: joined.project.host_user_id.to_proto(),
 326                });
 327                let worktrees = joined
 328                    .project
 329                    .worktrees
 330                    .iter()
 331                    .filter_map(|(id, worktree)| {
 332                        worktree.share.as_ref().map(|share| proto::Worktree {
 333                            id: *id,
 334                            root_name: worktree.root_name.clone(),
 335                            entries: share.entries.values().cloned().collect(),
 336                            diagnostic_summaries: share
 337                                .diagnostic_summaries
 338                                .values()
 339                                .cloned()
 340                                .collect(),
 341                            weak: worktree.weak,
 342                            next_update_id: share.next_update_id as u64,
 343                        })
 344                    })
 345                    .collect();
 346                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 347                    if *peer_conn_id != request.sender_id {
 348                        collaborators.push(proto::Collaborator {
 349                            peer_id: peer_conn_id.0,
 350                            replica_id: *peer_replica_id as u32,
 351                            user_id: peer_user_id.to_proto(),
 352                        });
 353                    }
 354                }
 355                let response = proto::JoinProjectResponse {
 356                    worktrees,
 357                    replica_id: joined.replica_id as u32,
 358                    collaborators,
 359                };
 360                let connection_ids = joined.project.connection_ids();
 361                let contact_user_ids = joined.project.authorized_user_ids();
 362                Ok((response, connection_ids, contact_user_ids))
 363            })?;
 364
 365        broadcast(request.sender_id, connection_ids, |conn_id| {
 366            self.peer.send(
 367                conn_id,
 368                proto::AddProjectCollaborator {
 369                    project_id,
 370                    collaborator: Some(proto::Collaborator {
 371                        peer_id: request.sender_id.0,
 372                        replica_id: response.replica_id,
 373                        user_id: user_id.to_proto(),
 374                    }),
 375                },
 376            )
 377        })?;
 378        self.update_contacts_for_users(&contact_user_ids)?;
 379        Ok(response)
 380    }
 381
 382    async fn leave_project(
 383        mut self: Arc<Server>,
 384        request: TypedEnvelope<proto::LeaveProject>,
 385    ) -> tide::Result<()> {
 386        let sender_id = request.sender_id;
 387        let project_id = request.payload.project_id;
 388        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 389
 390        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 391            self.peer.send(
 392                conn_id,
 393                proto::RemoveProjectCollaborator {
 394                    project_id,
 395                    peer_id: sender_id.0,
 396                },
 397            )
 398        })?;
 399        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 400
 401        Ok(())
 402    }
 403
 404    async fn register_worktree(
 405        mut self: Arc<Server>,
 406        request: TypedEnvelope<proto::RegisterWorktree>,
 407    ) -> tide::Result<proto::Ack> {
 408        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 409
 410        let mut contact_user_ids = HashSet::default();
 411        contact_user_ids.insert(host_user_id);
 412        for github_login in request.payload.authorized_logins {
 413            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 414            contact_user_ids.insert(contact_user_id);
 415        }
 416
 417        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 418        self.state_mut().register_worktree(
 419            request.payload.project_id,
 420            request.payload.worktree_id,
 421            request.sender_id,
 422            Worktree {
 423                authorized_user_ids: contact_user_ids.clone(),
 424                root_name: request.payload.root_name,
 425                share: None,
 426                weak: false,
 427            },
 428        )?;
 429        self.update_contacts_for_users(&contact_user_ids)?;
 430        Ok(proto::Ack {})
 431    }
 432
 433    async fn unregister_worktree(
 434        mut self: Arc<Server>,
 435        request: TypedEnvelope<proto::UnregisterWorktree>,
 436    ) -> tide::Result<()> {
 437        let project_id = request.payload.project_id;
 438        let worktree_id = request.payload.worktree_id;
 439        let (worktree, guest_connection_ids) =
 440            self.state_mut()
 441                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 442        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 443            self.peer.send(
 444                conn_id,
 445                proto::UnregisterWorktree {
 446                    project_id,
 447                    worktree_id,
 448                },
 449            )
 450        })?;
 451        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 452        Ok(())
 453    }
 454
 455    async fn share_worktree(
 456        mut self: Arc<Server>,
 457        mut request: TypedEnvelope<proto::ShareWorktree>,
 458    ) -> tide::Result<proto::Ack> {
 459        let worktree = request
 460            .payload
 461            .worktree
 462            .as_mut()
 463            .ok_or_else(|| anyhow!("missing worktree"))?;
 464        let entries = worktree
 465            .entries
 466            .iter()
 467            .map(|entry| (entry.id, entry.clone()))
 468            .collect();
 469        let diagnostic_summaries = worktree
 470            .diagnostic_summaries
 471            .iter()
 472            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 473            .collect();
 474
 475        let shared_worktree = self.state_mut().share_worktree(
 476            request.payload.project_id,
 477            worktree.id,
 478            request.sender_id,
 479            entries,
 480            diagnostic_summaries,
 481            worktree.next_update_id,
 482        )?;
 483
 484        broadcast(
 485            request.sender_id,
 486            shared_worktree.connection_ids,
 487            |connection_id| {
 488                self.peer
 489                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 490            },
 491        )?;
 492        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_worktree(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateWorktree>,
 500    ) -> tide::Result<()> {
 501        let connection_ids = self.state_mut().update_worktree(
 502            request.sender_id,
 503            request.payload.project_id,
 504            request.payload.worktree_id,
 505            &request.payload.removed_entries,
 506            &request.payload.updated_entries,
 507        )?;
 508
 509        broadcast(request.sender_id, connection_ids, |connection_id| {
 510            self.peer
 511                .forward_send(request.sender_id, connection_id, request.payload.clone())
 512        })?;
 513
 514        Ok(())
 515    }
 516
 517    async fn update_diagnostic_summary(
 518        mut self: Arc<Server>,
 519        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 520    ) -> tide::Result<()> {
 521        let summary = request
 522            .payload
 523            .summary
 524            .clone()
 525            .ok_or_else(|| anyhow!("invalid summary"))?;
 526        let receiver_ids = self.state_mut().update_diagnostic_summary(
 527            request.payload.project_id,
 528            request.payload.worktree_id,
 529            request.sender_id,
 530            summary,
 531        )?;
 532
 533        broadcast(request.sender_id, receiver_ids, |connection_id| {
 534            self.peer
 535                .forward_send(request.sender_id, connection_id, request.payload.clone())
 536        })?;
 537        Ok(())
 538    }
 539
 540    async fn disk_based_diagnostics_updating(
 541        self: Arc<Server>,
 542        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 543    ) -> tide::Result<()> {
 544        let receiver_ids = self
 545            .state()
 546            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 547        broadcast(request.sender_id, receiver_ids, |connection_id| {
 548            self.peer
 549                .forward_send(request.sender_id, connection_id, request.payload.clone())
 550        })?;
 551        Ok(())
 552    }
 553
 554    async fn disk_based_diagnostics_updated(
 555        self: Arc<Server>,
 556        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 557    ) -> tide::Result<()> {
 558        let receiver_ids = self
 559            .state()
 560            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 561        broadcast(request.sender_id, receiver_ids, |connection_id| {
 562            self.peer
 563                .forward_send(request.sender_id, connection_id, request.payload.clone())
 564        })?;
 565        Ok(())
 566    }
 567
 568    async fn get_definition(
 569        self: Arc<Server>,
 570        request: TypedEnvelope<proto::GetDefinition>,
 571    ) -> tide::Result<proto::GetDefinitionResponse> {
 572        let host_connection_id = self
 573            .state()
 574            .read_project(request.payload.project_id, request.sender_id)?
 575            .host_connection_id;
 576        Ok(self
 577            .peer
 578            .forward_request(request.sender_id, host_connection_id, request.payload)
 579            .await?)
 580    }
 581
 582    async fn open_buffer(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::OpenBuffer>,
 585    ) -> tide::Result<proto::OpenBufferResponse> {
 586        let host_connection_id = self
 587            .state()
 588            .read_project(request.payload.project_id, request.sender_id)?
 589            .host_connection_id;
 590        Ok(self
 591            .peer
 592            .forward_request(request.sender_id, host_connection_id, request.payload)
 593            .await?)
 594    }
 595
 596    async fn close_buffer(
 597        self: Arc<Server>,
 598        request: TypedEnvelope<proto::CloseBuffer>,
 599    ) -> tide::Result<()> {
 600        let host_connection_id = self
 601            .state()
 602            .read_project(request.payload.project_id, request.sender_id)?
 603            .host_connection_id;
 604        self.peer
 605            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 606        Ok(())
 607    }
 608
 609    async fn save_buffer(
 610        self: Arc<Server>,
 611        request: TypedEnvelope<proto::SaveBuffer>,
 612    ) -> tide::Result<proto::BufferSaved> {
 613        let host;
 614        let mut guests;
 615        {
 616            let state = self.state();
 617            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 618            host = project.host_connection_id;
 619            guests = project.guest_connection_ids()
 620        }
 621
 622        let response = self
 623            .peer
 624            .forward_request(request.sender_id, host, request.payload.clone())
 625            .await?;
 626
 627        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 628        broadcast(host, guests, |conn_id| {
 629            self.peer.forward_send(host, conn_id, response.clone())
 630        })?;
 631
 632        Ok(response)
 633    }
 634
 635    async fn format_buffers(
 636        self: Arc<Server>,
 637        request: TypedEnvelope<proto::FormatBuffers>,
 638    ) -> tide::Result<proto::FormatBuffersResponse> {
 639        let host = self
 640            .state()
 641            .read_project(request.payload.project_id, request.sender_id)?
 642            .host_connection_id;
 643        Ok(self
 644            .peer
 645            .forward_request(request.sender_id, host, request.payload.clone())
 646            .await?)
 647    }
 648
 649    async fn get_completions(
 650        self: Arc<Server>,
 651        request: TypedEnvelope<proto::GetCompletions>,
 652    ) -> tide::Result<proto::GetCompletionsResponse> {
 653        let host = self
 654            .state()
 655            .read_project(request.payload.project_id, request.sender_id)?
 656            .host_connection_id;
 657        Ok(self
 658            .peer
 659            .forward_request(request.sender_id, host, request.payload.clone())
 660            .await?)
 661    }
 662
 663    async fn apply_additional_edits_for_completion(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 666    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 667        let host = self
 668            .state()
 669            .read_project(request.payload.project_id, request.sender_id)?
 670            .host_connection_id;
 671        Ok(self
 672            .peer
 673            .forward_request(request.sender_id, host, request.payload.clone())
 674            .await?)
 675    }
 676
 677    async fn get_code_actions(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetCodeActions>,
 680    ) -> tide::Result<proto::GetCodeActionsResponse> {
 681        let host = self
 682            .state()
 683            .read_project(request.payload.project_id, request.sender_id)?
 684            .host_connection_id;
 685        Ok(self
 686            .peer
 687            .forward_request(request.sender_id, host, request.payload.clone())
 688            .await?)
 689    }
 690
 691    async fn apply_code_action(
 692        self: Arc<Server>,
 693        request: TypedEnvelope<proto::ApplyCodeAction>,
 694    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 695        let host = self
 696            .state()
 697            .read_project(request.payload.project_id, request.sender_id)?
 698            .host_connection_id;
 699        Ok(self
 700            .peer
 701            .forward_request(request.sender_id, host, request.payload.clone())
 702            .await?)
 703    }
 704
 705    async fn update_buffer(
 706        self: Arc<Server>,
 707        request: TypedEnvelope<proto::UpdateBuffer>,
 708    ) -> tide::Result<proto::Ack> {
 709        let receiver_ids = self
 710            .state()
 711            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 712        broadcast(request.sender_id, receiver_ids, |connection_id| {
 713            self.peer
 714                .forward_send(request.sender_id, connection_id, request.payload.clone())
 715        })?;
 716        Ok(proto::Ack {})
 717    }
 718
 719    async fn update_buffer_file(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::UpdateBufferFile>,
 722    ) -> tide::Result<()> {
 723        let receiver_ids = self
 724            .state()
 725            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 726        broadcast(request.sender_id, receiver_ids, |connection_id| {
 727            self.peer
 728                .forward_send(request.sender_id, connection_id, request.payload.clone())
 729        })?;
 730        Ok(())
 731    }
 732
 733    async fn buffer_reloaded(
 734        self: Arc<Server>,
 735        request: TypedEnvelope<proto::BufferReloaded>,
 736    ) -> tide::Result<()> {
 737        let receiver_ids = self
 738            .state()
 739            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 740        broadcast(request.sender_id, receiver_ids, |connection_id| {
 741            self.peer
 742                .forward_send(request.sender_id, connection_id, request.payload.clone())
 743        })?;
 744        Ok(())
 745    }
 746
 747    async fn buffer_saved(
 748        self: Arc<Server>,
 749        request: TypedEnvelope<proto::BufferSaved>,
 750    ) -> tide::Result<()> {
 751        let receiver_ids = self
 752            .state()
 753            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 754        broadcast(request.sender_id, receiver_ids, |connection_id| {
 755            self.peer
 756                .forward_send(request.sender_id, connection_id, request.payload.clone())
 757        })?;
 758        Ok(())
 759    }
 760
 761    async fn get_channels(
 762        self: Arc<Server>,
 763        request: TypedEnvelope<proto::GetChannels>,
 764    ) -> tide::Result<proto::GetChannelsResponse> {
 765        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 766        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 767        Ok(proto::GetChannelsResponse {
 768            channels: channels
 769                .into_iter()
 770                .map(|chan| proto::Channel {
 771                    id: chan.id.to_proto(),
 772                    name: chan.name,
 773                })
 774                .collect(),
 775        })
 776    }
 777
 778    async fn get_users(
 779        self: Arc<Server>,
 780        request: TypedEnvelope<proto::GetUsers>,
 781    ) -> tide::Result<proto::GetUsersResponse> {
 782        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 783        let users = self
 784            .app_state
 785            .db
 786            .get_users_by_ids(user_ids)
 787            .await?
 788            .into_iter()
 789            .map(|user| proto::User {
 790                id: user.id.to_proto(),
 791                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 792                github_login: user.github_login,
 793            })
 794            .collect();
 795        Ok(proto::GetUsersResponse { users })
 796    }
 797
 798    fn update_contacts_for_users<'a>(
 799        self: &Arc<Server>,
 800        user_ids: impl IntoIterator<Item = &'a UserId>,
 801    ) -> anyhow::Result<()> {
 802        let mut result = Ok(());
 803        let state = self.state();
 804        for user_id in user_ids {
 805            let contacts = state.contacts_for_user(*user_id);
 806            for connection_id in state.connection_ids_for_user(*user_id) {
 807                if let Err(error) = self.peer.send(
 808                    connection_id,
 809                    proto::UpdateContacts {
 810                        contacts: contacts.clone(),
 811                    },
 812                ) {
 813                    result = Err(error);
 814                }
 815            }
 816        }
 817        result
 818    }
 819
 820    async fn join_channel(
 821        mut self: Arc<Self>,
 822        request: TypedEnvelope<proto::JoinChannel>,
 823    ) -> tide::Result<proto::JoinChannelResponse> {
 824        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 825        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 826        if !self
 827            .app_state
 828            .db
 829            .can_user_access_channel(user_id, channel_id)
 830            .await?
 831        {
 832            Err(anyhow!("access denied"))?;
 833        }
 834
 835        self.state_mut().join_channel(request.sender_id, channel_id);
 836        let messages = self
 837            .app_state
 838            .db
 839            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 840            .await?
 841            .into_iter()
 842            .map(|msg| proto::ChannelMessage {
 843                id: msg.id.to_proto(),
 844                body: msg.body,
 845                timestamp: msg.sent_at.unix_timestamp() as u64,
 846                sender_id: msg.sender_id.to_proto(),
 847                nonce: Some(msg.nonce.as_u128().into()),
 848            })
 849            .collect::<Vec<_>>();
 850        Ok(proto::JoinChannelResponse {
 851            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 852            messages,
 853        })
 854    }
 855
 856    async fn leave_channel(
 857        mut self: Arc<Self>,
 858        request: TypedEnvelope<proto::LeaveChannel>,
 859    ) -> tide::Result<()> {
 860        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 861        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 862        if !self
 863            .app_state
 864            .db
 865            .can_user_access_channel(user_id, channel_id)
 866            .await?
 867        {
 868            Err(anyhow!("access denied"))?;
 869        }
 870
 871        self.state_mut()
 872            .leave_channel(request.sender_id, channel_id);
 873
 874        Ok(())
 875    }
 876
 877    async fn send_channel_message(
 878        self: Arc<Self>,
 879        request: TypedEnvelope<proto::SendChannelMessage>,
 880    ) -> tide::Result<proto::SendChannelMessageResponse> {
 881        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 882        let user_id;
 883        let connection_ids;
 884        {
 885            let state = self.state();
 886            user_id = state.user_id_for_connection(request.sender_id)?;
 887            connection_ids = state.channel_connection_ids(channel_id)?;
 888        }
 889
 890        // Validate the message body.
 891        let body = request.payload.body.trim().to_string();
 892        if body.len() > MAX_MESSAGE_LEN {
 893            return Err(anyhow!("message is too long"))?;
 894        }
 895        if body.is_empty() {
 896            return Err(anyhow!("message can't be blank"))?;
 897        }
 898
 899        let timestamp = OffsetDateTime::now_utc();
 900        let nonce = request
 901            .payload
 902            .nonce
 903            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 904
 905        let message_id = self
 906            .app_state
 907            .db
 908            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 909            .await?
 910            .to_proto();
 911        let message = proto::ChannelMessage {
 912            sender_id: user_id.to_proto(),
 913            id: message_id,
 914            body,
 915            timestamp: timestamp.unix_timestamp() as u64,
 916            nonce: Some(nonce),
 917        };
 918        broadcast(request.sender_id, connection_ids, |conn_id| {
 919            self.peer.send(
 920                conn_id,
 921                proto::ChannelMessageSent {
 922                    channel_id: channel_id.to_proto(),
 923                    message: Some(message.clone()),
 924                },
 925            )
 926        })?;
 927        Ok(proto::SendChannelMessageResponse {
 928            message: Some(message),
 929        })
 930    }
 931
 932    async fn get_channel_messages(
 933        self: Arc<Self>,
 934        request: TypedEnvelope<proto::GetChannelMessages>,
 935    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 936        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 937        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 938        if !self
 939            .app_state
 940            .db
 941            .can_user_access_channel(user_id, channel_id)
 942            .await?
 943        {
 944            Err(anyhow!("access denied"))?;
 945        }
 946
 947        let messages = self
 948            .app_state
 949            .db
 950            .get_channel_messages(
 951                channel_id,
 952                MESSAGE_COUNT_PER_PAGE,
 953                Some(MessageId::from_proto(request.payload.before_message_id)),
 954            )
 955            .await?
 956            .into_iter()
 957            .map(|msg| proto::ChannelMessage {
 958                id: msg.id.to_proto(),
 959                body: msg.body,
 960                timestamp: msg.sent_at.unix_timestamp() as u64,
 961                sender_id: msg.sender_id.to_proto(),
 962                nonce: Some(msg.nonce.as_u128().into()),
 963            })
 964            .collect::<Vec<_>>();
 965
 966        Ok(proto::GetChannelMessagesResponse {
 967            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 968            messages,
 969        })
 970    }
 971
 972    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 973        self.store.read()
 974    }
 975
 976    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 977        self.store.write()
 978    }
 979}
 980
 981impl Executor for RealExecutor {
 982    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 983        task::spawn(future);
 984    }
 985}
 986
 987fn broadcast<F>(
 988    sender_id: ConnectionId,
 989    receiver_ids: Vec<ConnectionId>,
 990    mut f: F,
 991) -> anyhow::Result<()>
 992where
 993    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 994{
 995    let mut result = Ok(());
 996    for receiver_id in receiver_ids {
 997        if receiver_id != sender_id {
 998            if let Err(error) = f(receiver_id) {
 999                if result.is_ok() {
1000                    result = Err(error);
1001                }
1002            }
1003        }
1004    }
1005    result
1006}
1007
1008pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1009    let server = Server::new(app.state().clone(), rpc.clone(), None);
1010    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1011        let server = server.clone();
1012        async move {
1013            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1014
1015            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1016            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1017            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1018            let client_protocol_version: Option<u32> = request
1019                .header("X-Zed-Protocol-Version")
1020                .and_then(|v| v.as_str().parse().ok());
1021
1022            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1023                return Ok(Response::new(StatusCode::UpgradeRequired));
1024            }
1025
1026            let header = match request.header("Sec-Websocket-Key") {
1027                Some(h) => h.as_str(),
1028                None => return Err(anyhow!("expected sec-websocket-key"))?,
1029            };
1030
1031            let user_id = process_auth_header(&request).await?;
1032
1033            let mut response = Response::new(StatusCode::SwitchingProtocols);
1034            response.insert_header(UPGRADE, "websocket");
1035            response.insert_header(CONNECTION, "Upgrade");
1036            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1037            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1038            response.insert_header("Sec-Websocket-Version", "13");
1039
1040            let http_res: &mut tide::http::Response = response.as_mut();
1041            let upgrade_receiver = http_res.recv_upgrade().await;
1042            let addr = request.remote().unwrap_or("unknown").to_string();
1043            task::spawn(async move {
1044                if let Some(stream) = upgrade_receiver.await {
1045                    server
1046                        .handle_connection(
1047                            Connection::new(
1048                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1049                            ),
1050                            addr,
1051                            user_id,
1052                            None,
1053                            RealExecutor,
1054                        )
1055                        .await;
1056                }
1057            });
1058
1059            Ok(response)
1060        }
1061    });
1062}
1063
1064fn header_contains_ignore_case<T>(
1065    request: &tide::Request<T>,
1066    header_name: HeaderName,
1067    value: &str,
1068) -> bool {
1069    request
1070        .header(header_name)
1071        .map(|h| {
1072            h.as_str()
1073                .split(',')
1074                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1075        })
1076        .unwrap_or(false)
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use super::*;
1082    use crate::{
1083        auth,
1084        db::{tests::TestDb, UserId},
1085        github, AppState, Config,
1086    };
1087    use ::rpc::Peer;
1088    use gpui::{executor, ModelHandle, TestAppContext};
1089    use parking_lot::Mutex;
1090    use postage::{mpsc, watch};
1091    use rand::prelude::*;
1092    use rpc::PeerId;
1093    use serde_json::json;
1094    use sqlx::types::time::OffsetDateTime;
1095    use std::{
1096        cell::{Cell, RefCell},
1097        env,
1098        ops::Deref,
1099        path::Path,
1100        rc::Rc,
1101        sync::{
1102            atomic::{AtomicBool, Ordering::SeqCst},
1103            Arc,
1104        },
1105        time::Duration,
1106    };
1107    use zed::{
1108        client::{
1109            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1110            EstablishConnectionError, UserStore,
1111        },
1112        editor::{
1113            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1114            Redo, ToggleCodeActions, Undo,
1115        },
1116        fs::{FakeFs, Fs as _},
1117        language::{
1118            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1119            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1120        },
1121        lsp,
1122        project::{DiagnosticSummary, Project, ProjectPath},
1123        workspace::{Workspace, WorkspaceParams},
1124    };
1125
1126    #[cfg(test)]
1127    #[ctor::ctor]
1128    fn init_logger() {
1129        if std::env::var("RUST_LOG").is_ok() {
1130            env_logger::init();
1131        }
1132    }
1133
1134    #[gpui::test(iterations = 10)]
1135    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1136        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1137        let lang_registry = Arc::new(LanguageRegistry::new());
1138        let fs = Arc::new(FakeFs::new(cx_a.background()));
1139        cx_a.foreground().forbid_parking();
1140
1141        // Connect to a server as 2 clients.
1142        let mut server = TestServer::start(cx_a.foreground()).await;
1143        let client_a = server.create_client(&mut cx_a, "user_a").await;
1144        let client_b = server.create_client(&mut cx_b, "user_b").await;
1145
1146        // Share a project as client A
1147        fs.insert_tree(
1148            "/a",
1149            json!({
1150                ".zed.toml": r#"collaborators = ["user_b"]"#,
1151                "a.txt": "a-contents",
1152                "b.txt": "b-contents",
1153            }),
1154        )
1155        .await;
1156        let project_a = cx_a.update(|cx| {
1157            Project::local(
1158                client_a.clone(),
1159                client_a.user_store.clone(),
1160                lang_registry.clone(),
1161                fs.clone(),
1162                cx,
1163            )
1164        });
1165        let (worktree_a, _) = project_a
1166            .update(&mut cx_a, |p, cx| {
1167                p.find_or_create_local_worktree("/a", false, cx)
1168            })
1169            .await
1170            .unwrap();
1171        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1172        worktree_a
1173            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1174            .await;
1175        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1176        project_a
1177            .update(&mut cx_a, |p, cx| p.share(cx))
1178            .await
1179            .unwrap();
1180
1181        // Join that project as client B
1182        let project_b = Project::remote(
1183            project_id,
1184            client_b.clone(),
1185            client_b.user_store.clone(),
1186            lang_registry.clone(),
1187            fs.clone(),
1188            &mut cx_b.to_async(),
1189        )
1190        .await
1191        .unwrap();
1192
1193        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1194            assert_eq!(
1195                project
1196                    .collaborators()
1197                    .get(&client_a.peer_id)
1198                    .unwrap()
1199                    .user
1200                    .github_login,
1201                "user_a"
1202            );
1203            project.replica_id()
1204        });
1205        project_a
1206            .condition(&cx_a, |tree, _| {
1207                tree.collaborators()
1208                    .get(&client_b.peer_id)
1209                    .map_or(false, |collaborator| {
1210                        collaborator.replica_id == replica_id_b
1211                            && collaborator.user.github_login == "user_b"
1212                    })
1213            })
1214            .await;
1215
1216        // Open the same file as client B and client A.
1217        let buffer_b = project_b
1218            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1219            .await
1220            .unwrap();
1221        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1222        buffer_b.read_with(&cx_b, |buf, cx| {
1223            assert_eq!(buf.read(cx).text(), "b-contents")
1224        });
1225        project_a.read_with(&cx_a, |project, cx| {
1226            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1227        });
1228        let buffer_a = project_a
1229            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1230            .await
1231            .unwrap();
1232
1233        let editor_b = cx_b.add_view(window_b, |cx| {
1234            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1235        });
1236
1237        // TODO
1238        // // Create a selection set as client B and see that selection set as client A.
1239        // buffer_a
1240        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1241        //     .await;
1242
1243        // Edit the buffer as client B and see that edit as client A.
1244        editor_b.update(&mut cx_b, |editor, cx| {
1245            editor.handle_input(&Input("ok, ".into()), cx)
1246        });
1247        buffer_a
1248            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1249            .await;
1250
1251        // TODO
1252        // // Remove the selection set as client B, see those selections disappear as client A.
1253        cx_b.update(move |_| drop(editor_b));
1254        // buffer_a
1255        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1256        //     .await;
1257
1258        // Close the buffer as client A, see that the buffer is closed.
1259        cx_a.update(move |_| drop(buffer_a));
1260        project_a
1261            .condition(&cx_a, |project, cx| {
1262                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1263            })
1264            .await;
1265
1266        // Dropping the client B's project removes client B from client A's collaborators.
1267        cx_b.update(move |_| drop(project_b));
1268        project_a
1269            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1270            .await;
1271    }
1272
1273    #[gpui::test(iterations = 10)]
1274    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1275        let lang_registry = Arc::new(LanguageRegistry::new());
1276        let fs = Arc::new(FakeFs::new(cx_a.background()));
1277        cx_a.foreground().forbid_parking();
1278
1279        // Connect to a server as 2 clients.
1280        let mut server = TestServer::start(cx_a.foreground()).await;
1281        let client_a = server.create_client(&mut cx_a, "user_a").await;
1282        let client_b = server.create_client(&mut cx_b, "user_b").await;
1283
1284        // Share a project as client A
1285        fs.insert_tree(
1286            "/a",
1287            json!({
1288                ".zed.toml": r#"collaborators = ["user_b"]"#,
1289                "a.txt": "a-contents",
1290                "b.txt": "b-contents",
1291            }),
1292        )
1293        .await;
1294        let project_a = cx_a.update(|cx| {
1295            Project::local(
1296                client_a.clone(),
1297                client_a.user_store.clone(),
1298                lang_registry.clone(),
1299                fs.clone(),
1300                cx,
1301            )
1302        });
1303        let (worktree_a, _) = project_a
1304            .update(&mut cx_a, |p, cx| {
1305                p.find_or_create_local_worktree("/a", false, cx)
1306            })
1307            .await
1308            .unwrap();
1309        worktree_a
1310            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1311            .await;
1312        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1313        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1314        project_a
1315            .update(&mut cx_a, |p, cx| p.share(cx))
1316            .await
1317            .unwrap();
1318        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1319
1320        // Join that project as client B
1321        let project_b = Project::remote(
1322            project_id,
1323            client_b.clone(),
1324            client_b.user_store.clone(),
1325            lang_registry.clone(),
1326            fs.clone(),
1327            &mut cx_b.to_async(),
1328        )
1329        .await
1330        .unwrap();
1331        project_b
1332            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1333            .await
1334            .unwrap();
1335
1336        // Unshare the project as client A
1337        project_a
1338            .update(&mut cx_a, |project, cx| project.unshare(cx))
1339            .await
1340            .unwrap();
1341        project_b
1342            .condition(&mut cx_b, |project, _| project.is_read_only())
1343            .await;
1344        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1345        drop(project_b);
1346
1347        // Share the project again and ensure guests can still join.
1348        project_a
1349            .update(&mut cx_a, |project, cx| project.share(cx))
1350            .await
1351            .unwrap();
1352        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1353
1354        let project_c = Project::remote(
1355            project_id,
1356            client_b.clone(),
1357            client_b.user_store.clone(),
1358            lang_registry.clone(),
1359            fs.clone(),
1360            &mut cx_b.to_async(),
1361        )
1362        .await
1363        .unwrap();
1364        project_c
1365            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1366            .await
1367            .unwrap();
1368    }
1369
1370    #[gpui::test(iterations = 10)]
1371    async fn test_propagate_saves_and_fs_changes(
1372        mut cx_a: TestAppContext,
1373        mut cx_b: TestAppContext,
1374        mut cx_c: TestAppContext,
1375    ) {
1376        let lang_registry = Arc::new(LanguageRegistry::new());
1377        let fs = Arc::new(FakeFs::new(cx_a.background()));
1378        cx_a.foreground().forbid_parking();
1379
1380        // Connect to a server as 3 clients.
1381        let mut server = TestServer::start(cx_a.foreground()).await;
1382        let client_a = server.create_client(&mut cx_a, "user_a").await;
1383        let client_b = server.create_client(&mut cx_b, "user_b").await;
1384        let client_c = server.create_client(&mut cx_c, "user_c").await;
1385
1386        // Share a worktree as client A.
1387        fs.insert_tree(
1388            "/a",
1389            json!({
1390                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1391                "file1": "",
1392                "file2": ""
1393            }),
1394        )
1395        .await;
1396        let project_a = cx_a.update(|cx| {
1397            Project::local(
1398                client_a.clone(),
1399                client_a.user_store.clone(),
1400                lang_registry.clone(),
1401                fs.clone(),
1402                cx,
1403            )
1404        });
1405        let (worktree_a, _) = project_a
1406            .update(&mut cx_a, |p, cx| {
1407                p.find_or_create_local_worktree("/a", false, cx)
1408            })
1409            .await
1410            .unwrap();
1411        worktree_a
1412            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1413            .await;
1414        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1415        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1416        project_a
1417            .update(&mut cx_a, |p, cx| p.share(cx))
1418            .await
1419            .unwrap();
1420
1421        // Join that worktree as clients B and C.
1422        let project_b = Project::remote(
1423            project_id,
1424            client_b.clone(),
1425            client_b.user_store.clone(),
1426            lang_registry.clone(),
1427            fs.clone(),
1428            &mut cx_b.to_async(),
1429        )
1430        .await
1431        .unwrap();
1432        let project_c = Project::remote(
1433            project_id,
1434            client_c.clone(),
1435            client_c.user_store.clone(),
1436            lang_registry.clone(),
1437            fs.clone(),
1438            &mut cx_c.to_async(),
1439        )
1440        .await
1441        .unwrap();
1442        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1443        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1444
1445        // Open and edit a buffer as both guests B and C.
1446        let buffer_b = project_b
1447            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1448            .await
1449            .unwrap();
1450        let buffer_c = project_c
1451            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1452            .await
1453            .unwrap();
1454        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1455        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1456
1457        // Open and edit that buffer as the host.
1458        let buffer_a = project_a
1459            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1460            .await
1461            .unwrap();
1462
1463        buffer_a
1464            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1465            .await;
1466        buffer_a.update(&mut cx_a, |buf, cx| {
1467            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1468        });
1469
1470        // Wait for edits to propagate
1471        buffer_a
1472            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1473            .await;
1474        buffer_b
1475            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1476            .await;
1477        buffer_c
1478            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1479            .await;
1480
1481        // Edit the buffer as the host and concurrently save as guest B.
1482        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1483        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1484        save_b.await.unwrap();
1485        assert_eq!(
1486            fs.load("/a/file1".as_ref()).await.unwrap(),
1487            "hi-a, i-am-c, i-am-b, i-am-a"
1488        );
1489        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1490        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1491        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1492
1493        // Make changes on host's file system, see those changes on guest worktrees.
1494        fs.rename(
1495            "/a/file1".as_ref(),
1496            "/a/file1-renamed".as_ref(),
1497            Default::default(),
1498        )
1499        .await
1500        .unwrap();
1501
1502        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1503            .await
1504            .unwrap();
1505        fs.insert_file(Path::new("/a/file4"), "4".into())
1506            .await
1507            .unwrap();
1508
1509        worktree_a
1510            .condition(&cx_a, |tree, _| {
1511                tree.paths()
1512                    .map(|p| p.to_string_lossy())
1513                    .collect::<Vec<_>>()
1514                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1515            })
1516            .await;
1517        worktree_b
1518            .condition(&cx_b, |tree, _| {
1519                tree.paths()
1520                    .map(|p| p.to_string_lossy())
1521                    .collect::<Vec<_>>()
1522                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1523            })
1524            .await;
1525        worktree_c
1526            .condition(&cx_c, |tree, _| {
1527                tree.paths()
1528                    .map(|p| p.to_string_lossy())
1529                    .collect::<Vec<_>>()
1530                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1531            })
1532            .await;
1533
1534        // Ensure buffer files are updated as well.
1535        buffer_a
1536            .condition(&cx_a, |buf, _| {
1537                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1538            })
1539            .await;
1540        buffer_b
1541            .condition(&cx_b, |buf, _| {
1542                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1543            })
1544            .await;
1545        buffer_c
1546            .condition(&cx_c, |buf, _| {
1547                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1548            })
1549            .await;
1550    }
1551
1552    #[gpui::test(iterations = 10)]
1553    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1554        cx_a.foreground().forbid_parking();
1555        let lang_registry = Arc::new(LanguageRegistry::new());
1556        let fs = Arc::new(FakeFs::new(cx_a.background()));
1557
1558        // Connect to a server as 2 clients.
1559        let mut server = TestServer::start(cx_a.foreground()).await;
1560        let client_a = server.create_client(&mut cx_a, "user_a").await;
1561        let client_b = server.create_client(&mut cx_b, "user_b").await;
1562
1563        // Share a project as client A
1564        fs.insert_tree(
1565            "/dir",
1566            json!({
1567                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1568                "a.txt": "a-contents",
1569            }),
1570        )
1571        .await;
1572
1573        let project_a = cx_a.update(|cx| {
1574            Project::local(
1575                client_a.clone(),
1576                client_a.user_store.clone(),
1577                lang_registry.clone(),
1578                fs.clone(),
1579                cx,
1580            )
1581        });
1582        let (worktree_a, _) = project_a
1583            .update(&mut cx_a, |p, cx| {
1584                p.find_or_create_local_worktree("/dir", false, cx)
1585            })
1586            .await
1587            .unwrap();
1588        worktree_a
1589            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1590            .await;
1591        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1592        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1593        project_a
1594            .update(&mut cx_a, |p, cx| p.share(cx))
1595            .await
1596            .unwrap();
1597
1598        // Join that project as client B
1599        let project_b = Project::remote(
1600            project_id,
1601            client_b.clone(),
1602            client_b.user_store.clone(),
1603            lang_registry.clone(),
1604            fs.clone(),
1605            &mut cx_b.to_async(),
1606        )
1607        .await
1608        .unwrap();
1609
1610        // Open a buffer as client B
1611        let buffer_b = project_b
1612            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1613            .await
1614            .unwrap();
1615
1616        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1617        buffer_b.read_with(&cx_b, |buf, _| {
1618            assert!(buf.is_dirty());
1619            assert!(!buf.has_conflict());
1620        });
1621
1622        buffer_b
1623            .update(&mut cx_b, |buf, cx| buf.save(cx))
1624            .await
1625            .unwrap();
1626        buffer_b
1627            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1628            .await;
1629        buffer_b.read_with(&cx_b, |buf, _| {
1630            assert!(!buf.has_conflict());
1631        });
1632
1633        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1634        buffer_b.read_with(&cx_b, |buf, _| {
1635            assert!(buf.is_dirty());
1636            assert!(!buf.has_conflict());
1637        });
1638    }
1639
1640    #[gpui::test(iterations = 10)]
1641    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1642        cx_a.foreground().forbid_parking();
1643        let lang_registry = Arc::new(LanguageRegistry::new());
1644        let fs = Arc::new(FakeFs::new(cx_a.background()));
1645
1646        // Connect to a server as 2 clients.
1647        let mut server = TestServer::start(cx_a.foreground()).await;
1648        let client_a = server.create_client(&mut cx_a, "user_a").await;
1649        let client_b = server.create_client(&mut cx_b, "user_b").await;
1650
1651        // Share a project as client A
1652        fs.insert_tree(
1653            "/dir",
1654            json!({
1655                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1656                "a.txt": "a-contents",
1657            }),
1658        )
1659        .await;
1660
1661        let project_a = cx_a.update(|cx| {
1662            Project::local(
1663                client_a.clone(),
1664                client_a.user_store.clone(),
1665                lang_registry.clone(),
1666                fs.clone(),
1667                cx,
1668            )
1669        });
1670        let (worktree_a, _) = project_a
1671            .update(&mut cx_a, |p, cx| {
1672                p.find_or_create_local_worktree("/dir", false, cx)
1673            })
1674            .await
1675            .unwrap();
1676        worktree_a
1677            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1678            .await;
1679        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1680        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1681        project_a
1682            .update(&mut cx_a, |p, cx| p.share(cx))
1683            .await
1684            .unwrap();
1685
1686        // Join that project as client B
1687        let project_b = Project::remote(
1688            project_id,
1689            client_b.clone(),
1690            client_b.user_store.clone(),
1691            lang_registry.clone(),
1692            fs.clone(),
1693            &mut cx_b.to_async(),
1694        )
1695        .await
1696        .unwrap();
1697        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1698
1699        // Open a buffer as client B
1700        let buffer_b = project_b
1701            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1702            .await
1703            .unwrap();
1704        buffer_b.read_with(&cx_b, |buf, _| {
1705            assert!(!buf.is_dirty());
1706            assert!(!buf.has_conflict());
1707        });
1708
1709        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1710            .await
1711            .unwrap();
1712        buffer_b
1713            .condition(&cx_b, |buf, _| {
1714                buf.text() == "new contents" && !buf.is_dirty()
1715            })
1716            .await;
1717        buffer_b.read_with(&cx_b, |buf, _| {
1718            assert!(!buf.has_conflict());
1719        });
1720    }
1721
1722    #[gpui::test(iterations = 10)]
1723    async fn test_editing_while_guest_opens_buffer(
1724        mut cx_a: TestAppContext,
1725        mut cx_b: TestAppContext,
1726    ) {
1727        cx_a.foreground().forbid_parking();
1728        let lang_registry = Arc::new(LanguageRegistry::new());
1729        let fs = Arc::new(FakeFs::new(cx_a.background()));
1730
1731        // Connect to a server as 2 clients.
1732        let mut server = TestServer::start(cx_a.foreground()).await;
1733        let client_a = server.create_client(&mut cx_a, "user_a").await;
1734        let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736        // Share a project as client A
1737        fs.insert_tree(
1738            "/dir",
1739            json!({
1740                ".zed.toml": r#"collaborators = ["user_b"]"#,
1741                "a.txt": "a-contents",
1742            }),
1743        )
1744        .await;
1745        let project_a = cx_a.update(|cx| {
1746            Project::local(
1747                client_a.clone(),
1748                client_a.user_store.clone(),
1749                lang_registry.clone(),
1750                fs.clone(),
1751                cx,
1752            )
1753        });
1754        let (worktree_a, _) = project_a
1755            .update(&mut cx_a, |p, cx| {
1756                p.find_or_create_local_worktree("/dir", false, cx)
1757            })
1758            .await
1759            .unwrap();
1760        worktree_a
1761            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1762            .await;
1763        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1764        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1765        project_a
1766            .update(&mut cx_a, |p, cx| p.share(cx))
1767            .await
1768            .unwrap();
1769
1770        // Join that project as client B
1771        let project_b = Project::remote(
1772            project_id,
1773            client_b.clone(),
1774            client_b.user_store.clone(),
1775            lang_registry.clone(),
1776            fs.clone(),
1777            &mut cx_b.to_async(),
1778        )
1779        .await
1780        .unwrap();
1781
1782        // Open a buffer as client A
1783        let buffer_a = project_a
1784            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1785            .await
1786            .unwrap();
1787
1788        // Start opening the same buffer as client B
1789        let buffer_b = cx_b
1790            .background()
1791            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1792
1793        // Edit the buffer as client A while client B is still opening it.
1794        cx_b.background().simulate_random_delay().await;
1795        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1796        cx_b.background().simulate_random_delay().await;
1797        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1798
1799        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1800        let buffer_b = buffer_b.await.unwrap();
1801        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1802    }
1803
1804    #[gpui::test(iterations = 10)]
1805    async fn test_leaving_worktree_while_opening_buffer(
1806        mut cx_a: TestAppContext,
1807        mut cx_b: TestAppContext,
1808    ) {
1809        cx_a.foreground().forbid_parking();
1810        let lang_registry = Arc::new(LanguageRegistry::new());
1811        let fs = Arc::new(FakeFs::new(cx_a.background()));
1812
1813        // Connect to a server as 2 clients.
1814        let mut server = TestServer::start(cx_a.foreground()).await;
1815        let client_a = server.create_client(&mut cx_a, "user_a").await;
1816        let client_b = server.create_client(&mut cx_b, "user_b").await;
1817
1818        // Share a project as client A
1819        fs.insert_tree(
1820            "/dir",
1821            json!({
1822                ".zed.toml": r#"collaborators = ["user_b"]"#,
1823                "a.txt": "a-contents",
1824            }),
1825        )
1826        .await;
1827        let project_a = cx_a.update(|cx| {
1828            Project::local(
1829                client_a.clone(),
1830                client_a.user_store.clone(),
1831                lang_registry.clone(),
1832                fs.clone(),
1833                cx,
1834            )
1835        });
1836        let (worktree_a, _) = project_a
1837            .update(&mut cx_a, |p, cx| {
1838                p.find_or_create_local_worktree("/dir", false, cx)
1839            })
1840            .await
1841            .unwrap();
1842        worktree_a
1843            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1844            .await;
1845        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1846        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1847        project_a
1848            .update(&mut cx_a, |p, cx| p.share(cx))
1849            .await
1850            .unwrap();
1851
1852        // Join that project as client B
1853        let project_b = Project::remote(
1854            project_id,
1855            client_b.clone(),
1856            client_b.user_store.clone(),
1857            lang_registry.clone(),
1858            fs.clone(),
1859            &mut cx_b.to_async(),
1860        )
1861        .await
1862        .unwrap();
1863
1864        // See that a guest has joined as client A.
1865        project_a
1866            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1867            .await;
1868
1869        // Begin opening a buffer as client B, but leave the project before the open completes.
1870        let buffer_b = cx_b
1871            .background()
1872            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1873        cx_b.update(|_| drop(project_b));
1874        drop(buffer_b);
1875
1876        // See that the guest has left.
1877        project_a
1878            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1879            .await;
1880    }
1881
1882    #[gpui::test(iterations = 10)]
1883    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1884        cx_a.foreground().forbid_parking();
1885        let lang_registry = Arc::new(LanguageRegistry::new());
1886        let fs = Arc::new(FakeFs::new(cx_a.background()));
1887
1888        // Connect to a server as 2 clients.
1889        let mut server = TestServer::start(cx_a.foreground()).await;
1890        let client_a = server.create_client(&mut cx_a, "user_a").await;
1891        let client_b = server.create_client(&mut cx_b, "user_b").await;
1892
1893        // Share a project as client A
1894        fs.insert_tree(
1895            "/a",
1896            json!({
1897                ".zed.toml": r#"collaborators = ["user_b"]"#,
1898                "a.txt": "a-contents",
1899                "b.txt": "b-contents",
1900            }),
1901        )
1902        .await;
1903        let project_a = cx_a.update(|cx| {
1904            Project::local(
1905                client_a.clone(),
1906                client_a.user_store.clone(),
1907                lang_registry.clone(),
1908                fs.clone(),
1909                cx,
1910            )
1911        });
1912        let (worktree_a, _) = project_a
1913            .update(&mut cx_a, |p, cx| {
1914                p.find_or_create_local_worktree("/a", false, cx)
1915            })
1916            .await
1917            .unwrap();
1918        worktree_a
1919            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1920            .await;
1921        let project_id = project_a
1922            .update(&mut cx_a, |project, _| project.next_remote_id())
1923            .await;
1924        project_a
1925            .update(&mut cx_a, |project, cx| project.share(cx))
1926            .await
1927            .unwrap();
1928
1929        // Join that project as client B
1930        let _project_b = Project::remote(
1931            project_id,
1932            client_b.clone(),
1933            client_b.user_store.clone(),
1934            lang_registry.clone(),
1935            fs.clone(),
1936            &mut cx_b.to_async(),
1937        )
1938        .await
1939        .unwrap();
1940
1941        // See that a guest has joined as client A.
1942        project_a
1943            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1944            .await;
1945
1946        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1947        client_b.disconnect(&cx_b.to_async()).unwrap();
1948        project_a
1949            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1950            .await;
1951    }
1952
1953    #[gpui::test(iterations = 10)]
1954    async fn test_collaborating_with_diagnostics(
1955        mut cx_a: TestAppContext,
1956        mut cx_b: TestAppContext,
1957    ) {
1958        cx_a.foreground().forbid_parking();
1959        let mut lang_registry = Arc::new(LanguageRegistry::new());
1960        let fs = Arc::new(FakeFs::new(cx_a.background()));
1961
1962        // Set up a fake language server.
1963        let (language_server_config, mut fake_language_server) =
1964            LanguageServerConfig::fake(&cx_a).await;
1965        Arc::get_mut(&mut lang_registry)
1966            .unwrap()
1967            .add(Arc::new(Language::new(
1968                LanguageConfig {
1969                    name: "Rust".to_string(),
1970                    path_suffixes: vec!["rs".to_string()],
1971                    language_server: Some(language_server_config),
1972                    ..Default::default()
1973                },
1974                Some(tree_sitter_rust::language()),
1975            )));
1976
1977        // Connect to a server as 2 clients.
1978        let mut server = TestServer::start(cx_a.foreground()).await;
1979        let client_a = server.create_client(&mut cx_a, "user_a").await;
1980        let client_b = server.create_client(&mut cx_b, "user_b").await;
1981
1982        // Share a project as client A
1983        fs.insert_tree(
1984            "/a",
1985            json!({
1986                ".zed.toml": r#"collaborators = ["user_b"]"#,
1987                "a.rs": "let one = two",
1988                "other.rs": "",
1989            }),
1990        )
1991        .await;
1992        let project_a = cx_a.update(|cx| {
1993            Project::local(
1994                client_a.clone(),
1995                client_a.user_store.clone(),
1996                lang_registry.clone(),
1997                fs.clone(),
1998                cx,
1999            )
2000        });
2001        let (worktree_a, _) = project_a
2002            .update(&mut cx_a, |p, cx| {
2003                p.find_or_create_local_worktree("/a", false, cx)
2004            })
2005            .await
2006            .unwrap();
2007        worktree_a
2008            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2009            .await;
2010        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2011        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2012        project_a
2013            .update(&mut cx_a, |p, cx| p.share(cx))
2014            .await
2015            .unwrap();
2016
2017        // Cause the language server to start.
2018        let _ = cx_a
2019            .background()
2020            .spawn(project_a.update(&mut cx_a, |project, cx| {
2021                project.open_buffer(
2022                    ProjectPath {
2023                        worktree_id,
2024                        path: Path::new("other.rs").into(),
2025                    },
2026                    cx,
2027                )
2028            }))
2029            .await
2030            .unwrap();
2031
2032        // Simulate a language server reporting errors for a file.
2033        fake_language_server
2034            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2035                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2036                version: None,
2037                diagnostics: vec![lsp::Diagnostic {
2038                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2039                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2040                    message: "message 1".to_string(),
2041                    ..Default::default()
2042                }],
2043            })
2044            .await;
2045
2046        // Wait for server to see the diagnostics update.
2047        server
2048            .condition(|store| {
2049                let worktree = store
2050                    .project(project_id)
2051                    .unwrap()
2052                    .worktrees
2053                    .get(&worktree_id.to_proto())
2054                    .unwrap();
2055
2056                !worktree
2057                    .share
2058                    .as_ref()
2059                    .unwrap()
2060                    .diagnostic_summaries
2061                    .is_empty()
2062            })
2063            .await;
2064
2065        // Join the worktree as client B.
2066        let project_b = Project::remote(
2067            project_id,
2068            client_b.clone(),
2069            client_b.user_store.clone(),
2070            lang_registry.clone(),
2071            fs.clone(),
2072            &mut cx_b.to_async(),
2073        )
2074        .await
2075        .unwrap();
2076
2077        project_b.read_with(&cx_b, |project, cx| {
2078            assert_eq!(
2079                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2080                &[(
2081                    ProjectPath {
2082                        worktree_id,
2083                        path: Arc::from(Path::new("a.rs")),
2084                    },
2085                    DiagnosticSummary {
2086                        error_count: 1,
2087                        warning_count: 0,
2088                        ..Default::default()
2089                    },
2090                )]
2091            )
2092        });
2093
2094        // Simulate a language server reporting more errors for a file.
2095        fake_language_server
2096            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2097                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2098                version: None,
2099                diagnostics: vec![
2100                    lsp::Diagnostic {
2101                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2102                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2103                        message: "message 1".to_string(),
2104                        ..Default::default()
2105                    },
2106                    lsp::Diagnostic {
2107                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2108                        range: lsp::Range::new(
2109                            lsp::Position::new(0, 10),
2110                            lsp::Position::new(0, 13),
2111                        ),
2112                        message: "message 2".to_string(),
2113                        ..Default::default()
2114                    },
2115                ],
2116            })
2117            .await;
2118
2119        // Client b gets the updated summaries
2120        project_b
2121            .condition(&cx_b, |project, cx| {
2122                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2123                    == &[(
2124                        ProjectPath {
2125                            worktree_id,
2126                            path: Arc::from(Path::new("a.rs")),
2127                        },
2128                        DiagnosticSummary {
2129                            error_count: 1,
2130                            warning_count: 1,
2131                            ..Default::default()
2132                        },
2133                    )]
2134            })
2135            .await;
2136
2137        // Open the file with the errors on client B. They should be present.
2138        let buffer_b = cx_b
2139            .background()
2140            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2141            .await
2142            .unwrap();
2143
2144        buffer_b.read_with(&cx_b, |buffer, _| {
2145            assert_eq!(
2146                buffer
2147                    .snapshot()
2148                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2149                    .map(|entry| entry)
2150                    .collect::<Vec<_>>(),
2151                &[
2152                    DiagnosticEntry {
2153                        range: Point::new(0, 4)..Point::new(0, 7),
2154                        diagnostic: Diagnostic {
2155                            group_id: 0,
2156                            message: "message 1".to_string(),
2157                            severity: lsp::DiagnosticSeverity::ERROR,
2158                            is_primary: true,
2159                            ..Default::default()
2160                        }
2161                    },
2162                    DiagnosticEntry {
2163                        range: Point::new(0, 10)..Point::new(0, 13),
2164                        diagnostic: Diagnostic {
2165                            group_id: 1,
2166                            severity: lsp::DiagnosticSeverity::WARNING,
2167                            message: "message 2".to_string(),
2168                            is_primary: true,
2169                            ..Default::default()
2170                        }
2171                    }
2172                ]
2173            );
2174        });
2175    }
2176
2177    #[gpui::test(iterations = 10)]
2178    async fn test_collaborating_with_completion(
2179        mut cx_a: TestAppContext,
2180        mut cx_b: TestAppContext,
2181    ) {
2182        cx_a.foreground().forbid_parking();
2183        let mut lang_registry = Arc::new(LanguageRegistry::new());
2184        let fs = Arc::new(FakeFs::new(cx_a.background()));
2185
2186        // Set up a fake language server.
2187        let (language_server_config, mut fake_language_server) =
2188            LanguageServerConfig::fake_with_capabilities(
2189                lsp::ServerCapabilities {
2190                    completion_provider: Some(lsp::CompletionOptions {
2191                        trigger_characters: Some(vec![".".to_string()]),
2192                        ..Default::default()
2193                    }),
2194                    ..Default::default()
2195                },
2196                &cx_a,
2197            )
2198            .await;
2199        Arc::get_mut(&mut lang_registry)
2200            .unwrap()
2201            .add(Arc::new(Language::new(
2202                LanguageConfig {
2203                    name: "Rust".to_string(),
2204                    path_suffixes: vec!["rs".to_string()],
2205                    language_server: Some(language_server_config),
2206                    ..Default::default()
2207                },
2208                Some(tree_sitter_rust::language()),
2209            )));
2210
2211        // Connect to a server as 2 clients.
2212        let mut server = TestServer::start(cx_a.foreground()).await;
2213        let client_a = server.create_client(&mut cx_a, "user_a").await;
2214        let client_b = server.create_client(&mut cx_b, "user_b").await;
2215
2216        // Share a project as client A
2217        fs.insert_tree(
2218            "/a",
2219            json!({
2220                ".zed.toml": r#"collaborators = ["user_b"]"#,
2221                "main.rs": "fn main() { a }",
2222                "other.rs": "",
2223            }),
2224        )
2225        .await;
2226        let project_a = cx_a.update(|cx| {
2227            Project::local(
2228                client_a.clone(),
2229                client_a.user_store.clone(),
2230                lang_registry.clone(),
2231                fs.clone(),
2232                cx,
2233            )
2234        });
2235        let (worktree_a, _) = project_a
2236            .update(&mut cx_a, |p, cx| {
2237                p.find_or_create_local_worktree("/a", false, cx)
2238            })
2239            .await
2240            .unwrap();
2241        worktree_a
2242            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2243            .await;
2244        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2245        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2246        project_a
2247            .update(&mut cx_a, |p, cx| p.share(cx))
2248            .await
2249            .unwrap();
2250
2251        // Join the worktree as client B.
2252        let project_b = Project::remote(
2253            project_id,
2254            client_b.clone(),
2255            client_b.user_store.clone(),
2256            lang_registry.clone(),
2257            fs.clone(),
2258            &mut cx_b.to_async(),
2259        )
2260        .await
2261        .unwrap();
2262
2263        // Open a file in an editor as the guest.
2264        let buffer_b = project_b
2265            .update(&mut cx_b, |p, cx| {
2266                p.open_buffer((worktree_id, "main.rs"), cx)
2267            })
2268            .await
2269            .unwrap();
2270        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2271        let editor_b = cx_b.add_view(window_b, |cx| {
2272            Editor::for_buffer(
2273                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2274                Arc::new(|cx| EditorSettings::test(cx)),
2275                Some(project_b.clone()),
2276                cx,
2277            )
2278        });
2279
2280        // Type a completion trigger character as the guest.
2281        editor_b.update(&mut cx_b, |editor, cx| {
2282            editor.select_ranges([13..13], None, cx);
2283            editor.handle_input(&Input(".".into()), cx);
2284            cx.focus(&editor_b);
2285        });
2286
2287        // Receive a completion request as the host's language server.
2288        // Return some completions from the host's language server.
2289        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2290            assert_eq!(
2291                params.text_document_position.text_document.uri,
2292                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2293            );
2294            assert_eq!(
2295                params.text_document_position.position,
2296                lsp::Position::new(0, 14),
2297            );
2298
2299            Some(lsp::CompletionResponse::Array(vec![
2300                lsp::CompletionItem {
2301                    label: "first_method(…)".into(),
2302                    detail: Some("fn(&mut self, B) -> C".into()),
2303                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2304                        new_text: "first_method($1)".to_string(),
2305                        range: lsp::Range::new(
2306                            lsp::Position::new(0, 14),
2307                            lsp::Position::new(0, 14),
2308                        ),
2309                    })),
2310                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2311                    ..Default::default()
2312                },
2313                lsp::CompletionItem {
2314                    label: "second_method(…)".into(),
2315                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2316                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2317                        new_text: "second_method()".to_string(),
2318                        range: lsp::Range::new(
2319                            lsp::Position::new(0, 14),
2320                            lsp::Position::new(0, 14),
2321                        ),
2322                    })),
2323                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2324                    ..Default::default()
2325                },
2326            ]))
2327        });
2328
2329        // Open the buffer on the host.
2330        let buffer_a = project_a
2331            .update(&mut cx_a, |p, cx| {
2332                p.open_buffer((worktree_id, "main.rs"), cx)
2333            })
2334            .await
2335            .unwrap();
2336        buffer_a
2337            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2338            .await;
2339
2340        // Confirm a completion on the guest.
2341        editor_b
2342            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2343            .await;
2344        editor_b.update(&mut cx_b, |editor, cx| {
2345            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2346            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2347        });
2348
2349        // Return a resolved completion from the host's language server.
2350        // The resolved completion has an additional text edit.
2351        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2352            assert_eq!(params.label, "first_method(…)");
2353            lsp::CompletionItem {
2354                label: "first_method(…)".into(),
2355                detail: Some("fn(&mut self, B) -> C".into()),
2356                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2357                    new_text: "first_method($1)".to_string(),
2358                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2359                })),
2360                additional_text_edits: Some(vec![lsp::TextEdit {
2361                    new_text: "use d::SomeTrait;\n".to_string(),
2362                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2363                }]),
2364                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2365                ..Default::default()
2366            }
2367        });
2368
2369        // The additional edit is applied.
2370        buffer_a
2371            .condition(&cx_a, |buffer, _| {
2372                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2373            })
2374            .await;
2375        buffer_b
2376            .condition(&cx_b, |buffer, _| {
2377                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2378            })
2379            .await;
2380    }
2381
2382    #[gpui::test(iterations = 10)]
2383    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2384        cx_a.foreground().forbid_parking();
2385        let mut lang_registry = Arc::new(LanguageRegistry::new());
2386        let fs = Arc::new(FakeFs::new(cx_a.background()));
2387
2388        // Set up a fake language server.
2389        let (language_server_config, mut fake_language_server) =
2390            LanguageServerConfig::fake(&cx_a).await;
2391        Arc::get_mut(&mut lang_registry)
2392            .unwrap()
2393            .add(Arc::new(Language::new(
2394                LanguageConfig {
2395                    name: "Rust".to_string(),
2396                    path_suffixes: vec!["rs".to_string()],
2397                    language_server: Some(language_server_config),
2398                    ..Default::default()
2399                },
2400                Some(tree_sitter_rust::language()),
2401            )));
2402
2403        // Connect to a server as 2 clients.
2404        let mut server = TestServer::start(cx_a.foreground()).await;
2405        let client_a = server.create_client(&mut cx_a, "user_a").await;
2406        let client_b = server.create_client(&mut cx_b, "user_b").await;
2407
2408        // Share a project as client A
2409        fs.insert_tree(
2410            "/a",
2411            json!({
2412                ".zed.toml": r#"collaborators = ["user_b"]"#,
2413                "a.rs": "let one = two",
2414            }),
2415        )
2416        .await;
2417        let project_a = cx_a.update(|cx| {
2418            Project::local(
2419                client_a.clone(),
2420                client_a.user_store.clone(),
2421                lang_registry.clone(),
2422                fs.clone(),
2423                cx,
2424            )
2425        });
2426        let (worktree_a, _) = project_a
2427            .update(&mut cx_a, |p, cx| {
2428                p.find_or_create_local_worktree("/a", false, cx)
2429            })
2430            .await
2431            .unwrap();
2432        worktree_a
2433            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2434            .await;
2435        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2436        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2437        project_a
2438            .update(&mut cx_a, |p, cx| p.share(cx))
2439            .await
2440            .unwrap();
2441
2442        // Join the worktree as client B.
2443        let project_b = Project::remote(
2444            project_id,
2445            client_b.clone(),
2446            client_b.user_store.clone(),
2447            lang_registry.clone(),
2448            fs.clone(),
2449            &mut cx_b.to_async(),
2450        )
2451        .await
2452        .unwrap();
2453
2454        let buffer_b = cx_b
2455            .background()
2456            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2457            .await
2458            .unwrap();
2459
2460        let format = project_b.update(&mut cx_b, |project, cx| {
2461            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2462        });
2463
2464        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2465            Some(vec![
2466                lsp::TextEdit {
2467                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2468                    new_text: "h".to_string(),
2469                },
2470                lsp::TextEdit {
2471                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2472                    new_text: "y".to_string(),
2473                },
2474            ])
2475        });
2476
2477        format.await.unwrap();
2478        assert_eq!(
2479            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2480            "let honey = two"
2481        );
2482    }
2483
2484    #[gpui::test(iterations = 10)]
2485    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2486        cx_a.foreground().forbid_parking();
2487        let mut lang_registry = Arc::new(LanguageRegistry::new());
2488        let fs = Arc::new(FakeFs::new(cx_a.background()));
2489        fs.insert_tree(
2490            "/root-1",
2491            json!({
2492                ".zed.toml": r#"collaborators = ["user_b"]"#,
2493                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2494            }),
2495        )
2496        .await;
2497        fs.insert_tree(
2498            "/root-2",
2499            json!({
2500                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2501            }),
2502        )
2503        .await;
2504
2505        // Set up a fake language server.
2506        let (language_server_config, mut fake_language_server) =
2507            LanguageServerConfig::fake(&cx_a).await;
2508        Arc::get_mut(&mut lang_registry)
2509            .unwrap()
2510            .add(Arc::new(Language::new(
2511                LanguageConfig {
2512                    name: "Rust".to_string(),
2513                    path_suffixes: vec!["rs".to_string()],
2514                    language_server: Some(language_server_config),
2515                    ..Default::default()
2516                },
2517                Some(tree_sitter_rust::language()),
2518            )));
2519
2520        // Connect to a server as 2 clients.
2521        let mut server = TestServer::start(cx_a.foreground()).await;
2522        let client_a = server.create_client(&mut cx_a, "user_a").await;
2523        let client_b = server.create_client(&mut cx_b, "user_b").await;
2524
2525        // Share a project as client A
2526        let project_a = cx_a.update(|cx| {
2527            Project::local(
2528                client_a.clone(),
2529                client_a.user_store.clone(),
2530                lang_registry.clone(),
2531                fs.clone(),
2532                cx,
2533            )
2534        });
2535        let (worktree_a, _) = project_a
2536            .update(&mut cx_a, |p, cx| {
2537                p.find_or_create_local_worktree("/root-1", false, cx)
2538            })
2539            .await
2540            .unwrap();
2541        worktree_a
2542            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2543            .await;
2544        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2545        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2546        project_a
2547            .update(&mut cx_a, |p, cx| p.share(cx))
2548            .await
2549            .unwrap();
2550
2551        // Join the worktree as client B.
2552        let project_b = Project::remote(
2553            project_id,
2554            client_b.clone(),
2555            client_b.user_store.clone(),
2556            lang_registry.clone(),
2557            fs.clone(),
2558            &mut cx_b.to_async(),
2559        )
2560        .await
2561        .unwrap();
2562
2563        // Open the file on client B.
2564        let buffer_b = cx_b
2565            .background()
2566            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2567            .await
2568            .unwrap();
2569
2570        // Request the definition of a symbol as the guest.
2571        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2572        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2573            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2574                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2575                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2576            )))
2577        });
2578
2579        let definitions_1 = definitions_1.await.unwrap();
2580        cx_b.read(|cx| {
2581            assert_eq!(definitions_1.len(), 1);
2582            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2583            let target_buffer = definitions_1[0].target_buffer.read(cx);
2584            assert_eq!(
2585                target_buffer.text(),
2586                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2587            );
2588            assert_eq!(
2589                definitions_1[0].target_range.to_point(target_buffer),
2590                Point::new(0, 6)..Point::new(0, 9)
2591            );
2592        });
2593
2594        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2595        // the previous call to `definition`.
2596        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2597        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2598            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2599                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2600                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2601            )))
2602        });
2603
2604        let definitions_2 = definitions_2.await.unwrap();
2605        cx_b.read(|cx| {
2606            assert_eq!(definitions_2.len(), 1);
2607            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2608            let target_buffer = definitions_2[0].target_buffer.read(cx);
2609            assert_eq!(
2610                target_buffer.text(),
2611                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2612            );
2613            assert_eq!(
2614                definitions_2[0].target_range.to_point(target_buffer),
2615                Point::new(1, 6)..Point::new(1, 11)
2616            );
2617        });
2618        assert_eq!(
2619            definitions_1[0].target_buffer,
2620            definitions_2[0].target_buffer
2621        );
2622
2623        cx_b.update(|_| {
2624            drop(definitions_1);
2625            drop(definitions_2);
2626        });
2627        project_b
2628            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2629            .await;
2630    }
2631
2632    #[gpui::test(iterations = 10)]
2633    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2634        mut cx_a: TestAppContext,
2635        mut cx_b: TestAppContext,
2636        mut rng: StdRng,
2637    ) {
2638        cx_a.foreground().forbid_parking();
2639        let mut lang_registry = Arc::new(LanguageRegistry::new());
2640        let fs = Arc::new(FakeFs::new(cx_a.background()));
2641        fs.insert_tree(
2642            "/root",
2643            json!({
2644                ".zed.toml": r#"collaborators = ["user_b"]"#,
2645                "a.rs": "const ONE: usize = b::TWO;",
2646                "b.rs": "const TWO: usize = 2",
2647            }),
2648        )
2649        .await;
2650
2651        // Set up a fake language server.
2652        let (language_server_config, mut fake_language_server) =
2653            LanguageServerConfig::fake(&cx_a).await;
2654
2655        Arc::get_mut(&mut lang_registry)
2656            .unwrap()
2657            .add(Arc::new(Language::new(
2658                LanguageConfig {
2659                    name: "Rust".to_string(),
2660                    path_suffixes: vec!["rs".to_string()],
2661                    language_server: Some(language_server_config),
2662                    ..Default::default()
2663                },
2664                Some(tree_sitter_rust::language()),
2665            )));
2666
2667        // Connect to a server as 2 clients.
2668        let mut server = TestServer::start(cx_a.foreground()).await;
2669        let client_a = server.create_client(&mut cx_a, "user_a").await;
2670        let client_b = server.create_client(&mut cx_b, "user_b").await;
2671
2672        // Share a project as client A
2673        let project_a = cx_a.update(|cx| {
2674            Project::local(
2675                client_a.clone(),
2676                client_a.user_store.clone(),
2677                lang_registry.clone(),
2678                fs.clone(),
2679                cx,
2680            )
2681        });
2682
2683        let (worktree_a, _) = project_a
2684            .update(&mut cx_a, |p, cx| {
2685                p.find_or_create_local_worktree("/root", false, cx)
2686            })
2687            .await
2688            .unwrap();
2689        worktree_a
2690            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2691            .await;
2692        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2693        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2694        project_a
2695            .update(&mut cx_a, |p, cx| p.share(cx))
2696            .await
2697            .unwrap();
2698
2699        // Join the worktree as client B.
2700        let project_b = Project::remote(
2701            project_id,
2702            client_b.clone(),
2703            client_b.user_store.clone(),
2704            lang_registry.clone(),
2705            fs.clone(),
2706            &mut cx_b.to_async(),
2707        )
2708        .await
2709        .unwrap();
2710
2711        let buffer_b1 = cx_b
2712            .background()
2713            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2714            .await
2715            .unwrap();
2716
2717        let definitions;
2718        let buffer_b2;
2719        if rng.gen() {
2720            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2721            buffer_b2 =
2722                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2723        } else {
2724            buffer_b2 =
2725                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2726            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2727        }
2728
2729        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2730            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2731                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2732                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2733            )))
2734        });
2735
2736        let buffer_b2 = buffer_b2.await.unwrap();
2737        let definitions = definitions.await.unwrap();
2738        assert_eq!(definitions.len(), 1);
2739        assert_eq!(definitions[0].target_buffer, buffer_b2);
2740    }
2741
2742    #[gpui::test(iterations = 10)]
2743    async fn test_collaborating_with_code_actions(
2744        mut cx_a: TestAppContext,
2745        mut cx_b: TestAppContext,
2746    ) {
2747        cx_a.foreground().forbid_parking();
2748        let mut lang_registry = Arc::new(LanguageRegistry::new());
2749        let fs = Arc::new(FakeFs::new(cx_a.background()));
2750        let mut path_openers_b = Vec::new();
2751        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2752
2753        // Set up a fake language server.
2754        let (language_server_config, mut fake_language_server) =
2755            LanguageServerConfig::fake_with_capabilities(
2756                lsp::ServerCapabilities {
2757                    ..Default::default()
2758                },
2759                &cx_a,
2760            )
2761            .await;
2762        Arc::get_mut(&mut lang_registry)
2763            .unwrap()
2764            .add(Arc::new(Language::new(
2765                LanguageConfig {
2766                    name: "Rust".to_string(),
2767                    path_suffixes: vec!["rs".to_string()],
2768                    language_server: Some(language_server_config),
2769                    ..Default::default()
2770                },
2771                Some(tree_sitter_rust::language()),
2772            )));
2773
2774        // Connect to a server as 2 clients.
2775        let mut server = TestServer::start(cx_a.foreground()).await;
2776        let client_a = server.create_client(&mut cx_a, "user_a").await;
2777        let client_b = server.create_client(&mut cx_b, "user_b").await;
2778
2779        // Share a project as client A
2780        fs.insert_tree(
2781            "/a",
2782            json!({
2783                ".zed.toml": r#"collaborators = ["user_b"]"#,
2784                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2785                "other.rs": "pub fn foo() -> usize { 4 }",
2786            }),
2787        )
2788        .await;
2789        let project_a = cx_a.update(|cx| {
2790            Project::local(
2791                client_a.clone(),
2792                client_a.user_store.clone(),
2793                lang_registry.clone(),
2794                fs.clone(),
2795                cx,
2796            )
2797        });
2798        let (worktree_a, _) = project_a
2799            .update(&mut cx_a, |p, cx| {
2800                p.find_or_create_local_worktree("/a", false, cx)
2801            })
2802            .await
2803            .unwrap();
2804        worktree_a
2805            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2806            .await;
2807        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2808        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2809        project_a
2810            .update(&mut cx_a, |p, cx| p.share(cx))
2811            .await
2812            .unwrap();
2813
2814        // Join the worktree as client B.
2815        let project_b = Project::remote(
2816            project_id,
2817            client_b.clone(),
2818            client_b.user_store.clone(),
2819            lang_registry.clone(),
2820            fs.clone(),
2821            &mut cx_b.to_async(),
2822        )
2823        .await
2824        .unwrap();
2825        let mut params = cx_b.update(WorkspaceParams::test);
2826        params.languages = lang_registry.clone();
2827        params.client = client_b.client.clone();
2828        params.user_store = client_b.user_store.clone();
2829        params.project = project_b;
2830        params.path_openers = path_openers_b.into();
2831
2832        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2833        let editor_b = workspace_b
2834            .update(&mut cx_b, |workspace, cx| {
2835                workspace.open_path((worktree_id, "main.rs").into(), cx)
2836            })
2837            .await
2838            .unwrap()
2839            .downcast::<Editor>()
2840            .unwrap();
2841        fake_language_server
2842            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2843                assert_eq!(
2844                    params.text_document.uri,
2845                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2846                );
2847                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2848                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2849                None
2850            })
2851            .next()
2852            .await;
2853
2854        // Move cursor to a location that contains code actions.
2855        editor_b.update(&mut cx_b, |editor, cx| {
2856            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2857            cx.focus(&editor_b);
2858        });
2859        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2860            assert_eq!(
2861                params.text_document.uri,
2862                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2863            );
2864            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2865            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2866
2867            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2868                lsp::CodeAction {
2869                    title: "Inline into all callers".to_string(),
2870                    edit: Some(lsp::WorkspaceEdit {
2871                        changes: Some(
2872                            [
2873                                (
2874                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2875                                    vec![lsp::TextEdit::new(
2876                                        lsp::Range::new(
2877                                            lsp::Position::new(1, 22),
2878                                            lsp::Position::new(1, 34),
2879                                        ),
2880                                        "4".to_string(),
2881                                    )],
2882                                ),
2883                                (
2884                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2885                                    vec![lsp::TextEdit::new(
2886                                        lsp::Range::new(
2887                                            lsp::Position::new(0, 0),
2888                                            lsp::Position::new(0, 27),
2889                                        ),
2890                                        "".to_string(),
2891                                    )],
2892                                ),
2893                            ]
2894                            .into_iter()
2895                            .collect(),
2896                        ),
2897                        ..Default::default()
2898                    }),
2899                    data: Some(json!({
2900                        "codeActionParams": {
2901                            "range": {
2902                                "start": {"line": 1, "column": 31},
2903                                "end": {"line": 1, "column": 31},
2904                            }
2905                        }
2906                    })),
2907                    ..Default::default()
2908                },
2909            )])
2910        });
2911
2912        // Toggle code actions and wait for them to display.
2913        editor_b.update(&mut cx_b, |editor, cx| {
2914            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2915        });
2916        editor_b
2917            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2918            .await;
2919
2920        // Confirming the code action will trigger a resolve request.
2921        let confirm_action = workspace_b
2922            .update(&mut cx_b, |workspace, cx| {
2923                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2924            })
2925            .unwrap();
2926        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2927            lsp::CodeAction {
2928                title: "Inline into all callers".to_string(),
2929                edit: Some(lsp::WorkspaceEdit {
2930                    changes: Some(
2931                        [
2932                            (
2933                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2934                                vec![lsp::TextEdit::new(
2935                                    lsp::Range::new(
2936                                        lsp::Position::new(1, 22),
2937                                        lsp::Position::new(1, 34),
2938                                    ),
2939                                    "4".to_string(),
2940                                )],
2941                            ),
2942                            (
2943                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2944                                vec![lsp::TextEdit::new(
2945                                    lsp::Range::new(
2946                                        lsp::Position::new(0, 0),
2947                                        lsp::Position::new(0, 27),
2948                                    ),
2949                                    "".to_string(),
2950                                )],
2951                            ),
2952                        ]
2953                        .into_iter()
2954                        .collect(),
2955                    ),
2956                    ..Default::default()
2957                }),
2958                ..Default::default()
2959            }
2960        });
2961
2962        // After the action is confirmed, an editor containing both modified files is opened.
2963        confirm_action.await.unwrap();
2964        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2965            workspace
2966                .active_item(cx)
2967                .unwrap()
2968                .downcast::<Editor>()
2969                .unwrap()
2970        });
2971        code_action_editor.update(&mut cx_b, |editor, cx| {
2972            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2973            editor.undo(&Undo, cx);
2974            assert_eq!(
2975                editor.text(cx),
2976                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2977            );
2978            editor.redo(&Redo, cx);
2979            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2980        });
2981    }
2982
2983    #[gpui::test(iterations = 10)]
2984    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2985        cx_a.foreground().forbid_parking();
2986
2987        // Connect to a server as 2 clients.
2988        let mut server = TestServer::start(cx_a.foreground()).await;
2989        let client_a = server.create_client(&mut cx_a, "user_a").await;
2990        let client_b = server.create_client(&mut cx_b, "user_b").await;
2991
2992        // Create an org that includes these 2 users.
2993        let db = &server.app_state.db;
2994        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2995        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2996            .await
2997            .unwrap();
2998        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2999            .await
3000            .unwrap();
3001
3002        // Create a channel that includes all the users.
3003        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3004        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3005            .await
3006            .unwrap();
3007        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3008            .await
3009            .unwrap();
3010        db.create_channel_message(
3011            channel_id,
3012            client_b.current_user_id(&cx_b),
3013            "hello A, it's B.",
3014            OffsetDateTime::now_utc(),
3015            1,
3016        )
3017        .await
3018        .unwrap();
3019
3020        let channels_a = cx_a
3021            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3022        channels_a
3023            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3024            .await;
3025        channels_a.read_with(&cx_a, |list, _| {
3026            assert_eq!(
3027                list.available_channels().unwrap(),
3028                &[ChannelDetails {
3029                    id: channel_id.to_proto(),
3030                    name: "test-channel".to_string()
3031                }]
3032            )
3033        });
3034        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3035            this.get_channel(channel_id.to_proto(), cx).unwrap()
3036        });
3037        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3038        channel_a
3039            .condition(&cx_a, |channel, _| {
3040                channel_messages(channel)
3041                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3042            })
3043            .await;
3044
3045        let channels_b = cx_b
3046            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3047        channels_b
3048            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3049            .await;
3050        channels_b.read_with(&cx_b, |list, _| {
3051            assert_eq!(
3052                list.available_channels().unwrap(),
3053                &[ChannelDetails {
3054                    id: channel_id.to_proto(),
3055                    name: "test-channel".to_string()
3056                }]
3057            )
3058        });
3059
3060        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3061            this.get_channel(channel_id.to_proto(), cx).unwrap()
3062        });
3063        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3064        channel_b
3065            .condition(&cx_b, |channel, _| {
3066                channel_messages(channel)
3067                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3068            })
3069            .await;
3070
3071        channel_a
3072            .update(&mut cx_a, |channel, cx| {
3073                channel
3074                    .send_message("oh, hi B.".to_string(), cx)
3075                    .unwrap()
3076                    .detach();
3077                let task = channel.send_message("sup".to_string(), cx).unwrap();
3078                assert_eq!(
3079                    channel_messages(channel),
3080                    &[
3081                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3082                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3083                        ("user_a".to_string(), "sup".to_string(), true)
3084                    ]
3085                );
3086                task
3087            })
3088            .await
3089            .unwrap();
3090
3091        channel_b
3092            .condition(&cx_b, |channel, _| {
3093                channel_messages(channel)
3094                    == [
3095                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3096                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3097                        ("user_a".to_string(), "sup".to_string(), false),
3098                    ]
3099            })
3100            .await;
3101
3102        assert_eq!(
3103            server
3104                .state()
3105                .await
3106                .channel(channel_id)
3107                .unwrap()
3108                .connection_ids
3109                .len(),
3110            2
3111        );
3112        cx_b.update(|_| drop(channel_b));
3113        server
3114            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3115            .await;
3116
3117        cx_a.update(|_| drop(channel_a));
3118        server
3119            .condition(|state| state.channel(channel_id).is_none())
3120            .await;
3121    }
3122
3123    #[gpui::test(iterations = 10)]
3124    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3125        cx_a.foreground().forbid_parking();
3126
3127        let mut server = TestServer::start(cx_a.foreground()).await;
3128        let client_a = server.create_client(&mut cx_a, "user_a").await;
3129
3130        let db = &server.app_state.db;
3131        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3132        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3133        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3134            .await
3135            .unwrap();
3136        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3137            .await
3138            .unwrap();
3139
3140        let channels_a = cx_a
3141            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3142        channels_a
3143            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3144            .await;
3145        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3146            this.get_channel(channel_id.to_proto(), cx).unwrap()
3147        });
3148
3149        // Messages aren't allowed to be too long.
3150        channel_a
3151            .update(&mut cx_a, |channel, cx| {
3152                let long_body = "this is long.\n".repeat(1024);
3153                channel.send_message(long_body, cx).unwrap()
3154            })
3155            .await
3156            .unwrap_err();
3157
3158        // Messages aren't allowed to be blank.
3159        channel_a.update(&mut cx_a, |channel, cx| {
3160            channel.send_message(String::new(), cx).unwrap_err()
3161        });
3162
3163        // Leading and trailing whitespace are trimmed.
3164        channel_a
3165            .update(&mut cx_a, |channel, cx| {
3166                channel
3167                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3168                    .unwrap()
3169            })
3170            .await
3171            .unwrap();
3172        assert_eq!(
3173            db.get_channel_messages(channel_id, 10, None)
3174                .await
3175                .unwrap()
3176                .iter()
3177                .map(|m| &m.body)
3178                .collect::<Vec<_>>(),
3179            &["surrounded by whitespace"]
3180        );
3181    }
3182
3183    #[gpui::test(iterations = 10)]
3184    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3185        cx_a.foreground().forbid_parking();
3186
3187        // Connect to a server as 2 clients.
3188        let mut server = TestServer::start(cx_a.foreground()).await;
3189        let client_a = server.create_client(&mut cx_a, "user_a").await;
3190        let client_b = server.create_client(&mut cx_b, "user_b").await;
3191        let mut status_b = client_b.status();
3192
3193        // Create an org that includes these 2 users.
3194        let db = &server.app_state.db;
3195        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3196        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3197            .await
3198            .unwrap();
3199        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3200            .await
3201            .unwrap();
3202
3203        // Create a channel that includes all the users.
3204        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3205        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3206            .await
3207            .unwrap();
3208        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3209            .await
3210            .unwrap();
3211        db.create_channel_message(
3212            channel_id,
3213            client_b.current_user_id(&cx_b),
3214            "hello A, it's B.",
3215            OffsetDateTime::now_utc(),
3216            2,
3217        )
3218        .await
3219        .unwrap();
3220
3221        let channels_a = cx_a
3222            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3223        channels_a
3224            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3225            .await;
3226
3227        channels_a.read_with(&cx_a, |list, _| {
3228            assert_eq!(
3229                list.available_channels().unwrap(),
3230                &[ChannelDetails {
3231                    id: channel_id.to_proto(),
3232                    name: "test-channel".to_string()
3233                }]
3234            )
3235        });
3236        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3237            this.get_channel(channel_id.to_proto(), cx).unwrap()
3238        });
3239        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3240        channel_a
3241            .condition(&cx_a, |channel, _| {
3242                channel_messages(channel)
3243                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3244            })
3245            .await;
3246
3247        let channels_b = cx_b
3248            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3249        channels_b
3250            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3251            .await;
3252        channels_b.read_with(&cx_b, |list, _| {
3253            assert_eq!(
3254                list.available_channels().unwrap(),
3255                &[ChannelDetails {
3256                    id: channel_id.to_proto(),
3257                    name: "test-channel".to_string()
3258                }]
3259            )
3260        });
3261
3262        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3263            this.get_channel(channel_id.to_proto(), cx).unwrap()
3264        });
3265        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3266        channel_b
3267            .condition(&cx_b, |channel, _| {
3268                channel_messages(channel)
3269                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3270            })
3271            .await;
3272
3273        // Disconnect client B, ensuring we can still access its cached channel data.
3274        server.forbid_connections();
3275        server.disconnect_client(client_b.current_user_id(&cx_b));
3276        while !matches!(
3277            status_b.next().await,
3278            Some(client::Status::ReconnectionError { .. })
3279        ) {}
3280
3281        channels_b.read_with(&cx_b, |channels, _| {
3282            assert_eq!(
3283                channels.available_channels().unwrap(),
3284                [ChannelDetails {
3285                    id: channel_id.to_proto(),
3286                    name: "test-channel".to_string()
3287                }]
3288            )
3289        });
3290        channel_b.read_with(&cx_b, |channel, _| {
3291            assert_eq!(
3292                channel_messages(channel),
3293                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3294            )
3295        });
3296
3297        // Send a message from client B while it is disconnected.
3298        channel_b
3299            .update(&mut cx_b, |channel, cx| {
3300                let task = channel
3301                    .send_message("can you see this?".to_string(), cx)
3302                    .unwrap();
3303                assert_eq!(
3304                    channel_messages(channel),
3305                    &[
3306                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3307                        ("user_b".to_string(), "can you see this?".to_string(), true)
3308                    ]
3309                );
3310                task
3311            })
3312            .await
3313            .unwrap_err();
3314
3315        // Send a message from client A while B is disconnected.
3316        channel_a
3317            .update(&mut cx_a, |channel, cx| {
3318                channel
3319                    .send_message("oh, hi B.".to_string(), cx)
3320                    .unwrap()
3321                    .detach();
3322                let task = channel.send_message("sup".to_string(), cx).unwrap();
3323                assert_eq!(
3324                    channel_messages(channel),
3325                    &[
3326                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3327                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3328                        ("user_a".to_string(), "sup".to_string(), true)
3329                    ]
3330                );
3331                task
3332            })
3333            .await
3334            .unwrap();
3335
3336        // Give client B a chance to reconnect.
3337        server.allow_connections();
3338        cx_b.foreground().advance_clock(Duration::from_secs(10));
3339
3340        // Verify that B sees the new messages upon reconnection, as well as the message client B
3341        // sent while offline.
3342        channel_b
3343            .condition(&cx_b, |channel, _| {
3344                channel_messages(channel)
3345                    == [
3346                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3347                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3348                        ("user_a".to_string(), "sup".to_string(), false),
3349                        ("user_b".to_string(), "can you see this?".to_string(), false),
3350                    ]
3351            })
3352            .await;
3353
3354        // Ensure client A and B can communicate normally after reconnection.
3355        channel_a
3356            .update(&mut cx_a, |channel, cx| {
3357                channel.send_message("you online?".to_string(), cx).unwrap()
3358            })
3359            .await
3360            .unwrap();
3361        channel_b
3362            .condition(&cx_b, |channel, _| {
3363                channel_messages(channel)
3364                    == [
3365                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3366                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3367                        ("user_a".to_string(), "sup".to_string(), false),
3368                        ("user_b".to_string(), "can you see this?".to_string(), false),
3369                        ("user_a".to_string(), "you online?".to_string(), false),
3370                    ]
3371            })
3372            .await;
3373
3374        channel_b
3375            .update(&mut cx_b, |channel, cx| {
3376                channel.send_message("yep".to_string(), cx).unwrap()
3377            })
3378            .await
3379            .unwrap();
3380        channel_a
3381            .condition(&cx_a, |channel, _| {
3382                channel_messages(channel)
3383                    == [
3384                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3385                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3386                        ("user_a".to_string(), "sup".to_string(), false),
3387                        ("user_b".to_string(), "can you see this?".to_string(), false),
3388                        ("user_a".to_string(), "you online?".to_string(), false),
3389                        ("user_b".to_string(), "yep".to_string(), false),
3390                    ]
3391            })
3392            .await;
3393    }
3394
3395    #[gpui::test(iterations = 10)]
3396    async fn test_contacts(
3397        mut cx_a: TestAppContext,
3398        mut cx_b: TestAppContext,
3399        mut cx_c: TestAppContext,
3400    ) {
3401        cx_a.foreground().forbid_parking();
3402        let lang_registry = Arc::new(LanguageRegistry::new());
3403        let fs = Arc::new(FakeFs::new(cx_a.background()));
3404
3405        // Connect to a server as 3 clients.
3406        let mut server = TestServer::start(cx_a.foreground()).await;
3407        let client_a = server.create_client(&mut cx_a, "user_a").await;
3408        let client_b = server.create_client(&mut cx_b, "user_b").await;
3409        let client_c = server.create_client(&mut cx_c, "user_c").await;
3410
3411        // Share a worktree as client A.
3412        fs.insert_tree(
3413            "/a",
3414            json!({
3415                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3416            }),
3417        )
3418        .await;
3419
3420        let project_a = cx_a.update(|cx| {
3421            Project::local(
3422                client_a.clone(),
3423                client_a.user_store.clone(),
3424                lang_registry.clone(),
3425                fs.clone(),
3426                cx,
3427            )
3428        });
3429        let (worktree_a, _) = project_a
3430            .update(&mut cx_a, |p, cx| {
3431                p.find_or_create_local_worktree("/a", false, cx)
3432            })
3433            .await
3434            .unwrap();
3435        worktree_a
3436            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3437            .await;
3438
3439        client_a
3440            .user_store
3441            .condition(&cx_a, |user_store, _| {
3442                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3443            })
3444            .await;
3445        client_b
3446            .user_store
3447            .condition(&cx_b, |user_store, _| {
3448                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3449            })
3450            .await;
3451        client_c
3452            .user_store
3453            .condition(&cx_c, |user_store, _| {
3454                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3455            })
3456            .await;
3457
3458        let project_id = project_a
3459            .update(&mut cx_a, |project, _| project.next_remote_id())
3460            .await;
3461        project_a
3462            .update(&mut cx_a, |project, cx| project.share(cx))
3463            .await
3464            .unwrap();
3465
3466        let _project_b = Project::remote(
3467            project_id,
3468            client_b.clone(),
3469            client_b.user_store.clone(),
3470            lang_registry.clone(),
3471            fs.clone(),
3472            &mut cx_b.to_async(),
3473        )
3474        .await
3475        .unwrap();
3476
3477        client_a
3478            .user_store
3479            .condition(&cx_a, |user_store, _| {
3480                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3481            })
3482            .await;
3483        client_b
3484            .user_store
3485            .condition(&cx_b, |user_store, _| {
3486                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3487            })
3488            .await;
3489        client_c
3490            .user_store
3491            .condition(&cx_c, |user_store, _| {
3492                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3493            })
3494            .await;
3495
3496        project_a
3497            .condition(&cx_a, |project, _| {
3498                project.collaborators().contains_key(&client_b.peer_id)
3499            })
3500            .await;
3501
3502        cx_a.update(move |_| drop(project_a));
3503        client_a
3504            .user_store
3505            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3506            .await;
3507        client_b
3508            .user_store
3509            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3510            .await;
3511        client_c
3512            .user_store
3513            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3514            .await;
3515
3516        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3517            user_store
3518                .contacts()
3519                .iter()
3520                .map(|contact| {
3521                    let worktrees = contact
3522                        .projects
3523                        .iter()
3524                        .map(|p| {
3525                            (
3526                                p.worktree_root_names[0].as_str(),
3527                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3528                            )
3529                        })
3530                        .collect();
3531                    (contact.user.github_login.as_str(), worktrees)
3532                })
3533                .collect()
3534        }
3535    }
3536
3537    #[gpui::test(iterations = 10)]
3538    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
3539        cx.foreground().forbid_parking();
3540        let max_peers = env::var("MAX_PEERS")
3541            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3542            .unwrap_or(5);
3543        let max_operations = env::var("OPERATIONS")
3544            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3545            .unwrap_or(10);
3546
3547        let rng = Rc::new(RefCell::new(rng));
3548        let lang_registry = Arc::new(LanguageRegistry::new());
3549        let fs = Arc::new(FakeFs::new(cx.background()));
3550        fs.insert_tree(
3551            "/_collab",
3552            json!({
3553                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3554            }),
3555        )
3556        .await;
3557
3558        let operations = Rc::new(Cell::new(0));
3559        let mut server = TestServer::start(cx.foreground()).await;
3560        let mut clients = Vec::new();
3561
3562        let mut next_entity_id = 100000;
3563        let mut host_cx = TestAppContext::new(
3564            cx.foreground_platform(),
3565            cx.platform(),
3566            cx.foreground(),
3567            cx.background(),
3568            cx.font_cache(),
3569            next_entity_id,
3570        );
3571        let host = server.create_client(&mut host_cx, "host").await;
3572        let host_project = host_cx.update(|cx| {
3573            Project::local(
3574                host.client.clone(),
3575                host.user_store.clone(),
3576                lang_registry.clone(),
3577                fs.clone(),
3578                cx,
3579            )
3580        });
3581        let host_project_id = host_project
3582            .update(&mut host_cx, |p, _| p.next_remote_id())
3583            .await;
3584
3585        let (collab_worktree, _) = host_project
3586            .update(&mut host_cx, |project, cx| {
3587                project.find_or_create_local_worktree("/_collab", false, cx)
3588            })
3589            .await
3590            .unwrap();
3591        collab_worktree
3592            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3593            .await;
3594        host_project
3595            .update(&mut host_cx, |project, cx| project.share(cx))
3596            .await
3597            .unwrap();
3598
3599        clients.push(cx.foreground().spawn(host.simulate_host(
3600            host_project,
3601            operations.clone(),
3602            max_operations,
3603            rng.clone(),
3604            host_cx,
3605        )));
3606
3607        while operations.get() < max_operations {
3608            cx.background().simulate_random_delay().await;
3609            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3610                operations.set(operations.get() + 1);
3611
3612                let guest_id = clients.len();
3613                log::info!("Adding guest {}", guest_id);
3614                next_entity_id += 100000;
3615                let mut guest_cx = TestAppContext::new(
3616                    cx.foreground_platform(),
3617                    cx.platform(),
3618                    cx.foreground(),
3619                    cx.background(),
3620                    cx.font_cache(),
3621                    next_entity_id,
3622                );
3623                let guest = server
3624                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3625                    .await;
3626                let guest_project = Project::remote(
3627                    host_project_id,
3628                    guest.client.clone(),
3629                    guest.user_store.clone(),
3630                    lang_registry.clone(),
3631                    fs.clone(),
3632                    &mut guest_cx.to_async(),
3633                )
3634                .await
3635                .unwrap();
3636                clients.push(cx.foreground().spawn(guest.simulate_guest(
3637                    guest_id,
3638                    guest_project,
3639                    operations.clone(),
3640                    max_operations,
3641                    rng.clone(),
3642                    guest_cx,
3643                )));
3644
3645                log::info!("Guest {} added", guest_id);
3646            }
3647        }
3648
3649        let clients = futures::future::join_all(clients).await;
3650        for (ix, (client_a, cx_a)) in clients.iter().enumerate() {
3651            for buffer_a in &client_a.buffers {
3652                let buffer_id = buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id());
3653                for (client_b, cx_b) in &clients[ix + 1..] {
3654                    if let Some(buffer_b) = client_b.buffers.iter().find(|buffer| {
3655                        buffer.read_with(cx_b, |buffer, _| buffer.remote_id() == buffer_id)
3656                    }) {
3657                        assert_eq!(
3658                            buffer_a.read_with(cx_a, |buffer, _| buffer.text()),
3659                            buffer_b.read_with(cx_b, |buffer, _| buffer.text())
3660                        );
3661                    }
3662                }
3663            }
3664        }
3665    }
3666
3667    struct TestServer {
3668        peer: Arc<Peer>,
3669        app_state: Arc<AppState>,
3670        server: Arc<Server>,
3671        foreground: Rc<executor::Foreground>,
3672        notifications: mpsc::Receiver<()>,
3673        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3674        forbid_connections: Arc<AtomicBool>,
3675        _test_db: TestDb,
3676    }
3677
3678    impl TestServer {
3679        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3680            let test_db = TestDb::new();
3681            let app_state = Self::build_app_state(&test_db).await;
3682            let peer = Peer::new();
3683            let notifications = mpsc::channel(128);
3684            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3685            Self {
3686                peer,
3687                app_state,
3688                server,
3689                foreground,
3690                notifications: notifications.1,
3691                connection_killers: Default::default(),
3692                forbid_connections: Default::default(),
3693                _test_db: test_db,
3694            }
3695        }
3696
3697        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3698            let http = FakeHttpClient::with_404_response();
3699            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3700            let client_name = name.to_string();
3701            let mut client = Client::new(http.clone());
3702            let server = self.server.clone();
3703            let connection_killers = self.connection_killers.clone();
3704            let forbid_connections = self.forbid_connections.clone();
3705            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3706
3707            Arc::get_mut(&mut client)
3708                .unwrap()
3709                .override_authenticate(move |cx| {
3710                    cx.spawn(|_| async move {
3711                        let access_token = "the-token".to_string();
3712                        Ok(Credentials {
3713                            user_id: user_id.0 as u64,
3714                            access_token,
3715                        })
3716                    })
3717                })
3718                .override_establish_connection(move |credentials, cx| {
3719                    assert_eq!(credentials.user_id, user_id.0 as u64);
3720                    assert_eq!(credentials.access_token, "the-token");
3721
3722                    let server = server.clone();
3723                    let connection_killers = connection_killers.clone();
3724                    let forbid_connections = forbid_connections.clone();
3725                    let client_name = client_name.clone();
3726                    let connection_id_tx = connection_id_tx.clone();
3727                    cx.spawn(move |cx| async move {
3728                        if forbid_connections.load(SeqCst) {
3729                            Err(EstablishConnectionError::other(anyhow!(
3730                                "server is forbidding connections"
3731                            )))
3732                        } else {
3733                            let (client_conn, server_conn, kill_conn) =
3734                                Connection::in_memory(cx.background());
3735                            connection_killers.lock().insert(user_id, kill_conn);
3736                            cx.background()
3737                                .spawn(server.handle_connection(
3738                                    server_conn,
3739                                    client_name,
3740                                    user_id,
3741                                    Some(connection_id_tx),
3742                                    cx.background(),
3743                                ))
3744                                .detach();
3745                            Ok(client_conn)
3746                        }
3747                    })
3748                });
3749
3750            client
3751                .authenticate_and_connect(&cx.to_async())
3752                .await
3753                .unwrap();
3754
3755            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3756            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3757            let mut authed_user =
3758                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3759            while authed_user.next().await.unwrap().is_none() {}
3760
3761            TestClient {
3762                client,
3763                peer_id,
3764                user_store,
3765                project: Default::default(),
3766                buffers: Default::default(),
3767            }
3768        }
3769
3770        fn disconnect_client(&self, user_id: UserId) {
3771            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3772                let _ = kill_conn.try_send(Some(()));
3773            }
3774        }
3775
3776        fn forbid_connections(&self) {
3777            self.forbid_connections.store(true, SeqCst);
3778        }
3779
3780        fn allow_connections(&self) {
3781            self.forbid_connections.store(false, SeqCst);
3782        }
3783
3784        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3785            let mut config = Config::default();
3786            config.session_secret = "a".repeat(32);
3787            config.database_url = test_db.url.clone();
3788            let github_client = github::AppClient::test();
3789            Arc::new(AppState {
3790                db: test_db.db().clone(),
3791                handlebars: Default::default(),
3792                auth_client: auth::build_client("", ""),
3793                repo_client: github::RepoClient::test(&github_client),
3794                github_client,
3795                config,
3796            })
3797        }
3798
3799        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3800            self.server.store.read()
3801        }
3802
3803        async fn condition<F>(&mut self, mut predicate: F)
3804        where
3805            F: FnMut(&Store) -> bool,
3806        {
3807            async_std::future::timeout(Duration::from_millis(500), async {
3808                while !(predicate)(&*self.server.store.read()) {
3809                    self.foreground.start_waiting();
3810                    self.notifications.next().await;
3811                    self.foreground.finish_waiting();
3812                }
3813            })
3814            .await
3815            .expect("condition timed out");
3816        }
3817    }
3818
3819    impl Drop for TestServer {
3820        fn drop(&mut self) {
3821            self.peer.reset();
3822        }
3823    }
3824
3825    struct TestClient {
3826        client: Arc<Client>,
3827        pub peer_id: PeerId,
3828        pub user_store: ModelHandle<UserStore>,
3829        project: Option<ModelHandle<Project>>,
3830        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3831    }
3832
3833    impl Deref for TestClient {
3834        type Target = Arc<Client>;
3835
3836        fn deref(&self) -> &Self::Target {
3837            &self.client
3838        }
3839    }
3840
3841    impl TestClient {
3842        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3843            UserId::from_proto(
3844                self.user_store
3845                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3846            )
3847        }
3848
3849        async fn simulate_host(
3850            mut self,
3851            project: ModelHandle<Project>,
3852            operations: Rc<Cell<usize>>,
3853            max_operations: usize,
3854            rng: Rc<RefCell<StdRng>>,
3855            mut cx: TestAppContext,
3856        ) -> (Self, TestAppContext) {
3857            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3858            let mut files: Vec<PathBuf> = Default::default();
3859            while operations.get() < max_operations {
3860                operations.set(operations.get() + 1);
3861
3862                let distribution = rng.borrow_mut().gen_range(0..100);
3863                match distribution {
3864                    0..=20 if !files.is_empty() => {
3865                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3866                        while let Some(parent_path) = path.parent() {
3867                            path = parent_path;
3868                            if rng.borrow_mut().gen() {
3869                                break;
3870                            }
3871                        }
3872
3873                        log::info!("Host: find/create local worktree {:?}", path);
3874                        project
3875                            .update(&mut cx, |project, cx| {
3876                                project.find_or_create_local_worktree(path, false, cx)
3877                            })
3878                            .await
3879                            .unwrap();
3880                    }
3881                    10..=80 if !files.is_empty() => {
3882                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3883                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
3884                            let (worktree, path) = project
3885                                .update(&mut cx, |project, cx| {
3886                                    project.find_or_create_local_worktree(file, false, cx)
3887                                })
3888                                .await
3889                                .unwrap();
3890                            let project_path =
3891                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
3892                            log::info!("Host: opening path {:?}", project_path);
3893                            let buffer = project
3894                                .update(&mut cx, |project, cx| {
3895                                    project.open_buffer(project_path, cx)
3896                                })
3897                                .await
3898                                .unwrap();
3899                            self.buffers.insert(buffer.clone());
3900                            buffer
3901                        } else {
3902                            self.buffers
3903                                .iter()
3904                                .choose(&mut *rng.borrow_mut())
3905                                .unwrap()
3906                                .clone()
3907                        };
3908
3909                        buffer.update(&mut cx, |buffer, cx| {
3910                            log::info!(
3911                                "Host: updating buffer {:?}",
3912                                buffer.file().unwrap().full_path(cx)
3913                            );
3914                            buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
3915                        });
3916                    }
3917                    _ => loop {
3918                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
3919                        let mut path = PathBuf::new();
3920                        path.push("/");
3921                        for _ in 0..path_component_count {
3922                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
3923                            path.push(std::str::from_utf8(&[letter]).unwrap());
3924                        }
3925                        let parent_path = path.parent().unwrap();
3926
3927                        log::info!("Host: creating file {:?}", path);
3928                        if fs.create_dir(&parent_path).await.is_ok()
3929                            && fs.create_file(&path, Default::default()).await.is_ok()
3930                        {
3931                            files.push(path);
3932                            break;
3933                        } else {
3934                            log::info!("Host: cannot create file");
3935                        }
3936                    },
3937                }
3938
3939                cx.background().simulate_random_delay().await;
3940            }
3941
3942            self.project = Some(project);
3943            (self, cx)
3944        }
3945
3946        pub async fn simulate_guest(
3947            mut self,
3948            guest_id: usize,
3949            project: ModelHandle<Project>,
3950            operations: Rc<Cell<usize>>,
3951            max_operations: usize,
3952            rng: Rc<RefCell<StdRng>>,
3953            mut cx: TestAppContext,
3954        ) -> (Self, TestAppContext) {
3955            while operations.get() < max_operations {
3956                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3957                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
3958                        project
3959                            .worktrees(&cx)
3960                            .filter(|worktree| {
3961                                worktree.read(cx).entries(false).any(|e| e.is_file())
3962                            })
3963                            .choose(&mut *rng.borrow_mut())
3964                    }) {
3965                        worktree
3966                    } else {
3967                        cx.background().simulate_random_delay().await;
3968                        continue;
3969                    };
3970
3971                    operations.set(operations.get() + 1);
3972                    let project_path = worktree.read_with(&cx, |worktree, _| {
3973                        let entry = worktree
3974                            .entries(false)
3975                            .filter(|e| e.is_file())
3976                            .choose(&mut *rng.borrow_mut())
3977                            .unwrap();
3978                        (worktree.id(), entry.path.clone())
3979                    });
3980                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
3981                    let buffer = project
3982                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
3983                        .await
3984                        .unwrap();
3985                    self.buffers.insert(buffer.clone());
3986                    buffer
3987                } else {
3988                    self.buffers
3989                        .iter()
3990                        .choose(&mut *rng.borrow_mut())
3991                        .unwrap()
3992                        .clone()
3993                };
3994
3995                buffer.update(&mut cx, |buffer, cx| {
3996                    log::info!(
3997                        "Guest {}: updating buffer {:?}",
3998                        guest_id,
3999                        buffer.file().unwrap().full_path(cx)
4000                    );
4001                    buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4002                });
4003
4004                cx.background().simulate_random_delay().await;
4005            }
4006
4007            self.project = Some(project);
4008            (self, cx)
4009        }
4010    }
4011
4012    impl Executor for Arc<gpui::executor::Background> {
4013        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4014            self.spawn(future).detach();
4015        }
4016    }
4017
4018    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4019        channel
4020            .messages()
4021            .cursor::<()>()
4022            .map(|m| {
4023                (
4024                    m.sender.github_login.clone(),
4025                    m.body.clone(),
4026                    m.is_pending(),
4027                )
4028            })
4029            .collect()
4030    }
4031
4032    struct EmptyView;
4033
4034    impl gpui::Entity for EmptyView {
4035        type Event = ();
4036    }
4037
4038    impl gpui::View for EmptyView {
4039        fn ui_name() -> &'static str {
4040            "empty view"
4041        }
4042
4043        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4044            gpui::Element::boxed(gpui::elements::Empty)
4045        }
4046    }
4047}