rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_request_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let handle_message = (handler)(this.clone(), message);
 194                                let notifications = this.notifications.clone();
 195                                executor.spawn_detached(async move {
 196                                    if let Err(err) = handle_message.await {
 197                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 198                                    } else {
 199                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 200                                    }
 201                                    if let Some(mut notifications) = notifications {
 202                                        let _ = notifications.send(()).await;
 203                                    }
 204                                });
 205                            } else {
 206                                log::warn!("unhandled message: {}", type_name);
 207                            }
 208                        } else {
 209                            log::info!("rpc connection closed {:?}", addr);
 210                            break;
 211                        }
 212                    }
 213                }
 214            }
 215
 216            if let Err(err) = this.sign_out(connection_id).await {
 217                log::error!("error signing out connection {:?} - {:?}", addr, err);
 218            }
 219        }
 220    }
 221
 222    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 223        self.peer.disconnect(connection_id);
 224        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 225
 226        for (project_id, project) in removed_connection.hosted_projects {
 227            if let Some(share) = project.share {
 228                broadcast(
 229                    connection_id,
 230                    share.guests.keys().copied().collect(),
 231                    |conn_id| {
 232                        self.peer
 233                            .send(conn_id, proto::UnshareProject { project_id })
 234                    },
 235                )?;
 236            }
 237        }
 238
 239        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 240            broadcast(connection_id, peer_ids, |conn_id| {
 241                self.peer.send(
 242                    conn_id,
 243                    proto::RemoveProjectCollaborator {
 244                        project_id,
 245                        peer_id: connection_id.0,
 246                    },
 247                )
 248            })?;
 249        }
 250
 251        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 252        Ok(())
 253    }
 254
 255    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 256        Ok(proto::Ack {})
 257    }
 258
 259    async fn register_project(
 260        mut self: Arc<Server>,
 261        request: TypedEnvelope<proto::RegisterProject>,
 262    ) -> tide::Result<proto::RegisterProjectResponse> {
 263        let project_id = {
 264            let mut state = self.state_mut();
 265            let user_id = state.user_id_for_connection(request.sender_id)?;
 266            state.register_project(request.sender_id, user_id)
 267        };
 268        Ok(proto::RegisterProjectResponse { project_id })
 269    }
 270
 271    async fn unregister_project(
 272        mut self: Arc<Server>,
 273        request: TypedEnvelope<proto::UnregisterProject>,
 274    ) -> tide::Result<()> {
 275        let project = self
 276            .state_mut()
 277            .unregister_project(request.payload.project_id, request.sender_id)?;
 278        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 279        Ok(())
 280    }
 281
 282    async fn share_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::ShareProject>,
 285    ) -> tide::Result<proto::Ack> {
 286        self.state_mut()
 287            .share_project(request.payload.project_id, request.sender_id);
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn unshare_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::UnshareProject>,
 294    ) -> tide::Result<()> {
 295        let project_id = request.payload.project_id;
 296        let project = self
 297            .state_mut()
 298            .unshare_project(project_id, request.sender_id)?;
 299
 300        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 301            self.peer
 302                .send(conn_id, proto::UnshareProject { project_id })
 303        })?;
 304        self.update_contacts_for_users(&project.authorized_user_ids)?;
 305        Ok(())
 306    }
 307
 308    async fn join_project(
 309        mut self: Arc<Server>,
 310        request: TypedEnvelope<proto::JoinProject>,
 311    ) -> tide::Result<proto::JoinProjectResponse> {
 312        let project_id = request.payload.project_id;
 313
 314        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 315        let (response, connection_ids, contact_user_ids) = self
 316            .state_mut()
 317            .join_project(request.sender_id, user_id, project_id)
 318            .and_then(|joined| {
 319                let share = joined.project.share()?;
 320                let peer_count = share.guests.len();
 321                let mut collaborators = Vec::with_capacity(peer_count);
 322                collaborators.push(proto::Collaborator {
 323                    peer_id: joined.project.host_connection_id.0,
 324                    replica_id: 0,
 325                    user_id: joined.project.host_user_id.to_proto(),
 326                });
 327                let worktrees = joined
 328                    .project
 329                    .worktrees
 330                    .iter()
 331                    .filter_map(|(id, worktree)| {
 332                        worktree.share.as_ref().map(|share| proto::Worktree {
 333                            id: *id,
 334                            root_name: worktree.root_name.clone(),
 335                            entries: share.entries.values().cloned().collect(),
 336                            diagnostic_summaries: share
 337                                .diagnostic_summaries
 338                                .values()
 339                                .cloned()
 340                                .collect(),
 341                            weak: worktree.weak,
 342                            next_update_id: share.next_update_id as u64,
 343                        })
 344                    })
 345                    .collect();
 346                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 347                    if *peer_conn_id != request.sender_id {
 348                        collaborators.push(proto::Collaborator {
 349                            peer_id: peer_conn_id.0,
 350                            replica_id: *peer_replica_id as u32,
 351                            user_id: peer_user_id.to_proto(),
 352                        });
 353                    }
 354                }
 355                let response = proto::JoinProjectResponse {
 356                    worktrees,
 357                    replica_id: joined.replica_id as u32,
 358                    collaborators,
 359                };
 360                let connection_ids = joined.project.connection_ids();
 361                let contact_user_ids = joined.project.authorized_user_ids();
 362                Ok((response, connection_ids, contact_user_ids))
 363            })?;
 364
 365        broadcast(request.sender_id, connection_ids, |conn_id| {
 366            self.peer.send(
 367                conn_id,
 368                proto::AddProjectCollaborator {
 369                    project_id,
 370                    collaborator: Some(proto::Collaborator {
 371                        peer_id: request.sender_id.0,
 372                        replica_id: response.replica_id,
 373                        user_id: user_id.to_proto(),
 374                    }),
 375                },
 376            )
 377        })?;
 378        self.update_contacts_for_users(&contact_user_ids)?;
 379        Ok(response)
 380    }
 381
 382    async fn leave_project(
 383        mut self: Arc<Server>,
 384        request: TypedEnvelope<proto::LeaveProject>,
 385    ) -> tide::Result<()> {
 386        let sender_id = request.sender_id;
 387        let project_id = request.payload.project_id;
 388        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 389
 390        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 391            self.peer.send(
 392                conn_id,
 393                proto::RemoveProjectCollaborator {
 394                    project_id,
 395                    peer_id: sender_id.0,
 396                },
 397            )
 398        })?;
 399        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 400
 401        Ok(())
 402    }
 403
 404    async fn register_worktree(
 405        mut self: Arc<Server>,
 406        request: TypedEnvelope<proto::RegisterWorktree>,
 407    ) -> tide::Result<proto::Ack> {
 408        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 409
 410        let mut contact_user_ids = HashSet::default();
 411        contact_user_ids.insert(host_user_id);
 412        for github_login in request.payload.authorized_logins {
 413            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 414            contact_user_ids.insert(contact_user_id);
 415        }
 416
 417        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 418        self.state_mut().register_worktree(
 419            request.payload.project_id,
 420            request.payload.worktree_id,
 421            request.sender_id,
 422            Worktree {
 423                authorized_user_ids: contact_user_ids.clone(),
 424                root_name: request.payload.root_name,
 425                share: None,
 426                weak: false,
 427            },
 428        )?;
 429        self.update_contacts_for_users(&contact_user_ids)?;
 430        Ok(proto::Ack {})
 431    }
 432
 433    async fn unregister_worktree(
 434        mut self: Arc<Server>,
 435        request: TypedEnvelope<proto::UnregisterWorktree>,
 436    ) -> tide::Result<()> {
 437        let project_id = request.payload.project_id;
 438        let worktree_id = request.payload.worktree_id;
 439        let (worktree, guest_connection_ids) =
 440            self.state_mut()
 441                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 442        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 443            self.peer.send(
 444                conn_id,
 445                proto::UnregisterWorktree {
 446                    project_id,
 447                    worktree_id,
 448                },
 449            )
 450        })?;
 451        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 452        Ok(())
 453    }
 454
 455    async fn share_worktree(
 456        mut self: Arc<Server>,
 457        mut request: TypedEnvelope<proto::ShareWorktree>,
 458    ) -> tide::Result<proto::Ack> {
 459        let worktree = request
 460            .payload
 461            .worktree
 462            .as_mut()
 463            .ok_or_else(|| anyhow!("missing worktree"))?;
 464        let entries = worktree
 465            .entries
 466            .iter()
 467            .map(|entry| (entry.id, entry.clone()))
 468            .collect();
 469        let diagnostic_summaries = worktree
 470            .diagnostic_summaries
 471            .iter()
 472            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 473            .collect();
 474
 475        let shared_worktree = self.state_mut().share_worktree(
 476            request.payload.project_id,
 477            worktree.id,
 478            request.sender_id,
 479            entries,
 480            diagnostic_summaries,
 481            worktree.next_update_id,
 482        )?;
 483
 484        broadcast(
 485            request.sender_id,
 486            shared_worktree.connection_ids,
 487            |connection_id| {
 488                self.peer
 489                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 490            },
 491        )?;
 492        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_worktree(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateWorktree>,
 500    ) -> tide::Result<proto::Ack> {
 501        let connection_ids = self.state_mut().update_worktree(
 502            request.sender_id,
 503            request.payload.project_id,
 504            request.payload.worktree_id,
 505            request.payload.id,
 506            &request.payload.removed_entries,
 507            &request.payload.updated_entries,
 508        )?;
 509
 510        broadcast(request.sender_id, connection_ids, |connection_id| {
 511            self.peer
 512                .forward_send(request.sender_id, connection_id, request.payload.clone())
 513        })?;
 514
 515        Ok(proto::Ack {})
 516    }
 517
 518    async fn update_diagnostic_summary(
 519        mut self: Arc<Server>,
 520        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 521    ) -> tide::Result<()> {
 522        let summary = request
 523            .payload
 524            .summary
 525            .clone()
 526            .ok_or_else(|| anyhow!("invalid summary"))?;
 527        let receiver_ids = self.state_mut().update_diagnostic_summary(
 528            request.payload.project_id,
 529            request.payload.worktree_id,
 530            request.sender_id,
 531            summary,
 532        )?;
 533
 534        broadcast(request.sender_id, receiver_ids, |connection_id| {
 535            self.peer
 536                .forward_send(request.sender_id, connection_id, request.payload.clone())
 537        })?;
 538        Ok(())
 539    }
 540
 541    async fn disk_based_diagnostics_updating(
 542        self: Arc<Server>,
 543        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 544    ) -> tide::Result<()> {
 545        let receiver_ids = self
 546            .state()
 547            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 548        broadcast(request.sender_id, receiver_ids, |connection_id| {
 549            self.peer
 550                .forward_send(request.sender_id, connection_id, request.payload.clone())
 551        })?;
 552        Ok(())
 553    }
 554
 555    async fn disk_based_diagnostics_updated(
 556        self: Arc<Server>,
 557        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 558    ) -> tide::Result<()> {
 559        let receiver_ids = self
 560            .state()
 561            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 562        broadcast(request.sender_id, receiver_ids, |connection_id| {
 563            self.peer
 564                .forward_send(request.sender_id, connection_id, request.payload.clone())
 565        })?;
 566        Ok(())
 567    }
 568
 569    async fn get_definition(
 570        self: Arc<Server>,
 571        request: TypedEnvelope<proto::GetDefinition>,
 572    ) -> tide::Result<proto::GetDefinitionResponse> {
 573        let host_connection_id = self
 574            .state()
 575            .read_project(request.payload.project_id, request.sender_id)?
 576            .host_connection_id;
 577        Ok(self
 578            .peer
 579            .forward_request(request.sender_id, host_connection_id, request.payload)
 580            .await?)
 581    }
 582
 583    async fn open_buffer(
 584        self: Arc<Server>,
 585        request: TypedEnvelope<proto::OpenBuffer>,
 586    ) -> tide::Result<proto::OpenBufferResponse> {
 587        let host_connection_id = self
 588            .state()
 589            .read_project(request.payload.project_id, request.sender_id)?
 590            .host_connection_id;
 591        Ok(self
 592            .peer
 593            .forward_request(request.sender_id, host_connection_id, request.payload)
 594            .await?)
 595    }
 596
 597    async fn close_buffer(
 598        self: Arc<Server>,
 599        request: TypedEnvelope<proto::CloseBuffer>,
 600    ) -> tide::Result<()> {
 601        let host_connection_id = self
 602            .state()
 603            .read_project(request.payload.project_id, request.sender_id)?
 604            .host_connection_id;
 605        self.peer
 606            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 607        Ok(())
 608    }
 609
 610    async fn save_buffer(
 611        self: Arc<Server>,
 612        request: TypedEnvelope<proto::SaveBuffer>,
 613    ) -> tide::Result<proto::BufferSaved> {
 614        let host;
 615        let mut guests;
 616        {
 617            let state = self.state();
 618            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 619            host = project.host_connection_id;
 620            guests = project.guest_connection_ids()
 621        }
 622
 623        let response = self
 624            .peer
 625            .forward_request(request.sender_id, host, request.payload.clone())
 626            .await?;
 627
 628        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 629        broadcast(host, guests, |conn_id| {
 630            self.peer.forward_send(host, conn_id, response.clone())
 631        })?;
 632
 633        Ok(response)
 634    }
 635
 636    async fn format_buffers(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::FormatBuffers>,
 639    ) -> tide::Result<proto::FormatBuffersResponse> {
 640        let host = self
 641            .state()
 642            .read_project(request.payload.project_id, request.sender_id)?
 643            .host_connection_id;
 644        Ok(self
 645            .peer
 646            .forward_request(request.sender_id, host, request.payload.clone())
 647            .await?)
 648    }
 649
 650    async fn get_completions(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::GetCompletions>,
 653    ) -> tide::Result<proto::GetCompletionsResponse> {
 654        let host = self
 655            .state()
 656            .read_project(request.payload.project_id, request.sender_id)?
 657            .host_connection_id;
 658        Ok(self
 659            .peer
 660            .forward_request(request.sender_id, host, request.payload.clone())
 661            .await?)
 662    }
 663
 664    async fn apply_additional_edits_for_completion(
 665        self: Arc<Server>,
 666        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 667    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 668        let host = self
 669            .state()
 670            .read_project(request.payload.project_id, request.sender_id)?
 671            .host_connection_id;
 672        Ok(self
 673            .peer
 674            .forward_request(request.sender_id, host, request.payload.clone())
 675            .await?)
 676    }
 677
 678    async fn get_code_actions(
 679        self: Arc<Server>,
 680        request: TypedEnvelope<proto::GetCodeActions>,
 681    ) -> tide::Result<proto::GetCodeActionsResponse> {
 682        let host = self
 683            .state()
 684            .read_project(request.payload.project_id, request.sender_id)?
 685            .host_connection_id;
 686        Ok(self
 687            .peer
 688            .forward_request(request.sender_id, host, request.payload.clone())
 689            .await?)
 690    }
 691
 692    async fn apply_code_action(
 693        self: Arc<Server>,
 694        request: TypedEnvelope<proto::ApplyCodeAction>,
 695    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 696        let host = self
 697            .state()
 698            .read_project(request.payload.project_id, request.sender_id)?
 699            .host_connection_id;
 700        Ok(self
 701            .peer
 702            .forward_request(request.sender_id, host, request.payload.clone())
 703            .await?)
 704    }
 705
 706    async fn update_buffer(
 707        self: Arc<Server>,
 708        request: TypedEnvelope<proto::UpdateBuffer>,
 709    ) -> tide::Result<proto::Ack> {
 710        let receiver_ids = self
 711            .state()
 712            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 713        broadcast(request.sender_id, receiver_ids, |connection_id| {
 714            self.peer
 715                .forward_send(request.sender_id, connection_id, request.payload.clone())
 716        })?;
 717        Ok(proto::Ack {})
 718    }
 719
 720    async fn update_buffer_file(
 721        self: Arc<Server>,
 722        request: TypedEnvelope<proto::UpdateBufferFile>,
 723    ) -> tide::Result<()> {
 724        let receiver_ids = self
 725            .state()
 726            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 727        broadcast(request.sender_id, receiver_ids, |connection_id| {
 728            self.peer
 729                .forward_send(request.sender_id, connection_id, request.payload.clone())
 730        })?;
 731        Ok(())
 732    }
 733
 734    async fn buffer_reloaded(
 735        self: Arc<Server>,
 736        request: TypedEnvelope<proto::BufferReloaded>,
 737    ) -> tide::Result<()> {
 738        let receiver_ids = self
 739            .state()
 740            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 741        broadcast(request.sender_id, receiver_ids, |connection_id| {
 742            self.peer
 743                .forward_send(request.sender_id, connection_id, request.payload.clone())
 744        })?;
 745        Ok(())
 746    }
 747
 748    async fn buffer_saved(
 749        self: Arc<Server>,
 750        request: TypedEnvelope<proto::BufferSaved>,
 751    ) -> tide::Result<()> {
 752        let receiver_ids = self
 753            .state()
 754            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 755        broadcast(request.sender_id, receiver_ids, |connection_id| {
 756            self.peer
 757                .forward_send(request.sender_id, connection_id, request.payload.clone())
 758        })?;
 759        Ok(())
 760    }
 761
 762    async fn get_channels(
 763        self: Arc<Server>,
 764        request: TypedEnvelope<proto::GetChannels>,
 765    ) -> tide::Result<proto::GetChannelsResponse> {
 766        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 767        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 768        Ok(proto::GetChannelsResponse {
 769            channels: channels
 770                .into_iter()
 771                .map(|chan| proto::Channel {
 772                    id: chan.id.to_proto(),
 773                    name: chan.name,
 774                })
 775                .collect(),
 776        })
 777    }
 778
 779    async fn get_users(
 780        self: Arc<Server>,
 781        request: TypedEnvelope<proto::GetUsers>,
 782    ) -> tide::Result<proto::GetUsersResponse> {
 783        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 784        let users = self
 785            .app_state
 786            .db
 787            .get_users_by_ids(user_ids)
 788            .await?
 789            .into_iter()
 790            .map(|user| proto::User {
 791                id: user.id.to_proto(),
 792                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 793                github_login: user.github_login,
 794            })
 795            .collect();
 796        Ok(proto::GetUsersResponse { users })
 797    }
 798
 799    fn update_contacts_for_users<'a>(
 800        self: &Arc<Server>,
 801        user_ids: impl IntoIterator<Item = &'a UserId>,
 802    ) -> anyhow::Result<()> {
 803        let mut result = Ok(());
 804        let state = self.state();
 805        for user_id in user_ids {
 806            let contacts = state.contacts_for_user(*user_id);
 807            for connection_id in state.connection_ids_for_user(*user_id) {
 808                if let Err(error) = self.peer.send(
 809                    connection_id,
 810                    proto::UpdateContacts {
 811                        contacts: contacts.clone(),
 812                    },
 813                ) {
 814                    result = Err(error);
 815                }
 816            }
 817        }
 818        result
 819    }
 820
 821    async fn join_channel(
 822        mut self: Arc<Self>,
 823        request: TypedEnvelope<proto::JoinChannel>,
 824    ) -> tide::Result<proto::JoinChannelResponse> {
 825        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 826        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 827        if !self
 828            .app_state
 829            .db
 830            .can_user_access_channel(user_id, channel_id)
 831            .await?
 832        {
 833            Err(anyhow!("access denied"))?;
 834        }
 835
 836        self.state_mut().join_channel(request.sender_id, channel_id);
 837        let messages = self
 838            .app_state
 839            .db
 840            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 841            .await?
 842            .into_iter()
 843            .map(|msg| proto::ChannelMessage {
 844                id: msg.id.to_proto(),
 845                body: msg.body,
 846                timestamp: msg.sent_at.unix_timestamp() as u64,
 847                sender_id: msg.sender_id.to_proto(),
 848                nonce: Some(msg.nonce.as_u128().into()),
 849            })
 850            .collect::<Vec<_>>();
 851        Ok(proto::JoinChannelResponse {
 852            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 853            messages,
 854        })
 855    }
 856
 857    async fn leave_channel(
 858        mut self: Arc<Self>,
 859        request: TypedEnvelope<proto::LeaveChannel>,
 860    ) -> tide::Result<()> {
 861        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 862        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 863        if !self
 864            .app_state
 865            .db
 866            .can_user_access_channel(user_id, channel_id)
 867            .await?
 868        {
 869            Err(anyhow!("access denied"))?;
 870        }
 871
 872        self.state_mut()
 873            .leave_channel(request.sender_id, channel_id);
 874
 875        Ok(())
 876    }
 877
 878    async fn send_channel_message(
 879        self: Arc<Self>,
 880        request: TypedEnvelope<proto::SendChannelMessage>,
 881    ) -> tide::Result<proto::SendChannelMessageResponse> {
 882        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 883        let user_id;
 884        let connection_ids;
 885        {
 886            let state = self.state();
 887            user_id = state.user_id_for_connection(request.sender_id)?;
 888            connection_ids = state.channel_connection_ids(channel_id)?;
 889        }
 890
 891        // Validate the message body.
 892        let body = request.payload.body.trim().to_string();
 893        if body.len() > MAX_MESSAGE_LEN {
 894            return Err(anyhow!("message is too long"))?;
 895        }
 896        if body.is_empty() {
 897            return Err(anyhow!("message can't be blank"))?;
 898        }
 899
 900        let timestamp = OffsetDateTime::now_utc();
 901        let nonce = request
 902            .payload
 903            .nonce
 904            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 905
 906        let message_id = self
 907            .app_state
 908            .db
 909            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 910            .await?
 911            .to_proto();
 912        let message = proto::ChannelMessage {
 913            sender_id: user_id.to_proto(),
 914            id: message_id,
 915            body,
 916            timestamp: timestamp.unix_timestamp() as u64,
 917            nonce: Some(nonce),
 918        };
 919        broadcast(request.sender_id, connection_ids, |conn_id| {
 920            self.peer.send(
 921                conn_id,
 922                proto::ChannelMessageSent {
 923                    channel_id: channel_id.to_proto(),
 924                    message: Some(message.clone()),
 925                },
 926            )
 927        })?;
 928        Ok(proto::SendChannelMessageResponse {
 929            message: Some(message),
 930        })
 931    }
 932
 933    async fn get_channel_messages(
 934        self: Arc<Self>,
 935        request: TypedEnvelope<proto::GetChannelMessages>,
 936    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 937        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 938        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 939        if !self
 940            .app_state
 941            .db
 942            .can_user_access_channel(user_id, channel_id)
 943            .await?
 944        {
 945            Err(anyhow!("access denied"))?;
 946        }
 947
 948        let messages = self
 949            .app_state
 950            .db
 951            .get_channel_messages(
 952                channel_id,
 953                MESSAGE_COUNT_PER_PAGE,
 954                Some(MessageId::from_proto(request.payload.before_message_id)),
 955            )
 956            .await?
 957            .into_iter()
 958            .map(|msg| proto::ChannelMessage {
 959                id: msg.id.to_proto(),
 960                body: msg.body,
 961                timestamp: msg.sent_at.unix_timestamp() as u64,
 962                sender_id: msg.sender_id.to_proto(),
 963                nonce: Some(msg.nonce.as_u128().into()),
 964            })
 965            .collect::<Vec<_>>();
 966
 967        Ok(proto::GetChannelMessagesResponse {
 968            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 969            messages,
 970        })
 971    }
 972
 973    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 974        self.store.read()
 975    }
 976
 977    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 978        self.store.write()
 979    }
 980}
 981
 982impl Executor for RealExecutor {
 983    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 984        task::spawn(future);
 985    }
 986}
 987
 988fn broadcast<F>(
 989    sender_id: ConnectionId,
 990    receiver_ids: Vec<ConnectionId>,
 991    mut f: F,
 992) -> anyhow::Result<()>
 993where
 994    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 995{
 996    let mut result = Ok(());
 997    for receiver_id in receiver_ids {
 998        if receiver_id != sender_id {
 999            if let Err(error) = f(receiver_id) {
1000                if result.is_ok() {
1001                    result = Err(error);
1002                }
1003            }
1004        }
1005    }
1006    result
1007}
1008
1009pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1010    let server = Server::new(app.state().clone(), rpc.clone(), None);
1011    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1012        let server = server.clone();
1013        async move {
1014            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1015
1016            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1017            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1018            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1019            let client_protocol_version: Option<u32> = request
1020                .header("X-Zed-Protocol-Version")
1021                .and_then(|v| v.as_str().parse().ok());
1022
1023            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1024                return Ok(Response::new(StatusCode::UpgradeRequired));
1025            }
1026
1027            let header = match request.header("Sec-Websocket-Key") {
1028                Some(h) => h.as_str(),
1029                None => return Err(anyhow!("expected sec-websocket-key"))?,
1030            };
1031
1032            let user_id = process_auth_header(&request).await?;
1033
1034            let mut response = Response::new(StatusCode::SwitchingProtocols);
1035            response.insert_header(UPGRADE, "websocket");
1036            response.insert_header(CONNECTION, "Upgrade");
1037            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1038            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1039            response.insert_header("Sec-Websocket-Version", "13");
1040
1041            let http_res: &mut tide::http::Response = response.as_mut();
1042            let upgrade_receiver = http_res.recv_upgrade().await;
1043            let addr = request.remote().unwrap_or("unknown").to_string();
1044            task::spawn(async move {
1045                if let Some(stream) = upgrade_receiver.await {
1046                    server
1047                        .handle_connection(
1048                            Connection::new(
1049                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1050                            ),
1051                            addr,
1052                            user_id,
1053                            None,
1054                            RealExecutor,
1055                        )
1056                        .await;
1057                }
1058            });
1059
1060            Ok(response)
1061        }
1062    });
1063}
1064
1065fn header_contains_ignore_case<T>(
1066    request: &tide::Request<T>,
1067    header_name: HeaderName,
1068    value: &str,
1069) -> bool {
1070    request
1071        .header(header_name)
1072        .map(|h| {
1073            h.as_str()
1074                .split(',')
1075                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1076        })
1077        .unwrap_or(false)
1078}
1079
1080#[cfg(test)]
1081mod tests {
1082    use super::*;
1083    use crate::{
1084        auth,
1085        db::{tests::TestDb, UserId},
1086        github, AppState, Config,
1087    };
1088    use ::rpc::Peer;
1089    use collections::BTreeMap;
1090    use gpui::{executor, ModelHandle, TestAppContext};
1091    use parking_lot::Mutex;
1092    use postage::{mpsc, watch};
1093    use rand::prelude::*;
1094    use rpc::PeerId;
1095    use serde_json::json;
1096    use sqlx::types::time::OffsetDateTime;
1097    use std::{
1098        cell::{Cell, RefCell},
1099        env,
1100        ops::Deref,
1101        path::Path,
1102        rc::Rc,
1103        sync::{
1104            atomic::{AtomicBool, Ordering::SeqCst},
1105            Arc,
1106        },
1107        time::Duration,
1108    };
1109    use zed::{
1110        client::{
1111            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1112            EstablishConnectionError, UserStore,
1113        },
1114        editor::{
1115            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1116            Redo, ToggleCodeActions, Undo,
1117        },
1118        fs::{FakeFs, Fs as _},
1119        language::{
1120            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1121            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1122        },
1123        lsp,
1124        project::{DiagnosticSummary, Project, ProjectPath},
1125        workspace::{Workspace, WorkspaceParams},
1126    };
1127
1128    #[cfg(test)]
1129    #[ctor::ctor]
1130    fn init_logger() {
1131        if std::env::var("RUST_LOG").is_ok() {
1132            env_logger::init();
1133        }
1134    }
1135
1136    #[gpui::test(iterations = 10)]
1137    async fn test_share_project(
1138        mut cx_a: TestAppContext,
1139        mut cx_b: TestAppContext,
1140        last_iteration: bool,
1141    ) {
1142        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1143        let lang_registry = Arc::new(LanguageRegistry::new());
1144        let fs = Arc::new(FakeFs::new(cx_a.background()));
1145        cx_a.foreground().forbid_parking();
1146
1147        // Connect to a server as 2 clients.
1148        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1149        let client_a = server.create_client(&mut cx_a, "user_a").await;
1150        let client_b = server.create_client(&mut cx_b, "user_b").await;
1151
1152        // Share a project as client A
1153        fs.insert_tree(
1154            "/a",
1155            json!({
1156                ".zed.toml": r#"collaborators = ["user_b"]"#,
1157                "a.txt": "a-contents",
1158                "b.txt": "b-contents",
1159            }),
1160        )
1161        .await;
1162        let project_a = cx_a.update(|cx| {
1163            Project::local(
1164                client_a.clone(),
1165                client_a.user_store.clone(),
1166                lang_registry.clone(),
1167                fs.clone(),
1168                cx,
1169            )
1170        });
1171        let (worktree_a, _) = project_a
1172            .update(&mut cx_a, |p, cx| {
1173                p.find_or_create_local_worktree("/a", false, cx)
1174            })
1175            .await
1176            .unwrap();
1177        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1178        worktree_a
1179            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1180            .await;
1181        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1182        project_a
1183            .update(&mut cx_a, |p, cx| p.share(cx))
1184            .await
1185            .unwrap();
1186
1187        // Join that project as client B
1188        let project_b = Project::remote(
1189            project_id,
1190            client_b.clone(),
1191            client_b.user_store.clone(),
1192            lang_registry.clone(),
1193            fs.clone(),
1194            &mut cx_b.to_async(),
1195        )
1196        .await
1197        .unwrap();
1198
1199        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1200            assert_eq!(
1201                project
1202                    .collaborators()
1203                    .get(&client_a.peer_id)
1204                    .unwrap()
1205                    .user
1206                    .github_login,
1207                "user_a"
1208            );
1209            project.replica_id()
1210        });
1211        project_a
1212            .condition(&cx_a, |tree, _| {
1213                tree.collaborators()
1214                    .get(&client_b.peer_id)
1215                    .map_or(false, |collaborator| {
1216                        collaborator.replica_id == replica_id_b
1217                            && collaborator.user.github_login == "user_b"
1218                    })
1219            })
1220            .await;
1221
1222        // Open the same file as client B and client A.
1223        let buffer_b = project_b
1224            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1225            .await
1226            .unwrap();
1227        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1228        buffer_b.read_with(&cx_b, |buf, cx| {
1229            assert_eq!(buf.read(cx).text(), "b-contents")
1230        });
1231        project_a.read_with(&cx_a, |project, cx| {
1232            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1233        });
1234        let buffer_a = project_a
1235            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1236            .await
1237            .unwrap();
1238
1239        let editor_b = cx_b.add_view(window_b, |cx| {
1240            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1241        });
1242
1243        // TODO
1244        // // Create a selection set as client B and see that selection set as client A.
1245        // buffer_a
1246        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1247        //     .await;
1248
1249        // Edit the buffer as client B and see that edit as client A.
1250        editor_b.update(&mut cx_b, |editor, cx| {
1251            editor.handle_input(&Input("ok, ".into()), cx)
1252        });
1253        buffer_a
1254            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1255            .await;
1256
1257        // TODO
1258        // // Remove the selection set as client B, see those selections disappear as client A.
1259        cx_b.update(move |_| drop(editor_b));
1260        // buffer_a
1261        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1262        //     .await;
1263
1264        // Close the buffer as client A, see that the buffer is closed.
1265        cx_a.update(move |_| drop(buffer_a));
1266        project_a
1267            .condition(&cx_a, |project, cx| {
1268                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1269            })
1270            .await;
1271
1272        // Dropping the client B's project removes client B from client A's collaborators.
1273        cx_b.update(move |_| drop(project_b));
1274        project_a
1275            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1276            .await;
1277    }
1278
1279    #[gpui::test(iterations = 10)]
1280    async fn test_unshare_project(
1281        mut cx_a: TestAppContext,
1282        mut cx_b: TestAppContext,
1283        last_iteration: bool,
1284    ) {
1285        let lang_registry = Arc::new(LanguageRegistry::new());
1286        let fs = Arc::new(FakeFs::new(cx_a.background()));
1287        cx_a.foreground().forbid_parking();
1288
1289        // Connect to a server as 2 clients.
1290        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1291        let client_a = server.create_client(&mut cx_a, "user_a").await;
1292        let client_b = server.create_client(&mut cx_b, "user_b").await;
1293
1294        // Share a project as client A
1295        fs.insert_tree(
1296            "/a",
1297            json!({
1298                ".zed.toml": r#"collaborators = ["user_b"]"#,
1299                "a.txt": "a-contents",
1300                "b.txt": "b-contents",
1301            }),
1302        )
1303        .await;
1304        let project_a = cx_a.update(|cx| {
1305            Project::local(
1306                client_a.clone(),
1307                client_a.user_store.clone(),
1308                lang_registry.clone(),
1309                fs.clone(),
1310                cx,
1311            )
1312        });
1313        let (worktree_a, _) = project_a
1314            .update(&mut cx_a, |p, cx| {
1315                p.find_or_create_local_worktree("/a", false, cx)
1316            })
1317            .await
1318            .unwrap();
1319        worktree_a
1320            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1321            .await;
1322        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1323        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1324        project_a
1325            .update(&mut cx_a, |p, cx| p.share(cx))
1326            .await
1327            .unwrap();
1328        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1329
1330        // Join that project as client B
1331        let project_b = Project::remote(
1332            project_id,
1333            client_b.clone(),
1334            client_b.user_store.clone(),
1335            lang_registry.clone(),
1336            fs.clone(),
1337            &mut cx_b.to_async(),
1338        )
1339        .await
1340        .unwrap();
1341        project_b
1342            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1343            .await
1344            .unwrap();
1345
1346        // Unshare the project as client A
1347        project_a
1348            .update(&mut cx_a, |project, cx| project.unshare(cx))
1349            .await
1350            .unwrap();
1351        project_b
1352            .condition(&mut cx_b, |project, _| project.is_read_only())
1353            .await;
1354        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1355        drop(project_b);
1356
1357        // Share the project again and ensure guests can still join.
1358        project_a
1359            .update(&mut cx_a, |project, cx| project.share(cx))
1360            .await
1361            .unwrap();
1362        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1363
1364        let project_c = Project::remote(
1365            project_id,
1366            client_b.clone(),
1367            client_b.user_store.clone(),
1368            lang_registry.clone(),
1369            fs.clone(),
1370            &mut cx_b.to_async(),
1371        )
1372        .await
1373        .unwrap();
1374        project_c
1375            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1376            .await
1377            .unwrap();
1378    }
1379
1380    #[gpui::test(iterations = 10)]
1381    async fn test_propagate_saves_and_fs_changes(
1382        mut cx_a: TestAppContext,
1383        mut cx_b: TestAppContext,
1384        mut cx_c: TestAppContext,
1385        last_iteration: bool,
1386    ) {
1387        let lang_registry = Arc::new(LanguageRegistry::new());
1388        let fs = Arc::new(FakeFs::new(cx_a.background()));
1389        cx_a.foreground().forbid_parking();
1390
1391        // Connect to a server as 3 clients.
1392        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1393        let client_a = server.create_client(&mut cx_a, "user_a").await;
1394        let client_b = server.create_client(&mut cx_b, "user_b").await;
1395        let client_c = server.create_client(&mut cx_c, "user_c").await;
1396
1397        // Share a worktree as client A.
1398        fs.insert_tree(
1399            "/a",
1400            json!({
1401                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1402                "file1": "",
1403                "file2": ""
1404            }),
1405        )
1406        .await;
1407        let project_a = cx_a.update(|cx| {
1408            Project::local(
1409                client_a.clone(),
1410                client_a.user_store.clone(),
1411                lang_registry.clone(),
1412                fs.clone(),
1413                cx,
1414            )
1415        });
1416        let (worktree_a, _) = project_a
1417            .update(&mut cx_a, |p, cx| {
1418                p.find_or_create_local_worktree("/a", false, cx)
1419            })
1420            .await
1421            .unwrap();
1422        worktree_a
1423            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1424            .await;
1425        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1426        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1427        project_a
1428            .update(&mut cx_a, |p, cx| p.share(cx))
1429            .await
1430            .unwrap();
1431
1432        // Join that worktree as clients B and C.
1433        let project_b = Project::remote(
1434            project_id,
1435            client_b.clone(),
1436            client_b.user_store.clone(),
1437            lang_registry.clone(),
1438            fs.clone(),
1439            &mut cx_b.to_async(),
1440        )
1441        .await
1442        .unwrap();
1443        let project_c = Project::remote(
1444            project_id,
1445            client_c.clone(),
1446            client_c.user_store.clone(),
1447            lang_registry.clone(),
1448            fs.clone(),
1449            &mut cx_c.to_async(),
1450        )
1451        .await
1452        .unwrap();
1453        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1454        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1455
1456        // Open and edit a buffer as both guests B and C.
1457        let buffer_b = project_b
1458            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1459            .await
1460            .unwrap();
1461        let buffer_c = project_c
1462            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1463            .await
1464            .unwrap();
1465        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1466        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1467
1468        // Open and edit that buffer as the host.
1469        let buffer_a = project_a
1470            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1471            .await
1472            .unwrap();
1473
1474        buffer_a
1475            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1476            .await;
1477        buffer_a.update(&mut cx_a, |buf, cx| {
1478            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1479        });
1480
1481        // Wait for edits to propagate
1482        buffer_a
1483            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1484            .await;
1485        buffer_b
1486            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1487            .await;
1488        buffer_c
1489            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1490            .await;
1491
1492        // Edit the buffer as the host and concurrently save as guest B.
1493        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1494        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1495        save_b.await.unwrap();
1496        assert_eq!(
1497            fs.load("/a/file1".as_ref()).await.unwrap(),
1498            "hi-a, i-am-c, i-am-b, i-am-a"
1499        );
1500        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1501        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1502        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1503
1504        // Make changes on host's file system, see those changes on guest worktrees.
1505        fs.rename(
1506            "/a/file1".as_ref(),
1507            "/a/file1-renamed".as_ref(),
1508            Default::default(),
1509        )
1510        .await
1511        .unwrap();
1512
1513        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1514            .await
1515            .unwrap();
1516        fs.insert_file(Path::new("/a/file4"), "4".into())
1517            .await
1518            .unwrap();
1519
1520        worktree_a
1521            .condition(&cx_a, |tree, _| {
1522                tree.paths()
1523                    .map(|p| p.to_string_lossy())
1524                    .collect::<Vec<_>>()
1525                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1526            })
1527            .await;
1528        worktree_b
1529            .condition(&cx_b, |tree, _| {
1530                tree.paths()
1531                    .map(|p| p.to_string_lossy())
1532                    .collect::<Vec<_>>()
1533                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1534            })
1535            .await;
1536        worktree_c
1537            .condition(&cx_c, |tree, _| {
1538                tree.paths()
1539                    .map(|p| p.to_string_lossy())
1540                    .collect::<Vec<_>>()
1541                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1542            })
1543            .await;
1544
1545        // Ensure buffer files are updated as well.
1546        buffer_a
1547            .condition(&cx_a, |buf, _| {
1548                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1549            })
1550            .await;
1551        buffer_b
1552            .condition(&cx_b, |buf, _| {
1553                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1554            })
1555            .await;
1556        buffer_c
1557            .condition(&cx_c, |buf, _| {
1558                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1559            })
1560            .await;
1561    }
1562
1563    #[gpui::test(iterations = 10)]
1564    async fn test_buffer_conflict_after_save(
1565        mut cx_a: TestAppContext,
1566        mut cx_b: TestAppContext,
1567        last_iteration: bool,
1568    ) {
1569        cx_a.foreground().forbid_parking();
1570        let lang_registry = Arc::new(LanguageRegistry::new());
1571        let fs = Arc::new(FakeFs::new(cx_a.background()));
1572
1573        // Connect to a server as 2 clients.
1574        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1575        let client_a = server.create_client(&mut cx_a, "user_a").await;
1576        let client_b = server.create_client(&mut cx_b, "user_b").await;
1577
1578        // Share a project as client A
1579        fs.insert_tree(
1580            "/dir",
1581            json!({
1582                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1583                "a.txt": "a-contents",
1584            }),
1585        )
1586        .await;
1587
1588        let project_a = cx_a.update(|cx| {
1589            Project::local(
1590                client_a.clone(),
1591                client_a.user_store.clone(),
1592                lang_registry.clone(),
1593                fs.clone(),
1594                cx,
1595            )
1596        });
1597        let (worktree_a, _) = project_a
1598            .update(&mut cx_a, |p, cx| {
1599                p.find_or_create_local_worktree("/dir", false, cx)
1600            })
1601            .await
1602            .unwrap();
1603        worktree_a
1604            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1605            .await;
1606        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1607        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1608        project_a
1609            .update(&mut cx_a, |p, cx| p.share(cx))
1610            .await
1611            .unwrap();
1612
1613        // Join that project as client B
1614        let project_b = Project::remote(
1615            project_id,
1616            client_b.clone(),
1617            client_b.user_store.clone(),
1618            lang_registry.clone(),
1619            fs.clone(),
1620            &mut cx_b.to_async(),
1621        )
1622        .await
1623        .unwrap();
1624
1625        // Open a buffer as client B
1626        let buffer_b = project_b
1627            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1628            .await
1629            .unwrap();
1630
1631        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1632        buffer_b.read_with(&cx_b, |buf, _| {
1633            assert!(buf.is_dirty());
1634            assert!(!buf.has_conflict());
1635        });
1636
1637        buffer_b
1638            .update(&mut cx_b, |buf, cx| buf.save(cx))
1639            .await
1640            .unwrap();
1641        buffer_b
1642            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1643            .await;
1644        buffer_b.read_with(&cx_b, |buf, _| {
1645            assert!(!buf.has_conflict());
1646        });
1647
1648        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1649        buffer_b.read_with(&cx_b, |buf, _| {
1650            assert!(buf.is_dirty());
1651            assert!(!buf.has_conflict());
1652        });
1653    }
1654
1655    #[gpui::test(iterations = 10)]
1656    async fn test_buffer_reloading(
1657        mut cx_a: TestAppContext,
1658        mut cx_b: TestAppContext,
1659        last_iteration: bool,
1660    ) {
1661        cx_a.foreground().forbid_parking();
1662        let lang_registry = Arc::new(LanguageRegistry::new());
1663        let fs = Arc::new(FakeFs::new(cx_a.background()));
1664
1665        // Connect to a server as 2 clients.
1666        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1667        let client_a = server.create_client(&mut cx_a, "user_a").await;
1668        let client_b = server.create_client(&mut cx_b, "user_b").await;
1669
1670        // Share a project as client A
1671        fs.insert_tree(
1672            "/dir",
1673            json!({
1674                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1675                "a.txt": "a-contents",
1676            }),
1677        )
1678        .await;
1679
1680        let project_a = cx_a.update(|cx| {
1681            Project::local(
1682                client_a.clone(),
1683                client_a.user_store.clone(),
1684                lang_registry.clone(),
1685                fs.clone(),
1686                cx,
1687            )
1688        });
1689        let (worktree_a, _) = project_a
1690            .update(&mut cx_a, |p, cx| {
1691                p.find_or_create_local_worktree("/dir", false, cx)
1692            })
1693            .await
1694            .unwrap();
1695        worktree_a
1696            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1697            .await;
1698        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1699        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1700        project_a
1701            .update(&mut cx_a, |p, cx| p.share(cx))
1702            .await
1703            .unwrap();
1704
1705        // Join that project as client B
1706        let project_b = Project::remote(
1707            project_id,
1708            client_b.clone(),
1709            client_b.user_store.clone(),
1710            lang_registry.clone(),
1711            fs.clone(),
1712            &mut cx_b.to_async(),
1713        )
1714        .await
1715        .unwrap();
1716        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1717
1718        // Open a buffer as client B
1719        let buffer_b = project_b
1720            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1721            .await
1722            .unwrap();
1723        buffer_b.read_with(&cx_b, |buf, _| {
1724            assert!(!buf.is_dirty());
1725            assert!(!buf.has_conflict());
1726        });
1727
1728        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1729            .await
1730            .unwrap();
1731        buffer_b
1732            .condition(&cx_b, |buf, _| {
1733                buf.text() == "new contents" && !buf.is_dirty()
1734            })
1735            .await;
1736        buffer_b.read_with(&cx_b, |buf, _| {
1737            assert!(!buf.has_conflict());
1738        });
1739    }
1740
1741    #[gpui::test(iterations = 10)]
1742    async fn test_editing_while_guest_opens_buffer(
1743        mut cx_a: TestAppContext,
1744        mut cx_b: TestAppContext,
1745        last_iteration: bool,
1746    ) {
1747        cx_a.foreground().forbid_parking();
1748        let lang_registry = Arc::new(LanguageRegistry::new());
1749        let fs = Arc::new(FakeFs::new(cx_a.background()));
1750
1751        // Connect to a server as 2 clients.
1752        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1753        let client_a = server.create_client(&mut cx_a, "user_a").await;
1754        let client_b = server.create_client(&mut cx_b, "user_b").await;
1755
1756        // Share a project as client A
1757        fs.insert_tree(
1758            "/dir",
1759            json!({
1760                ".zed.toml": r#"collaborators = ["user_b"]"#,
1761                "a.txt": "a-contents",
1762            }),
1763        )
1764        .await;
1765        let project_a = cx_a.update(|cx| {
1766            Project::local(
1767                client_a.clone(),
1768                client_a.user_store.clone(),
1769                lang_registry.clone(),
1770                fs.clone(),
1771                cx,
1772            )
1773        });
1774        let (worktree_a, _) = project_a
1775            .update(&mut cx_a, |p, cx| {
1776                p.find_or_create_local_worktree("/dir", false, cx)
1777            })
1778            .await
1779            .unwrap();
1780        worktree_a
1781            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1782            .await;
1783        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1784        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1785        project_a
1786            .update(&mut cx_a, |p, cx| p.share(cx))
1787            .await
1788            .unwrap();
1789
1790        // Join that project as client B
1791        let project_b = Project::remote(
1792            project_id,
1793            client_b.clone(),
1794            client_b.user_store.clone(),
1795            lang_registry.clone(),
1796            fs.clone(),
1797            &mut cx_b.to_async(),
1798        )
1799        .await
1800        .unwrap();
1801
1802        // Open a buffer as client A
1803        let buffer_a = project_a
1804            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1805            .await
1806            .unwrap();
1807
1808        // Start opening the same buffer as client B
1809        let buffer_b = cx_b
1810            .background()
1811            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1812
1813        // Edit the buffer as client A while client B is still opening it.
1814        cx_b.background().simulate_random_delay().await;
1815        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1816        cx_b.background().simulate_random_delay().await;
1817        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1818
1819        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1820        let buffer_b = buffer_b.await.unwrap();
1821        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1822    }
1823
1824    #[gpui::test(iterations = 10)]
1825    async fn test_leaving_worktree_while_opening_buffer(
1826        mut cx_a: TestAppContext,
1827        mut cx_b: TestAppContext,
1828        last_iteration: bool,
1829    ) {
1830        cx_a.foreground().forbid_parking();
1831        let lang_registry = Arc::new(LanguageRegistry::new());
1832        let fs = Arc::new(FakeFs::new(cx_a.background()));
1833
1834        // Connect to a server as 2 clients.
1835        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1836        let client_a = server.create_client(&mut cx_a, "user_a").await;
1837        let client_b = server.create_client(&mut cx_b, "user_b").await;
1838
1839        // Share a project as client A
1840        fs.insert_tree(
1841            "/dir",
1842            json!({
1843                ".zed.toml": r#"collaborators = ["user_b"]"#,
1844                "a.txt": "a-contents",
1845            }),
1846        )
1847        .await;
1848        let project_a = cx_a.update(|cx| {
1849            Project::local(
1850                client_a.clone(),
1851                client_a.user_store.clone(),
1852                lang_registry.clone(),
1853                fs.clone(),
1854                cx,
1855            )
1856        });
1857        let (worktree_a, _) = project_a
1858            .update(&mut cx_a, |p, cx| {
1859                p.find_or_create_local_worktree("/dir", false, cx)
1860            })
1861            .await
1862            .unwrap();
1863        worktree_a
1864            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1865            .await;
1866        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1867        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1868        project_a
1869            .update(&mut cx_a, |p, cx| p.share(cx))
1870            .await
1871            .unwrap();
1872
1873        // Join that project as client B
1874        let project_b = Project::remote(
1875            project_id,
1876            client_b.clone(),
1877            client_b.user_store.clone(),
1878            lang_registry.clone(),
1879            fs.clone(),
1880            &mut cx_b.to_async(),
1881        )
1882        .await
1883        .unwrap();
1884
1885        // See that a guest has joined as client A.
1886        project_a
1887            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1888            .await;
1889
1890        // Begin opening a buffer as client B, but leave the project before the open completes.
1891        let buffer_b = cx_b
1892            .background()
1893            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1894        cx_b.update(|_| drop(project_b));
1895        drop(buffer_b);
1896
1897        // See that the guest has left.
1898        project_a
1899            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1900            .await;
1901    }
1902
1903    #[gpui::test(iterations = 10)]
1904    async fn test_peer_disconnection(
1905        mut cx_a: TestAppContext,
1906        mut cx_b: TestAppContext,
1907        last_iteration: bool,
1908    ) {
1909        cx_a.foreground().forbid_parking();
1910        let lang_registry = Arc::new(LanguageRegistry::new());
1911        let fs = Arc::new(FakeFs::new(cx_a.background()));
1912
1913        // Connect to a server as 2 clients.
1914        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
1915        let client_a = server.create_client(&mut cx_a, "user_a").await;
1916        let client_b = server.create_client(&mut cx_b, "user_b").await;
1917
1918        // Share a project as client A
1919        fs.insert_tree(
1920            "/a",
1921            json!({
1922                ".zed.toml": r#"collaborators = ["user_b"]"#,
1923                "a.txt": "a-contents",
1924                "b.txt": "b-contents",
1925            }),
1926        )
1927        .await;
1928        let project_a = cx_a.update(|cx| {
1929            Project::local(
1930                client_a.clone(),
1931                client_a.user_store.clone(),
1932                lang_registry.clone(),
1933                fs.clone(),
1934                cx,
1935            )
1936        });
1937        let (worktree_a, _) = project_a
1938            .update(&mut cx_a, |p, cx| {
1939                p.find_or_create_local_worktree("/a", false, cx)
1940            })
1941            .await
1942            .unwrap();
1943        worktree_a
1944            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1945            .await;
1946        let project_id = project_a
1947            .update(&mut cx_a, |project, _| project.next_remote_id())
1948            .await;
1949        project_a
1950            .update(&mut cx_a, |project, cx| project.share(cx))
1951            .await
1952            .unwrap();
1953
1954        // Join that project as client B
1955        let _project_b = Project::remote(
1956            project_id,
1957            client_b.clone(),
1958            client_b.user_store.clone(),
1959            lang_registry.clone(),
1960            fs.clone(),
1961            &mut cx_b.to_async(),
1962        )
1963        .await
1964        .unwrap();
1965
1966        // See that a guest has joined as client A.
1967        project_a
1968            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1969            .await;
1970
1971        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1972        client_b.disconnect(&cx_b.to_async()).unwrap();
1973        project_a
1974            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1975            .await;
1976    }
1977
1978    #[gpui::test(iterations = 10)]
1979    async fn test_collaborating_with_diagnostics(
1980        mut cx_a: TestAppContext,
1981        mut cx_b: TestAppContext,
1982        last_iteration: bool,
1983    ) {
1984        cx_a.foreground().forbid_parking();
1985        let mut lang_registry = Arc::new(LanguageRegistry::new());
1986        let fs = Arc::new(FakeFs::new(cx_a.background()));
1987
1988        // Set up a fake language server.
1989        let (language_server_config, mut fake_language_server) =
1990            LanguageServerConfig::fake(&cx_a).await;
1991        Arc::get_mut(&mut lang_registry)
1992            .unwrap()
1993            .add(Arc::new(Language::new(
1994                LanguageConfig {
1995                    name: "Rust".to_string(),
1996                    path_suffixes: vec!["rs".to_string()],
1997                    language_server: Some(language_server_config),
1998                    ..Default::default()
1999                },
2000                Some(tree_sitter_rust::language()),
2001            )));
2002
2003        // Connect to a server as 2 clients.
2004        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2005        let client_a = server.create_client(&mut cx_a, "user_a").await;
2006        let client_b = server.create_client(&mut cx_b, "user_b").await;
2007
2008        // Share a project as client A
2009        fs.insert_tree(
2010            "/a",
2011            json!({
2012                ".zed.toml": r#"collaborators = ["user_b"]"#,
2013                "a.rs": "let one = two",
2014                "other.rs": "",
2015            }),
2016        )
2017        .await;
2018        let project_a = cx_a.update(|cx| {
2019            Project::local(
2020                client_a.clone(),
2021                client_a.user_store.clone(),
2022                lang_registry.clone(),
2023                fs.clone(),
2024                cx,
2025            )
2026        });
2027        let (worktree_a, _) = project_a
2028            .update(&mut cx_a, |p, cx| {
2029                p.find_or_create_local_worktree("/a", false, cx)
2030            })
2031            .await
2032            .unwrap();
2033        worktree_a
2034            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2035            .await;
2036        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2037        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2038        project_a
2039            .update(&mut cx_a, |p, cx| p.share(cx))
2040            .await
2041            .unwrap();
2042
2043        // Cause the language server to start.
2044        let _ = cx_a
2045            .background()
2046            .spawn(project_a.update(&mut cx_a, |project, cx| {
2047                project.open_buffer(
2048                    ProjectPath {
2049                        worktree_id,
2050                        path: Path::new("other.rs").into(),
2051                    },
2052                    cx,
2053                )
2054            }))
2055            .await
2056            .unwrap();
2057
2058        // Simulate a language server reporting errors for a file.
2059        fake_language_server
2060            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2061                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2062                version: None,
2063                diagnostics: vec![lsp::Diagnostic {
2064                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2065                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2066                    message: "message 1".to_string(),
2067                    ..Default::default()
2068                }],
2069            })
2070            .await;
2071
2072        // Wait for server to see the diagnostics update.
2073        server
2074            .condition(|store| {
2075                let worktree = store
2076                    .project(project_id)
2077                    .unwrap()
2078                    .worktrees
2079                    .get(&worktree_id.to_proto())
2080                    .unwrap();
2081
2082                !worktree
2083                    .share
2084                    .as_ref()
2085                    .unwrap()
2086                    .diagnostic_summaries
2087                    .is_empty()
2088            })
2089            .await;
2090
2091        // Join the worktree as client B.
2092        let project_b = Project::remote(
2093            project_id,
2094            client_b.clone(),
2095            client_b.user_store.clone(),
2096            lang_registry.clone(),
2097            fs.clone(),
2098            &mut cx_b.to_async(),
2099        )
2100        .await
2101        .unwrap();
2102
2103        project_b.read_with(&cx_b, |project, cx| {
2104            assert_eq!(
2105                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2106                &[(
2107                    ProjectPath {
2108                        worktree_id,
2109                        path: Arc::from(Path::new("a.rs")),
2110                    },
2111                    DiagnosticSummary {
2112                        error_count: 1,
2113                        warning_count: 0,
2114                        ..Default::default()
2115                    },
2116                )]
2117            )
2118        });
2119
2120        // Simulate a language server reporting more errors for a file.
2121        fake_language_server
2122            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2123                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2124                version: None,
2125                diagnostics: vec![
2126                    lsp::Diagnostic {
2127                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2128                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2129                        message: "message 1".to_string(),
2130                        ..Default::default()
2131                    },
2132                    lsp::Diagnostic {
2133                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2134                        range: lsp::Range::new(
2135                            lsp::Position::new(0, 10),
2136                            lsp::Position::new(0, 13),
2137                        ),
2138                        message: "message 2".to_string(),
2139                        ..Default::default()
2140                    },
2141                ],
2142            })
2143            .await;
2144
2145        // Client b gets the updated summaries
2146        project_b
2147            .condition(&cx_b, |project, cx| {
2148                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2149                    == &[(
2150                        ProjectPath {
2151                            worktree_id,
2152                            path: Arc::from(Path::new("a.rs")),
2153                        },
2154                        DiagnosticSummary {
2155                            error_count: 1,
2156                            warning_count: 1,
2157                            ..Default::default()
2158                        },
2159                    )]
2160            })
2161            .await;
2162
2163        // Open the file with the errors on client B. They should be present.
2164        let buffer_b = cx_b
2165            .background()
2166            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2167            .await
2168            .unwrap();
2169
2170        buffer_b.read_with(&cx_b, |buffer, _| {
2171            assert_eq!(
2172                buffer
2173                    .snapshot()
2174                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2175                    .map(|entry| entry)
2176                    .collect::<Vec<_>>(),
2177                &[
2178                    DiagnosticEntry {
2179                        range: Point::new(0, 4)..Point::new(0, 7),
2180                        diagnostic: Diagnostic {
2181                            group_id: 0,
2182                            message: "message 1".to_string(),
2183                            severity: lsp::DiagnosticSeverity::ERROR,
2184                            is_primary: true,
2185                            ..Default::default()
2186                        }
2187                    },
2188                    DiagnosticEntry {
2189                        range: Point::new(0, 10)..Point::new(0, 13),
2190                        diagnostic: Diagnostic {
2191                            group_id: 1,
2192                            severity: lsp::DiagnosticSeverity::WARNING,
2193                            message: "message 2".to_string(),
2194                            is_primary: true,
2195                            ..Default::default()
2196                        }
2197                    }
2198                ]
2199            );
2200        });
2201    }
2202
2203    #[gpui::test(iterations = 10)]
2204    async fn test_collaborating_with_completion(
2205        mut cx_a: TestAppContext,
2206        mut cx_b: TestAppContext,
2207        last_iteration: bool,
2208    ) {
2209        cx_a.foreground().forbid_parking();
2210        let mut lang_registry = Arc::new(LanguageRegistry::new());
2211        let fs = Arc::new(FakeFs::new(cx_a.background()));
2212
2213        // Set up a fake language server.
2214        let (language_server_config, mut fake_language_server) =
2215            LanguageServerConfig::fake_with_capabilities(
2216                lsp::ServerCapabilities {
2217                    completion_provider: Some(lsp::CompletionOptions {
2218                        trigger_characters: Some(vec![".".to_string()]),
2219                        ..Default::default()
2220                    }),
2221                    ..Default::default()
2222                },
2223                &cx_a,
2224            )
2225            .await;
2226        Arc::get_mut(&mut lang_registry)
2227            .unwrap()
2228            .add(Arc::new(Language::new(
2229                LanguageConfig {
2230                    name: "Rust".to_string(),
2231                    path_suffixes: vec!["rs".to_string()],
2232                    language_server: Some(language_server_config),
2233                    ..Default::default()
2234                },
2235                Some(tree_sitter_rust::language()),
2236            )));
2237
2238        // Connect to a server as 2 clients.
2239        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2240        let client_a = server.create_client(&mut cx_a, "user_a").await;
2241        let client_b = server.create_client(&mut cx_b, "user_b").await;
2242
2243        // Share a project as client A
2244        fs.insert_tree(
2245            "/a",
2246            json!({
2247                ".zed.toml": r#"collaborators = ["user_b"]"#,
2248                "main.rs": "fn main() { a }",
2249                "other.rs": "",
2250            }),
2251        )
2252        .await;
2253        let project_a = cx_a.update(|cx| {
2254            Project::local(
2255                client_a.clone(),
2256                client_a.user_store.clone(),
2257                lang_registry.clone(),
2258                fs.clone(),
2259                cx,
2260            )
2261        });
2262        let (worktree_a, _) = project_a
2263            .update(&mut cx_a, |p, cx| {
2264                p.find_or_create_local_worktree("/a", false, cx)
2265            })
2266            .await
2267            .unwrap();
2268        worktree_a
2269            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2270            .await;
2271        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2272        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2273        project_a
2274            .update(&mut cx_a, |p, cx| p.share(cx))
2275            .await
2276            .unwrap();
2277
2278        // Join the worktree as client B.
2279        let project_b = Project::remote(
2280            project_id,
2281            client_b.clone(),
2282            client_b.user_store.clone(),
2283            lang_registry.clone(),
2284            fs.clone(),
2285            &mut cx_b.to_async(),
2286        )
2287        .await
2288        .unwrap();
2289
2290        // Open a file in an editor as the guest.
2291        let buffer_b = project_b
2292            .update(&mut cx_b, |p, cx| {
2293                p.open_buffer((worktree_id, "main.rs"), cx)
2294            })
2295            .await
2296            .unwrap();
2297        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2298        let editor_b = cx_b.add_view(window_b, |cx| {
2299            Editor::for_buffer(
2300                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2301                Arc::new(|cx| EditorSettings::test(cx)),
2302                Some(project_b.clone()),
2303                cx,
2304            )
2305        });
2306
2307        // Type a completion trigger character as the guest.
2308        editor_b.update(&mut cx_b, |editor, cx| {
2309            editor.select_ranges([13..13], None, cx);
2310            editor.handle_input(&Input(".".into()), cx);
2311            cx.focus(&editor_b);
2312        });
2313
2314        // Receive a completion request as the host's language server.
2315        // Return some completions from the host's language server.
2316        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2317            assert_eq!(
2318                params.text_document_position.text_document.uri,
2319                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2320            );
2321            assert_eq!(
2322                params.text_document_position.position,
2323                lsp::Position::new(0, 14),
2324            );
2325
2326            Some(lsp::CompletionResponse::Array(vec![
2327                lsp::CompletionItem {
2328                    label: "first_method(…)".into(),
2329                    detail: Some("fn(&mut self, B) -> C".into()),
2330                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2331                        new_text: "first_method($1)".to_string(),
2332                        range: lsp::Range::new(
2333                            lsp::Position::new(0, 14),
2334                            lsp::Position::new(0, 14),
2335                        ),
2336                    })),
2337                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2338                    ..Default::default()
2339                },
2340                lsp::CompletionItem {
2341                    label: "second_method(…)".into(),
2342                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2343                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2344                        new_text: "second_method()".to_string(),
2345                        range: lsp::Range::new(
2346                            lsp::Position::new(0, 14),
2347                            lsp::Position::new(0, 14),
2348                        ),
2349                    })),
2350                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2351                    ..Default::default()
2352                },
2353            ]))
2354        });
2355
2356        // Open the buffer on the host.
2357        let buffer_a = project_a
2358            .update(&mut cx_a, |p, cx| {
2359                p.open_buffer((worktree_id, "main.rs"), cx)
2360            })
2361            .await
2362            .unwrap();
2363        buffer_a
2364            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2365            .await;
2366
2367        // Confirm a completion on the guest.
2368        editor_b
2369            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2370            .await;
2371        editor_b.update(&mut cx_b, |editor, cx| {
2372            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2373            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2374        });
2375
2376        // Return a resolved completion from the host's language server.
2377        // The resolved completion has an additional text edit.
2378        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2379            assert_eq!(params.label, "first_method(…)");
2380            lsp::CompletionItem {
2381                label: "first_method(…)".into(),
2382                detail: Some("fn(&mut self, B) -> C".into()),
2383                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2384                    new_text: "first_method($1)".to_string(),
2385                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2386                })),
2387                additional_text_edits: Some(vec![lsp::TextEdit {
2388                    new_text: "use d::SomeTrait;\n".to_string(),
2389                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2390                }]),
2391                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2392                ..Default::default()
2393            }
2394        });
2395
2396        // The additional edit is applied.
2397        buffer_a
2398            .condition(&cx_a, |buffer, _| {
2399                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2400            })
2401            .await;
2402        buffer_b
2403            .condition(&cx_b, |buffer, _| {
2404                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2405            })
2406            .await;
2407    }
2408
2409    #[gpui::test(iterations = 10)]
2410    async fn test_formatting_buffer(
2411        mut cx_a: TestAppContext,
2412        mut cx_b: TestAppContext,
2413        last_iteration: bool,
2414    ) {
2415        cx_a.foreground().forbid_parking();
2416        let mut lang_registry = Arc::new(LanguageRegistry::new());
2417        let fs = Arc::new(FakeFs::new(cx_a.background()));
2418
2419        // Set up a fake language server.
2420        let (language_server_config, mut fake_language_server) =
2421            LanguageServerConfig::fake(&cx_a).await;
2422        Arc::get_mut(&mut lang_registry)
2423            .unwrap()
2424            .add(Arc::new(Language::new(
2425                LanguageConfig {
2426                    name: "Rust".to_string(),
2427                    path_suffixes: vec!["rs".to_string()],
2428                    language_server: Some(language_server_config),
2429                    ..Default::default()
2430                },
2431                Some(tree_sitter_rust::language()),
2432            )));
2433
2434        // Connect to a server as 2 clients.
2435        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2436        let client_a = server.create_client(&mut cx_a, "user_a").await;
2437        let client_b = server.create_client(&mut cx_b, "user_b").await;
2438
2439        // Share a project as client A
2440        fs.insert_tree(
2441            "/a",
2442            json!({
2443                ".zed.toml": r#"collaborators = ["user_b"]"#,
2444                "a.rs": "let one = two",
2445            }),
2446        )
2447        .await;
2448        let project_a = cx_a.update(|cx| {
2449            Project::local(
2450                client_a.clone(),
2451                client_a.user_store.clone(),
2452                lang_registry.clone(),
2453                fs.clone(),
2454                cx,
2455            )
2456        });
2457        let (worktree_a, _) = project_a
2458            .update(&mut cx_a, |p, cx| {
2459                p.find_or_create_local_worktree("/a", false, cx)
2460            })
2461            .await
2462            .unwrap();
2463        worktree_a
2464            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2465            .await;
2466        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2467        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2468        project_a
2469            .update(&mut cx_a, |p, cx| p.share(cx))
2470            .await
2471            .unwrap();
2472
2473        // Join the worktree as client B.
2474        let project_b = Project::remote(
2475            project_id,
2476            client_b.clone(),
2477            client_b.user_store.clone(),
2478            lang_registry.clone(),
2479            fs.clone(),
2480            &mut cx_b.to_async(),
2481        )
2482        .await
2483        .unwrap();
2484
2485        let buffer_b = cx_b
2486            .background()
2487            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2488            .await
2489            .unwrap();
2490
2491        let format = project_b.update(&mut cx_b, |project, cx| {
2492            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2493        });
2494
2495        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2496            Some(vec![
2497                lsp::TextEdit {
2498                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2499                    new_text: "h".to_string(),
2500                },
2501                lsp::TextEdit {
2502                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2503                    new_text: "y".to_string(),
2504                },
2505            ])
2506        });
2507
2508        format.await.unwrap();
2509        assert_eq!(
2510            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2511            "let honey = two"
2512        );
2513    }
2514
2515    #[gpui::test(iterations = 10)]
2516    async fn test_definition(
2517        mut cx_a: TestAppContext,
2518        mut cx_b: TestAppContext,
2519        last_iteration: bool,
2520    ) {
2521        cx_a.foreground().forbid_parking();
2522        let mut lang_registry = Arc::new(LanguageRegistry::new());
2523        let fs = Arc::new(FakeFs::new(cx_a.background()));
2524        fs.insert_tree(
2525            "/root-1",
2526            json!({
2527                ".zed.toml": r#"collaborators = ["user_b"]"#,
2528                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2529            }),
2530        )
2531        .await;
2532        fs.insert_tree(
2533            "/root-2",
2534            json!({
2535                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2536            }),
2537        )
2538        .await;
2539
2540        // Set up a fake language server.
2541        let (language_server_config, mut fake_language_server) =
2542            LanguageServerConfig::fake(&cx_a).await;
2543        Arc::get_mut(&mut lang_registry)
2544            .unwrap()
2545            .add(Arc::new(Language::new(
2546                LanguageConfig {
2547                    name: "Rust".to_string(),
2548                    path_suffixes: vec!["rs".to_string()],
2549                    language_server: Some(language_server_config),
2550                    ..Default::default()
2551                },
2552                Some(tree_sitter_rust::language()),
2553            )));
2554
2555        // Connect to a server as 2 clients.
2556        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2557        let client_a = server.create_client(&mut cx_a, "user_a").await;
2558        let client_b = server.create_client(&mut cx_b, "user_b").await;
2559
2560        // Share a project as client A
2561        let project_a = cx_a.update(|cx| {
2562            Project::local(
2563                client_a.clone(),
2564                client_a.user_store.clone(),
2565                lang_registry.clone(),
2566                fs.clone(),
2567                cx,
2568            )
2569        });
2570        let (worktree_a, _) = project_a
2571            .update(&mut cx_a, |p, cx| {
2572                p.find_or_create_local_worktree("/root-1", false, cx)
2573            })
2574            .await
2575            .unwrap();
2576        worktree_a
2577            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2578            .await;
2579        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2580        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2581        project_a
2582            .update(&mut cx_a, |p, cx| p.share(cx))
2583            .await
2584            .unwrap();
2585
2586        // Join the worktree as client B.
2587        let project_b = Project::remote(
2588            project_id,
2589            client_b.clone(),
2590            client_b.user_store.clone(),
2591            lang_registry.clone(),
2592            fs.clone(),
2593            &mut cx_b.to_async(),
2594        )
2595        .await
2596        .unwrap();
2597
2598        // Open the file on client B.
2599        let buffer_b = cx_b
2600            .background()
2601            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2602            .await
2603            .unwrap();
2604
2605        // Request the definition of a symbol as the guest.
2606        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2607        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2608            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2609                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2610                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2611            )))
2612        });
2613
2614        let definitions_1 = definitions_1.await.unwrap();
2615        cx_b.read(|cx| {
2616            assert_eq!(definitions_1.len(), 1);
2617            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2618            let target_buffer = definitions_1[0].target_buffer.read(cx);
2619            assert_eq!(
2620                target_buffer.text(),
2621                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2622            );
2623            assert_eq!(
2624                definitions_1[0].target_range.to_point(target_buffer),
2625                Point::new(0, 6)..Point::new(0, 9)
2626            );
2627        });
2628
2629        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2630        // the previous call to `definition`.
2631        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2632        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2633            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2634                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2635                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2636            )))
2637        });
2638
2639        let definitions_2 = definitions_2.await.unwrap();
2640        cx_b.read(|cx| {
2641            assert_eq!(definitions_2.len(), 1);
2642            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2643            let target_buffer = definitions_2[0].target_buffer.read(cx);
2644            assert_eq!(
2645                target_buffer.text(),
2646                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2647            );
2648            assert_eq!(
2649                definitions_2[0].target_range.to_point(target_buffer),
2650                Point::new(1, 6)..Point::new(1, 11)
2651            );
2652        });
2653        assert_eq!(
2654            definitions_1[0].target_buffer,
2655            definitions_2[0].target_buffer
2656        );
2657
2658        cx_b.update(|_| {
2659            drop(definitions_1);
2660            drop(definitions_2);
2661        });
2662        project_b
2663            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2664            .await;
2665    }
2666
2667    #[gpui::test(iterations = 10)]
2668    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2669        mut cx_a: TestAppContext,
2670        mut cx_b: TestAppContext,
2671        mut rng: StdRng,
2672        last_iteration: bool,
2673    ) {
2674        cx_a.foreground().forbid_parking();
2675        let mut lang_registry = Arc::new(LanguageRegistry::new());
2676        let fs = Arc::new(FakeFs::new(cx_a.background()));
2677        fs.insert_tree(
2678            "/root",
2679            json!({
2680                ".zed.toml": r#"collaborators = ["user_b"]"#,
2681                "a.rs": "const ONE: usize = b::TWO;",
2682                "b.rs": "const TWO: usize = 2",
2683            }),
2684        )
2685        .await;
2686
2687        // Set up a fake language server.
2688        let (language_server_config, mut fake_language_server) =
2689            LanguageServerConfig::fake(&cx_a).await;
2690
2691        Arc::get_mut(&mut lang_registry)
2692            .unwrap()
2693            .add(Arc::new(Language::new(
2694                LanguageConfig {
2695                    name: "Rust".to_string(),
2696                    path_suffixes: vec!["rs".to_string()],
2697                    language_server: Some(language_server_config),
2698                    ..Default::default()
2699                },
2700                Some(tree_sitter_rust::language()),
2701            )));
2702
2703        // Connect to a server as 2 clients.
2704        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2705        let client_a = server.create_client(&mut cx_a, "user_a").await;
2706        let client_b = server.create_client(&mut cx_b, "user_b").await;
2707
2708        // Share a project as client A
2709        let project_a = cx_a.update(|cx| {
2710            Project::local(
2711                client_a.clone(),
2712                client_a.user_store.clone(),
2713                lang_registry.clone(),
2714                fs.clone(),
2715                cx,
2716            )
2717        });
2718
2719        let (worktree_a, _) = project_a
2720            .update(&mut cx_a, |p, cx| {
2721                p.find_or_create_local_worktree("/root", false, cx)
2722            })
2723            .await
2724            .unwrap();
2725        worktree_a
2726            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2727            .await;
2728        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2729        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2730        project_a
2731            .update(&mut cx_a, |p, cx| p.share(cx))
2732            .await
2733            .unwrap();
2734
2735        // Join the worktree as client B.
2736        let project_b = Project::remote(
2737            project_id,
2738            client_b.clone(),
2739            client_b.user_store.clone(),
2740            lang_registry.clone(),
2741            fs.clone(),
2742            &mut cx_b.to_async(),
2743        )
2744        .await
2745        .unwrap();
2746
2747        let buffer_b1 = cx_b
2748            .background()
2749            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2750            .await
2751            .unwrap();
2752
2753        let definitions;
2754        let buffer_b2;
2755        if rng.gen() {
2756            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2757            buffer_b2 =
2758                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2759        } else {
2760            buffer_b2 =
2761                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2762            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2763        }
2764
2765        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2766            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2767                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2768                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2769            )))
2770        });
2771
2772        let buffer_b2 = buffer_b2.await.unwrap();
2773        let definitions = definitions.await.unwrap();
2774        assert_eq!(definitions.len(), 1);
2775        assert_eq!(definitions[0].target_buffer, buffer_b2);
2776    }
2777
2778    #[gpui::test(iterations = 10)]
2779    async fn test_collaborating_with_code_actions(
2780        mut cx_a: TestAppContext,
2781        mut cx_b: TestAppContext,
2782        last_iteration: bool,
2783    ) {
2784        cx_a.foreground().forbid_parking();
2785        let mut lang_registry = Arc::new(LanguageRegistry::new());
2786        let fs = Arc::new(FakeFs::new(cx_a.background()));
2787        let mut path_openers_b = Vec::new();
2788        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2789
2790        // Set up a fake language server.
2791        let (language_server_config, mut fake_language_server) =
2792            LanguageServerConfig::fake_with_capabilities(
2793                lsp::ServerCapabilities {
2794                    ..Default::default()
2795                },
2796                &cx_a,
2797            )
2798            .await;
2799        Arc::get_mut(&mut lang_registry)
2800            .unwrap()
2801            .add(Arc::new(Language::new(
2802                LanguageConfig {
2803                    name: "Rust".to_string(),
2804                    path_suffixes: vec!["rs".to_string()],
2805                    language_server: Some(language_server_config),
2806                    ..Default::default()
2807                },
2808                Some(tree_sitter_rust::language()),
2809            )));
2810
2811        // Connect to a server as 2 clients.
2812        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
2813        let client_a = server.create_client(&mut cx_a, "user_a").await;
2814        let client_b = server.create_client(&mut cx_b, "user_b").await;
2815
2816        // Share a project as client A
2817        fs.insert_tree(
2818            "/a",
2819            json!({
2820                ".zed.toml": r#"collaborators = ["user_b"]"#,
2821                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2822                "other.rs": "pub fn foo() -> usize { 4 }",
2823            }),
2824        )
2825        .await;
2826        let project_a = cx_a.update(|cx| {
2827            Project::local(
2828                client_a.clone(),
2829                client_a.user_store.clone(),
2830                lang_registry.clone(),
2831                fs.clone(),
2832                cx,
2833            )
2834        });
2835        let (worktree_a, _) = project_a
2836            .update(&mut cx_a, |p, cx| {
2837                p.find_or_create_local_worktree("/a", false, cx)
2838            })
2839            .await
2840            .unwrap();
2841        worktree_a
2842            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2843            .await;
2844        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2845        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2846        project_a
2847            .update(&mut cx_a, |p, cx| p.share(cx))
2848            .await
2849            .unwrap();
2850
2851        // Join the worktree as client B.
2852        let project_b = Project::remote(
2853            project_id,
2854            client_b.clone(),
2855            client_b.user_store.clone(),
2856            lang_registry.clone(),
2857            fs.clone(),
2858            &mut cx_b.to_async(),
2859        )
2860        .await
2861        .unwrap();
2862        let mut params = cx_b.update(WorkspaceParams::test);
2863        params.languages = lang_registry.clone();
2864        params.client = client_b.client.clone();
2865        params.user_store = client_b.user_store.clone();
2866        params.project = project_b;
2867        params.path_openers = path_openers_b.into();
2868
2869        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2870        let editor_b = workspace_b
2871            .update(&mut cx_b, |workspace, cx| {
2872                workspace.open_path((worktree_id, "main.rs").into(), cx)
2873            })
2874            .await
2875            .unwrap()
2876            .downcast::<Editor>()
2877            .unwrap();
2878        fake_language_server
2879            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2880                assert_eq!(
2881                    params.text_document.uri,
2882                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2883                );
2884                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2885                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2886                None
2887            })
2888            .next()
2889            .await;
2890
2891        // Move cursor to a location that contains code actions.
2892        editor_b.update(&mut cx_b, |editor, cx| {
2893            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2894            cx.focus(&editor_b);
2895        });
2896        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2897            assert_eq!(
2898                params.text_document.uri,
2899                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2900            );
2901            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2902            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2903
2904            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2905                lsp::CodeAction {
2906                    title: "Inline into all callers".to_string(),
2907                    edit: Some(lsp::WorkspaceEdit {
2908                        changes: Some(
2909                            [
2910                                (
2911                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2912                                    vec![lsp::TextEdit::new(
2913                                        lsp::Range::new(
2914                                            lsp::Position::new(1, 22),
2915                                            lsp::Position::new(1, 34),
2916                                        ),
2917                                        "4".to_string(),
2918                                    )],
2919                                ),
2920                                (
2921                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2922                                    vec![lsp::TextEdit::new(
2923                                        lsp::Range::new(
2924                                            lsp::Position::new(0, 0),
2925                                            lsp::Position::new(0, 27),
2926                                        ),
2927                                        "".to_string(),
2928                                    )],
2929                                ),
2930                            ]
2931                            .into_iter()
2932                            .collect(),
2933                        ),
2934                        ..Default::default()
2935                    }),
2936                    data: Some(json!({
2937                        "codeActionParams": {
2938                            "range": {
2939                                "start": {"line": 1, "column": 31},
2940                                "end": {"line": 1, "column": 31},
2941                            }
2942                        }
2943                    })),
2944                    ..Default::default()
2945                },
2946            )])
2947        });
2948
2949        // Toggle code actions and wait for them to display.
2950        editor_b.update(&mut cx_b, |editor, cx| {
2951            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2952        });
2953        editor_b
2954            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2955            .await;
2956
2957        // Confirming the code action will trigger a resolve request.
2958        let confirm_action = workspace_b
2959            .update(&mut cx_b, |workspace, cx| {
2960                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2961            })
2962            .unwrap();
2963        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2964            lsp::CodeAction {
2965                title: "Inline into all callers".to_string(),
2966                edit: Some(lsp::WorkspaceEdit {
2967                    changes: Some(
2968                        [
2969                            (
2970                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2971                                vec![lsp::TextEdit::new(
2972                                    lsp::Range::new(
2973                                        lsp::Position::new(1, 22),
2974                                        lsp::Position::new(1, 34),
2975                                    ),
2976                                    "4".to_string(),
2977                                )],
2978                            ),
2979                            (
2980                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2981                                vec![lsp::TextEdit::new(
2982                                    lsp::Range::new(
2983                                        lsp::Position::new(0, 0),
2984                                        lsp::Position::new(0, 27),
2985                                    ),
2986                                    "".to_string(),
2987                                )],
2988                            ),
2989                        ]
2990                        .into_iter()
2991                        .collect(),
2992                    ),
2993                    ..Default::default()
2994                }),
2995                ..Default::default()
2996            }
2997        });
2998
2999        // After the action is confirmed, an editor containing both modified files is opened.
3000        confirm_action.await.unwrap();
3001        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3002            workspace
3003                .active_item(cx)
3004                .unwrap()
3005                .downcast::<Editor>()
3006                .unwrap()
3007        });
3008        code_action_editor.update(&mut cx_b, |editor, cx| {
3009            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3010            editor.undo(&Undo, cx);
3011            assert_eq!(
3012                editor.text(cx),
3013                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3014            );
3015            editor.redo(&Redo, cx);
3016            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3017        });
3018    }
3019
3020    #[gpui::test(iterations = 10)]
3021    async fn test_basic_chat(
3022        mut cx_a: TestAppContext,
3023        mut cx_b: TestAppContext,
3024        last_iteration: bool,
3025    ) {
3026        cx_a.foreground().forbid_parking();
3027
3028        // Connect to a server as 2 clients.
3029        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3030        let client_a = server.create_client(&mut cx_a, "user_a").await;
3031        let client_b = server.create_client(&mut cx_b, "user_b").await;
3032
3033        // Create an org that includes these 2 users.
3034        let db = &server.app_state.db;
3035        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3036        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3037            .await
3038            .unwrap();
3039        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3040            .await
3041            .unwrap();
3042
3043        // Create a channel that includes all the users.
3044        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3045        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3046            .await
3047            .unwrap();
3048        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3049            .await
3050            .unwrap();
3051        db.create_channel_message(
3052            channel_id,
3053            client_b.current_user_id(&cx_b),
3054            "hello A, it's B.",
3055            OffsetDateTime::now_utc(),
3056            1,
3057        )
3058        .await
3059        .unwrap();
3060
3061        let channels_a = cx_a
3062            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3063        channels_a
3064            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3065            .await;
3066        channels_a.read_with(&cx_a, |list, _| {
3067            assert_eq!(
3068                list.available_channels().unwrap(),
3069                &[ChannelDetails {
3070                    id: channel_id.to_proto(),
3071                    name: "test-channel".to_string()
3072                }]
3073            )
3074        });
3075        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3076            this.get_channel(channel_id.to_proto(), cx).unwrap()
3077        });
3078        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3079        channel_a
3080            .condition(&cx_a, |channel, _| {
3081                channel_messages(channel)
3082                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3083            })
3084            .await;
3085
3086        let channels_b = cx_b
3087            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3088        channels_b
3089            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3090            .await;
3091        channels_b.read_with(&cx_b, |list, _| {
3092            assert_eq!(
3093                list.available_channels().unwrap(),
3094                &[ChannelDetails {
3095                    id: channel_id.to_proto(),
3096                    name: "test-channel".to_string()
3097                }]
3098            )
3099        });
3100
3101        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3102            this.get_channel(channel_id.to_proto(), cx).unwrap()
3103        });
3104        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3105        channel_b
3106            .condition(&cx_b, |channel, _| {
3107                channel_messages(channel)
3108                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3109            })
3110            .await;
3111
3112        channel_a
3113            .update(&mut cx_a, |channel, cx| {
3114                channel
3115                    .send_message("oh, hi B.".to_string(), cx)
3116                    .unwrap()
3117                    .detach();
3118                let task = channel.send_message("sup".to_string(), cx).unwrap();
3119                assert_eq!(
3120                    channel_messages(channel),
3121                    &[
3122                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3123                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3124                        ("user_a".to_string(), "sup".to_string(), true)
3125                    ]
3126                );
3127                task
3128            })
3129            .await
3130            .unwrap();
3131
3132        channel_b
3133            .condition(&cx_b, |channel, _| {
3134                channel_messages(channel)
3135                    == [
3136                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3137                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3138                        ("user_a".to_string(), "sup".to_string(), false),
3139                    ]
3140            })
3141            .await;
3142
3143        assert_eq!(
3144            server
3145                .state()
3146                .await
3147                .channel(channel_id)
3148                .unwrap()
3149                .connection_ids
3150                .len(),
3151            2
3152        );
3153        cx_b.update(|_| drop(channel_b));
3154        server
3155            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3156            .await;
3157
3158        cx_a.update(|_| drop(channel_a));
3159        server
3160            .condition(|state| state.channel(channel_id).is_none())
3161            .await;
3162    }
3163
3164    #[gpui::test(iterations = 10)]
3165    async fn test_chat_message_validation(mut cx_a: TestAppContext, last_iteration: bool) {
3166        cx_a.foreground().forbid_parking();
3167
3168        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3169        let client_a = server.create_client(&mut cx_a, "user_a").await;
3170
3171        let db = &server.app_state.db;
3172        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3173        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3174        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3175            .await
3176            .unwrap();
3177        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3178            .await
3179            .unwrap();
3180
3181        let channels_a = cx_a
3182            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3183        channels_a
3184            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3185            .await;
3186        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3187            this.get_channel(channel_id.to_proto(), cx).unwrap()
3188        });
3189
3190        // Messages aren't allowed to be too long.
3191        channel_a
3192            .update(&mut cx_a, |channel, cx| {
3193                let long_body = "this is long.\n".repeat(1024);
3194                channel.send_message(long_body, cx).unwrap()
3195            })
3196            .await
3197            .unwrap_err();
3198
3199        // Messages aren't allowed to be blank.
3200        channel_a.update(&mut cx_a, |channel, cx| {
3201            channel.send_message(String::new(), cx).unwrap_err()
3202        });
3203
3204        // Leading and trailing whitespace are trimmed.
3205        channel_a
3206            .update(&mut cx_a, |channel, cx| {
3207                channel
3208                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3209                    .unwrap()
3210            })
3211            .await
3212            .unwrap();
3213        assert_eq!(
3214            db.get_channel_messages(channel_id, 10, None)
3215                .await
3216                .unwrap()
3217                .iter()
3218                .map(|m| &m.body)
3219                .collect::<Vec<_>>(),
3220            &["surrounded by whitespace"]
3221        );
3222    }
3223
3224    #[gpui::test(iterations = 10)]
3225    async fn test_chat_reconnection(
3226        mut cx_a: TestAppContext,
3227        mut cx_b: TestAppContext,
3228        last_iteration: bool,
3229    ) {
3230        cx_a.foreground().forbid_parking();
3231
3232        // Connect to a server as 2 clients.
3233        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3234        let client_a = server.create_client(&mut cx_a, "user_a").await;
3235        let client_b = server.create_client(&mut cx_b, "user_b").await;
3236        let mut status_b = client_b.status();
3237
3238        // Create an org that includes these 2 users.
3239        let db = &server.app_state.db;
3240        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3241        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3242            .await
3243            .unwrap();
3244        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3245            .await
3246            .unwrap();
3247
3248        // Create a channel that includes all the users.
3249        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3250        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3251            .await
3252            .unwrap();
3253        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3254            .await
3255            .unwrap();
3256        db.create_channel_message(
3257            channel_id,
3258            client_b.current_user_id(&cx_b),
3259            "hello A, it's B.",
3260            OffsetDateTime::now_utc(),
3261            2,
3262        )
3263        .await
3264        .unwrap();
3265
3266        let channels_a = cx_a
3267            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3268        channels_a
3269            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3270            .await;
3271
3272        channels_a.read_with(&cx_a, |list, _| {
3273            assert_eq!(
3274                list.available_channels().unwrap(),
3275                &[ChannelDetails {
3276                    id: channel_id.to_proto(),
3277                    name: "test-channel".to_string()
3278                }]
3279            )
3280        });
3281        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3282            this.get_channel(channel_id.to_proto(), cx).unwrap()
3283        });
3284        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3285        channel_a
3286            .condition(&cx_a, |channel, _| {
3287                channel_messages(channel)
3288                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3289            })
3290            .await;
3291
3292        let channels_b = cx_b
3293            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3294        channels_b
3295            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3296            .await;
3297        channels_b.read_with(&cx_b, |list, _| {
3298            assert_eq!(
3299                list.available_channels().unwrap(),
3300                &[ChannelDetails {
3301                    id: channel_id.to_proto(),
3302                    name: "test-channel".to_string()
3303                }]
3304            )
3305        });
3306
3307        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3308            this.get_channel(channel_id.to_proto(), cx).unwrap()
3309        });
3310        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3311        channel_b
3312            .condition(&cx_b, |channel, _| {
3313                channel_messages(channel)
3314                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3315            })
3316            .await;
3317
3318        // Disconnect client B, ensuring we can still access its cached channel data.
3319        server.forbid_connections();
3320        server.disconnect_client(client_b.current_user_id(&cx_b));
3321        while !matches!(
3322            status_b.next().await,
3323            Some(client::Status::ReconnectionError { .. })
3324        ) {}
3325
3326        channels_b.read_with(&cx_b, |channels, _| {
3327            assert_eq!(
3328                channels.available_channels().unwrap(),
3329                [ChannelDetails {
3330                    id: channel_id.to_proto(),
3331                    name: "test-channel".to_string()
3332                }]
3333            )
3334        });
3335        channel_b.read_with(&cx_b, |channel, _| {
3336            assert_eq!(
3337                channel_messages(channel),
3338                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3339            )
3340        });
3341
3342        // Send a message from client B while it is disconnected.
3343        channel_b
3344            .update(&mut cx_b, |channel, cx| {
3345                let task = channel
3346                    .send_message("can you see this?".to_string(), cx)
3347                    .unwrap();
3348                assert_eq!(
3349                    channel_messages(channel),
3350                    &[
3351                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3352                        ("user_b".to_string(), "can you see this?".to_string(), true)
3353                    ]
3354                );
3355                task
3356            })
3357            .await
3358            .unwrap_err();
3359
3360        // Send a message from client A while B is disconnected.
3361        channel_a
3362            .update(&mut cx_a, |channel, cx| {
3363                channel
3364                    .send_message("oh, hi B.".to_string(), cx)
3365                    .unwrap()
3366                    .detach();
3367                let task = channel.send_message("sup".to_string(), cx).unwrap();
3368                assert_eq!(
3369                    channel_messages(channel),
3370                    &[
3371                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3372                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3373                        ("user_a".to_string(), "sup".to_string(), true)
3374                    ]
3375                );
3376                task
3377            })
3378            .await
3379            .unwrap();
3380
3381        // Give client B a chance to reconnect.
3382        server.allow_connections();
3383        cx_b.foreground().advance_clock(Duration::from_secs(10));
3384
3385        // Verify that B sees the new messages upon reconnection, as well as the message client B
3386        // sent while offline.
3387        channel_b
3388            .condition(&cx_b, |channel, _| {
3389                channel_messages(channel)
3390                    == [
3391                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3392                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3393                        ("user_a".to_string(), "sup".to_string(), false),
3394                        ("user_b".to_string(), "can you see this?".to_string(), false),
3395                    ]
3396            })
3397            .await;
3398
3399        // Ensure client A and B can communicate normally after reconnection.
3400        channel_a
3401            .update(&mut cx_a, |channel, cx| {
3402                channel.send_message("you online?".to_string(), cx).unwrap()
3403            })
3404            .await
3405            .unwrap();
3406        channel_b
3407            .condition(&cx_b, |channel, _| {
3408                channel_messages(channel)
3409                    == [
3410                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3411                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3412                        ("user_a".to_string(), "sup".to_string(), false),
3413                        ("user_b".to_string(), "can you see this?".to_string(), false),
3414                        ("user_a".to_string(), "you online?".to_string(), false),
3415                    ]
3416            })
3417            .await;
3418
3419        channel_b
3420            .update(&mut cx_b, |channel, cx| {
3421                channel.send_message("yep".to_string(), cx).unwrap()
3422            })
3423            .await
3424            .unwrap();
3425        channel_a
3426            .condition(&cx_a, |channel, _| {
3427                channel_messages(channel)
3428                    == [
3429                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3430                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3431                        ("user_a".to_string(), "sup".to_string(), false),
3432                        ("user_b".to_string(), "can you see this?".to_string(), false),
3433                        ("user_a".to_string(), "you online?".to_string(), false),
3434                        ("user_b".to_string(), "yep".to_string(), false),
3435                    ]
3436            })
3437            .await;
3438    }
3439
3440    #[gpui::test(iterations = 10)]
3441    async fn test_contacts(
3442        mut cx_a: TestAppContext,
3443        mut cx_b: TestAppContext,
3444        mut cx_c: TestAppContext,
3445        last_iteration: bool,
3446    ) {
3447        cx_a.foreground().forbid_parking();
3448        let lang_registry = Arc::new(LanguageRegistry::new());
3449        let fs = Arc::new(FakeFs::new(cx_a.background()));
3450
3451        // Connect to a server as 3 clients.
3452        let mut server = TestServer::start(cx_a.foreground(), last_iteration).await;
3453        let client_a = server.create_client(&mut cx_a, "user_a").await;
3454        let client_b = server.create_client(&mut cx_b, "user_b").await;
3455        let client_c = server.create_client(&mut cx_c, "user_c").await;
3456
3457        // Share a worktree as client A.
3458        fs.insert_tree(
3459            "/a",
3460            json!({
3461                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3462            }),
3463        )
3464        .await;
3465
3466        let project_a = cx_a.update(|cx| {
3467            Project::local(
3468                client_a.clone(),
3469                client_a.user_store.clone(),
3470                lang_registry.clone(),
3471                fs.clone(),
3472                cx,
3473            )
3474        });
3475        let (worktree_a, _) = project_a
3476            .update(&mut cx_a, |p, cx| {
3477                p.find_or_create_local_worktree("/a", false, cx)
3478            })
3479            .await
3480            .unwrap();
3481        worktree_a
3482            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3483            .await;
3484
3485        client_a
3486            .user_store
3487            .condition(&cx_a, |user_store, _| {
3488                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3489            })
3490            .await;
3491        client_b
3492            .user_store
3493            .condition(&cx_b, |user_store, _| {
3494                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3495            })
3496            .await;
3497        client_c
3498            .user_store
3499            .condition(&cx_c, |user_store, _| {
3500                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3501            })
3502            .await;
3503
3504        let project_id = project_a
3505            .update(&mut cx_a, |project, _| project.next_remote_id())
3506            .await;
3507        project_a
3508            .update(&mut cx_a, |project, cx| project.share(cx))
3509            .await
3510            .unwrap();
3511
3512        let _project_b = Project::remote(
3513            project_id,
3514            client_b.clone(),
3515            client_b.user_store.clone(),
3516            lang_registry.clone(),
3517            fs.clone(),
3518            &mut cx_b.to_async(),
3519        )
3520        .await
3521        .unwrap();
3522
3523        client_a
3524            .user_store
3525            .condition(&cx_a, |user_store, _| {
3526                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3527            })
3528            .await;
3529        client_b
3530            .user_store
3531            .condition(&cx_b, |user_store, _| {
3532                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3533            })
3534            .await;
3535        client_c
3536            .user_store
3537            .condition(&cx_c, |user_store, _| {
3538                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3539            })
3540            .await;
3541
3542        project_a
3543            .condition(&cx_a, |project, _| {
3544                project.collaborators().contains_key(&client_b.peer_id)
3545            })
3546            .await;
3547
3548        cx_a.update(move |_| drop(project_a));
3549        client_a
3550            .user_store
3551            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3552            .await;
3553        client_b
3554            .user_store
3555            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3556            .await;
3557        client_c
3558            .user_store
3559            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3560            .await;
3561
3562        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3563            user_store
3564                .contacts()
3565                .iter()
3566                .map(|contact| {
3567                    let worktrees = contact
3568                        .projects
3569                        .iter()
3570                        .map(|p| {
3571                            (
3572                                p.worktree_root_names[0].as_str(),
3573                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3574                            )
3575                        })
3576                        .collect();
3577                    (contact.user.github_login.as_str(), worktrees)
3578                })
3579                .collect()
3580        }
3581    }
3582
3583    #[gpui::test(iterations = 100)]
3584    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng, last_iteration: bool) {
3585        cx.foreground().forbid_parking();
3586        let max_peers = env::var("MAX_PEERS")
3587            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
3588            .unwrap_or(5);
3589        let max_operations = env::var("OPERATIONS")
3590            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
3591            .unwrap_or(10);
3592
3593        let rng = Rc::new(RefCell::new(rng));
3594        let lang_registry = Arc::new(LanguageRegistry::new());
3595        let fs = Arc::new(FakeFs::new(cx.background()));
3596        fs.insert_tree(
3597            "/_collab",
3598            json!({
3599                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
3600            }),
3601        )
3602        .await;
3603
3604        let operations = Rc::new(Cell::new(0));
3605        let mut server = TestServer::start(cx.foreground(), last_iteration).await;
3606        let mut clients = Vec::new();
3607
3608        let mut next_entity_id = 100000;
3609        let mut host_cx = TestAppContext::new(
3610            cx.foreground_platform(),
3611            cx.platform(),
3612            cx.foreground(),
3613            cx.background(),
3614            cx.font_cache(),
3615            next_entity_id,
3616        );
3617        let host = server.create_client(&mut host_cx, "host").await;
3618        let host_project = host_cx.update(|cx| {
3619            Project::local(
3620                host.client.clone(),
3621                host.user_store.clone(),
3622                lang_registry.clone(),
3623                fs.clone(),
3624                cx,
3625            )
3626        });
3627        let host_project_id = host_project
3628            .update(&mut host_cx, |p, _| p.next_remote_id())
3629            .await;
3630
3631        let (collab_worktree, _) = host_project
3632            .update(&mut host_cx, |project, cx| {
3633                project.find_or_create_local_worktree("/_collab", false, cx)
3634            })
3635            .await
3636            .unwrap();
3637        collab_worktree
3638            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
3639            .await;
3640        host_project
3641            .update(&mut host_cx, |project, cx| project.share(cx))
3642            .await
3643            .unwrap();
3644
3645        clients.push(cx.foreground().spawn(host.simulate_host(
3646            host_project.clone(),
3647            operations.clone(),
3648            max_operations,
3649            rng.clone(),
3650            host_cx.clone(),
3651        )));
3652
3653        while operations.get() < max_operations {
3654            cx.background().simulate_random_delay().await;
3655            if clients.len() < max_peers && rng.borrow_mut().gen_bool(0.05) {
3656                operations.set(operations.get() + 1);
3657
3658                let guest_id = clients.len();
3659                log::info!("Adding guest {}", guest_id);
3660                next_entity_id += 100000;
3661                let mut guest_cx = TestAppContext::new(
3662                    cx.foreground_platform(),
3663                    cx.platform(),
3664                    cx.foreground(),
3665                    cx.background(),
3666                    cx.font_cache(),
3667                    next_entity_id,
3668                );
3669                let guest = server
3670                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
3671                    .await;
3672                let guest_project = Project::remote(
3673                    host_project_id,
3674                    guest.client.clone(),
3675                    guest.user_store.clone(),
3676                    lang_registry.clone(),
3677                    fs.clone(),
3678                    &mut guest_cx.to_async(),
3679                )
3680                .await
3681                .unwrap();
3682                clients.push(cx.foreground().spawn(guest.simulate_guest(
3683                    guest_id,
3684                    guest_project,
3685                    operations.clone(),
3686                    max_operations,
3687                    rng.clone(),
3688                    guest_cx,
3689                )));
3690
3691                log::info!("Guest {} added", guest_id);
3692            }
3693        }
3694
3695        let clients = futures::future::join_all(clients).await;
3696        cx.foreground().run_until_parked();
3697
3698        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
3699            project
3700                .worktrees(cx)
3701                .map(|worktree| {
3702                    let snapshot = worktree.read(cx).snapshot();
3703                    (snapshot.id(), snapshot)
3704                })
3705                .collect::<BTreeMap<_, _>>()
3706        });
3707
3708        for (ix, (client_a, cx_a)) in clients.iter().enumerate() {
3709            let worktree_snapshots =
3710                client_a
3711                    .project
3712                    .as_ref()
3713                    .unwrap()
3714                    .read_with(cx_a, |project, cx| {
3715                        project
3716                            .worktrees(cx)
3717                            .map(|worktree| {
3718                                let snapshot = worktree.read(cx).snapshot();
3719                                (snapshot.id(), snapshot)
3720                            })
3721                            .collect::<BTreeMap<_, _>>()
3722                    });
3723
3724            assert_eq!(
3725                worktree_snapshots.keys().collect::<Vec<_>>(),
3726                host_worktree_snapshots.keys().collect::<Vec<_>>(),
3727                "guest {} has different worktrees than the host",
3728                ix
3729            );
3730            for (id, host_snapshot) in &host_worktree_snapshots {
3731                let guest_snapshot = &worktree_snapshots[id];
3732                assert_eq!(
3733                    guest_snapshot.root_name(),
3734                    host_snapshot.root_name(),
3735                    "guest {} has different root name than the host for worktree {}",
3736                    ix,
3737                    id
3738                );
3739                assert_eq!(
3740                    guest_snapshot.entries(false).collect::<Vec<_>>(),
3741                    host_snapshot.entries(false).collect::<Vec<_>>(),
3742                    "guest {} has different snapshot than the host for worktree {}",
3743                    ix,
3744                    id
3745                );
3746            }
3747
3748            for (client_b, cx_b) in &clients[ix + 1..] {
3749                for buffer_a in &client_a.buffers {
3750                    let buffer_id = buffer_a.read_with(cx_a, |buffer, _| buffer.remote_id());
3751                    if let Some(buffer_b) = client_b.buffers.iter().find(|buffer| {
3752                        buffer.read_with(cx_b, |buffer, _| buffer.remote_id() == buffer_id)
3753                    }) {
3754                        assert_eq!(
3755                            buffer_a.read_with(cx_a, |buffer, _| buffer.text()),
3756                            buffer_b.read_with(cx_b, |buffer, _| buffer.text())
3757                        );
3758                    }
3759                }
3760            }
3761        }
3762    }
3763
3764    struct TestServer {
3765        peer: Arc<Peer>,
3766        app_state: Arc<AppState>,
3767        server: Arc<Server>,
3768        foreground: Rc<executor::Foreground>,
3769        notifications: mpsc::Receiver<()>,
3770        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3771        forbid_connections: Arc<AtomicBool>,
3772        _test_db: TestDb,
3773    }
3774
3775    impl TestServer {
3776        async fn start(foreground: Rc<executor::Foreground>, clean_db_pool_on_drop: bool) -> Self {
3777            let mut test_db = TestDb::new();
3778            test_db.set_clean_pool_on_drop(clean_db_pool_on_drop);
3779            let app_state = Self::build_app_state(&test_db).await;
3780            let peer = Peer::new();
3781            let notifications = mpsc::channel(128);
3782            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3783            Self {
3784                peer,
3785                app_state,
3786                server,
3787                foreground,
3788                notifications: notifications.1,
3789                connection_killers: Default::default(),
3790                forbid_connections: Default::default(),
3791                _test_db: test_db,
3792            }
3793        }
3794
3795        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3796            let http = FakeHttpClient::with_404_response();
3797            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3798            let client_name = name.to_string();
3799            let mut client = Client::new(http.clone());
3800            let server = self.server.clone();
3801            let connection_killers = self.connection_killers.clone();
3802            let forbid_connections = self.forbid_connections.clone();
3803            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3804
3805            Arc::get_mut(&mut client)
3806                .unwrap()
3807                .override_authenticate(move |cx| {
3808                    cx.spawn(|_| async move {
3809                        let access_token = "the-token".to_string();
3810                        Ok(Credentials {
3811                            user_id: user_id.0 as u64,
3812                            access_token,
3813                        })
3814                    })
3815                })
3816                .override_establish_connection(move |credentials, cx| {
3817                    assert_eq!(credentials.user_id, user_id.0 as u64);
3818                    assert_eq!(credentials.access_token, "the-token");
3819
3820                    let server = server.clone();
3821                    let connection_killers = connection_killers.clone();
3822                    let forbid_connections = forbid_connections.clone();
3823                    let client_name = client_name.clone();
3824                    let connection_id_tx = connection_id_tx.clone();
3825                    cx.spawn(move |cx| async move {
3826                        if forbid_connections.load(SeqCst) {
3827                            Err(EstablishConnectionError::other(anyhow!(
3828                                "server is forbidding connections"
3829                            )))
3830                        } else {
3831                            let (client_conn, server_conn, kill_conn) =
3832                                Connection::in_memory(cx.background());
3833                            connection_killers.lock().insert(user_id, kill_conn);
3834                            cx.background()
3835                                .spawn(server.handle_connection(
3836                                    server_conn,
3837                                    client_name,
3838                                    user_id,
3839                                    Some(connection_id_tx),
3840                                    cx.background(),
3841                                ))
3842                                .detach();
3843                            Ok(client_conn)
3844                        }
3845                    })
3846                });
3847
3848            client
3849                .authenticate_and_connect(&cx.to_async())
3850                .await
3851                .unwrap();
3852
3853            Channel::init(&client);
3854            Project::init(&client);
3855
3856            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3857            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3858            let mut authed_user =
3859                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3860            while authed_user.next().await.unwrap().is_none() {}
3861
3862            TestClient {
3863                client,
3864                peer_id,
3865                user_store,
3866                project: Default::default(),
3867                buffers: Default::default(),
3868            }
3869        }
3870
3871        fn disconnect_client(&self, user_id: UserId) {
3872            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3873                let _ = kill_conn.try_send(Some(()));
3874            }
3875        }
3876
3877        fn forbid_connections(&self) {
3878            self.forbid_connections.store(true, SeqCst);
3879        }
3880
3881        fn allow_connections(&self) {
3882            self.forbid_connections.store(false, SeqCst);
3883        }
3884
3885        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3886            let mut config = Config::default();
3887            config.session_secret = "a".repeat(32);
3888            config.database_url = test_db.url.clone();
3889            let github_client = github::AppClient::test();
3890            Arc::new(AppState {
3891                db: test_db.db().clone(),
3892                handlebars: Default::default(),
3893                auth_client: auth::build_client("", ""),
3894                repo_client: github::RepoClient::test(&github_client),
3895                github_client,
3896                config,
3897            })
3898        }
3899
3900        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3901            self.server.store.read()
3902        }
3903
3904        async fn condition<F>(&mut self, mut predicate: F)
3905        where
3906            F: FnMut(&Store) -> bool,
3907        {
3908            async_std::future::timeout(Duration::from_millis(500), async {
3909                while !(predicate)(&*self.server.store.read()) {
3910                    self.foreground.start_waiting();
3911                    self.notifications.next().await;
3912                    self.foreground.finish_waiting();
3913                }
3914            })
3915            .await
3916            .expect("condition timed out");
3917        }
3918    }
3919
3920    impl Drop for TestServer {
3921        fn drop(&mut self) {
3922            self.peer.reset();
3923        }
3924    }
3925
3926    struct TestClient {
3927        client: Arc<Client>,
3928        pub peer_id: PeerId,
3929        pub user_store: ModelHandle<UserStore>,
3930        project: Option<ModelHandle<Project>>,
3931        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
3932    }
3933
3934    impl Deref for TestClient {
3935        type Target = Arc<Client>;
3936
3937        fn deref(&self) -> &Self::Target {
3938            &self.client
3939        }
3940    }
3941
3942    impl TestClient {
3943        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3944            UserId::from_proto(
3945                self.user_store
3946                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3947            )
3948        }
3949
3950        async fn simulate_host(
3951            mut self,
3952            project: ModelHandle<Project>,
3953            operations: Rc<Cell<usize>>,
3954            max_operations: usize,
3955            rng: Rc<RefCell<StdRng>>,
3956            mut cx: TestAppContext,
3957        ) -> (Self, TestAppContext) {
3958            let fs = project.read_with(&cx, |project, _| project.fs().clone());
3959            let mut files: Vec<PathBuf> = Default::default();
3960            while operations.get() < max_operations {
3961                operations.set(operations.get() + 1);
3962
3963                let distribution = rng.borrow_mut().gen_range(0..100);
3964                match distribution {
3965                    0..=20 if !files.is_empty() => {
3966                        let mut path = files.choose(&mut *rng.borrow_mut()).unwrap().as_path();
3967                        while let Some(parent_path) = path.parent() {
3968                            path = parent_path;
3969                            if rng.borrow_mut().gen() {
3970                                break;
3971                            }
3972                        }
3973
3974                        log::info!("Host: find/create local worktree {:?}", path);
3975                        project
3976                            .update(&mut cx, |project, cx| {
3977                                project.find_or_create_local_worktree(path, false, cx)
3978                            })
3979                            .await
3980                            .unwrap();
3981                    }
3982                    10..=80 if !files.is_empty() => {
3983                        let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
3984                            let file = files.choose(&mut *rng.borrow_mut()).unwrap();
3985                            let (worktree, path) = project
3986                                .update(&mut cx, |project, cx| {
3987                                    project.find_or_create_local_worktree(file, false, cx)
3988                                })
3989                                .await
3990                                .unwrap();
3991                            let project_path =
3992                                worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
3993                            log::info!("Host: opening path {:?}", project_path);
3994                            let buffer = project
3995                                .update(&mut cx, |project, cx| {
3996                                    project.open_buffer(project_path, cx)
3997                                })
3998                                .await
3999                                .unwrap();
4000                            self.buffers.insert(buffer.clone());
4001                            buffer
4002                        } else {
4003                            self.buffers
4004                                .iter()
4005                                .choose(&mut *rng.borrow_mut())
4006                                .unwrap()
4007                                .clone()
4008                        };
4009
4010                        if rng.borrow_mut().gen_bool(0.1) {
4011                            cx.update(|cx| {
4012                                log::info!(
4013                                    "Host: dropping buffer {:?}",
4014                                    buffer.read(cx).file().unwrap().full_path(cx)
4015                                );
4016                                self.buffers.remove(&buffer);
4017                                drop(buffer);
4018                            });
4019                        } else {
4020                            buffer.update(&mut cx, |buffer, cx| {
4021                                log::info!(
4022                                    "Host: updating buffer {:?}",
4023                                    buffer.file().unwrap().full_path(cx)
4024                                );
4025                                buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4026                            });
4027                        }
4028                    }
4029                    _ => loop {
4030                        let path_component_count = rng.borrow_mut().gen_range(1..=5);
4031                        let mut path = PathBuf::new();
4032                        path.push("/");
4033                        for _ in 0..path_component_count {
4034                            let letter = rng.borrow_mut().gen_range(b'a'..=b'z');
4035                            path.push(std::str::from_utf8(&[letter]).unwrap());
4036                        }
4037                        let parent_path = path.parent().unwrap();
4038
4039                        log::info!("Host: creating file {:?}", path);
4040                        if fs.create_dir(&parent_path).await.is_ok()
4041                            && fs.create_file(&path, Default::default()).await.is_ok()
4042                        {
4043                            files.push(path);
4044                            break;
4045                        } else {
4046                            log::info!("Host: cannot create file");
4047                        }
4048                    },
4049                }
4050
4051                cx.background().simulate_random_delay().await;
4052            }
4053
4054            self.project = Some(project);
4055            (self, cx)
4056        }
4057
4058        pub async fn simulate_guest(
4059            mut self,
4060            guest_id: usize,
4061            project: ModelHandle<Project>,
4062            operations: Rc<Cell<usize>>,
4063            max_operations: usize,
4064            rng: Rc<RefCell<StdRng>>,
4065            mut cx: TestAppContext,
4066        ) -> (Self, TestAppContext) {
4067            while operations.get() < max_operations {
4068                let buffer = if self.buffers.is_empty() || rng.borrow_mut().gen() {
4069                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4070                        project
4071                            .worktrees(&cx)
4072                            .filter(|worktree| {
4073                                worktree.read(cx).entries(false).any(|e| e.is_file())
4074                            })
4075                            .choose(&mut *rng.borrow_mut())
4076                    }) {
4077                        worktree
4078                    } else {
4079                        cx.background().simulate_random_delay().await;
4080                        continue;
4081                    };
4082
4083                    operations.set(operations.get() + 1);
4084                    let project_path = worktree.read_with(&cx, |worktree, _| {
4085                        let entry = worktree
4086                            .entries(false)
4087                            .filter(|e| e.is_file())
4088                            .choose(&mut *rng.borrow_mut())
4089                            .unwrap();
4090                        (worktree.id(), entry.path.clone())
4091                    });
4092                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4093                    let buffer = project
4094                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4095                        .await
4096                        .unwrap();
4097                    self.buffers.insert(buffer.clone());
4098                    buffer
4099                } else {
4100                    self.buffers
4101                        .iter()
4102                        .choose(&mut *rng.borrow_mut())
4103                        .unwrap()
4104                        .clone()
4105                };
4106
4107                if rng.borrow_mut().gen_bool(0.1) {
4108                    cx.update(|cx| {
4109                        log::info!(
4110                            "Guest {}: dropping buffer {:?}",
4111                            guest_id,
4112                            buffer.read(cx).file().unwrap().full_path(cx)
4113                        );
4114                        self.buffers.remove(&buffer);
4115                        drop(buffer);
4116                    });
4117                } else {
4118                    buffer.update(&mut cx, |buffer, cx| {
4119                        log::info!(
4120                            "Guest {}: updating buffer {:?}",
4121                            guest_id,
4122                            buffer.file().unwrap().full_path(cx)
4123                        );
4124                        buffer.randomly_edit(&mut *rng.borrow_mut(), 5, cx)
4125                    });
4126                }
4127
4128                cx.background().simulate_random_delay().await;
4129            }
4130
4131            self.project = Some(project);
4132            (self, cx)
4133        }
4134    }
4135
4136    impl Executor for Arc<gpui::executor::Background> {
4137        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4138            self.spawn(future).detach();
4139        }
4140    }
4141
4142    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4143        channel
4144            .messages()
4145            .cursor::<()>()
4146            .map(|m| {
4147                (
4148                    m.sender.github_login.clone(),
4149                    m.body.clone(),
4150                    m.is_pending(),
4151                )
4152            })
4153            .collect()
4154    }
4155
4156    struct EmptyView;
4157
4158    impl gpui::Entity for EmptyView {
4159        type Event = ();
4160    }
4161
4162    impl gpui::View for EmptyView {
4163        fn ui_name() -> &'static str {
4164            "empty view"
4165        }
4166
4167        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4168            gpui::Element::boxed(gpui::elements::Empty)
4169        }
4170    }
4171}