rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::share_worktree)
  77            .add_request_handler(Server::update_worktree)
  78            .add_message_handler(Server::update_diagnostic_summary)
  79            .add_message_handler(Server::disk_based_diagnostics_updating)
  80            .add_message_handler(Server::disk_based_diagnostics_updated)
  81            .add_request_handler(Server::get_definition)
  82            .add_request_handler(Server::get_references)
  83            .add_request_handler(Server::get_project_symbols)
  84            .add_request_handler(Server::open_buffer_for_symbol)
  85            .add_request_handler(Server::open_buffer)
  86            .add_message_handler(Server::close_buffer)
  87            .add_request_handler(Server::update_buffer)
  88            .add_message_handler(Server::update_buffer_file)
  89            .add_message_handler(Server::buffer_reloaded)
  90            .add_message_handler(Server::buffer_saved)
  91            .add_request_handler(Server::save_buffer)
  92            .add_request_handler(Server::format_buffers)
  93            .add_request_handler(Server::get_completions)
  94            .add_request_handler(Server::apply_additional_edits_for_completion)
  95            .add_request_handler(Server::get_code_actions)
  96            .add_request_handler(Server::apply_code_action)
  97            .add_request_handler(Server::prepare_rename)
  98            .add_request_handler(Server::perform_rename)
  99            .add_request_handler(Server::get_channels)
 100            .add_request_handler(Server::get_users)
 101            .add_request_handler(Server::join_channel)
 102            .add_message_handler(Server::leave_channel)
 103            .add_request_handler(Server::send_channel_message)
 104            .add_request_handler(Server::get_channel_messages);
 105
 106        Arc::new(server)
 107    }
 108
 109    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 110    where
 111        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 112        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 113        M: EnvelopedMessage,
 114    {
 115        let prev_handler = self.handlers.insert(
 116            TypeId::of::<M>(),
 117            Box::new(move |server, envelope| {
 118                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 119                (handler)(server, *envelope).boxed()
 120            }),
 121        );
 122        if prev_handler.is_some() {
 123            panic!("registered a handler for the same message twice");
 124        }
 125        self
 126    }
 127
 128    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 129    where
 130        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 131        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 132        M: RequestMessage,
 133    {
 134        self.add_message_handler(move |server, envelope| {
 135            let receipt = envelope.receipt();
 136            let response = (handler)(server.clone(), envelope);
 137            async move {
 138                match response.await {
 139                    Ok(response) => {
 140                        server.peer.respond(receipt, response)?;
 141                        Ok(())
 142                    }
 143                    Err(error) => {
 144                        server.peer.respond_with_error(
 145                            receipt,
 146                            proto::Error {
 147                                message: error.to_string(),
 148                            },
 149                        )?;
 150                        Err(error)
 151                    }
 152                }
 153            }
 154        })
 155    }
 156
 157    pub fn handle_connection<E: Executor>(
 158        self: &Arc<Self>,
 159        connection: Connection,
 160        addr: String,
 161        user_id: UserId,
 162        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 163        executor: E,
 164    ) -> impl Future<Output = ()> {
 165        let mut this = self.clone();
 166        async move {
 167            let (connection_id, handle_io, mut incoming_rx) =
 168                this.peer.add_connection(connection).await;
 169
 170            if let Some(send_connection_id) = send_connection_id.as_mut() {
 171                let _ = send_connection_id.send(connection_id).await;
 172            }
 173
 174            this.state_mut().add_connection(connection_id, user_id);
 175            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 176                log::error!("error updating contacts for {:?}: {}", user_id, err);
 177            }
 178
 179            let handle_io = handle_io.fuse();
 180            futures::pin_mut!(handle_io);
 181            loop {
 182                let next_message = incoming_rx.next().fuse();
 183                futures::pin_mut!(next_message);
 184                futures::select_biased! {
 185                    result = handle_io => {
 186                        if let Err(err) = result {
 187                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 188                        }
 189                        break;
 190                    }
 191                    message = next_message => {
 192                        if let Some(message) = message {
 193                            let start_time = Instant::now();
 194                            let type_name = message.payload_type_name();
 195                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 196                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 197                                let notifications = this.notifications.clone();
 198                                let is_background = message.is_background();
 199                                let handle_message = (handler)(this.clone(), message);
 200                                let handle_message = async move {
 201                                    if let Err(err) = handle_message.await {
 202                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 203                                    } else {
 204                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 205                                    }
 206                                    if let Some(mut notifications) = notifications {
 207                                        let _ = notifications.send(()).await;
 208                                    }
 209                                };
 210                                if is_background {
 211                                    executor.spawn_detached(handle_message);
 212                                } else {
 213                                    handle_message.await;
 214                                }
 215                            } else {
 216                                log::warn!("unhandled message: {}", type_name);
 217                            }
 218                        } else {
 219                            log::info!("rpc connection closed {:?}", addr);
 220                            break;
 221                        }
 222                    }
 223                }
 224            }
 225
 226            if let Err(err) = this.sign_out(connection_id).await {
 227                log::error!("error signing out connection {:?} - {:?}", addr, err);
 228            }
 229        }
 230    }
 231
 232    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 233        self.peer.disconnect(connection_id);
 234        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 235
 236        for (project_id, project) in removed_connection.hosted_projects {
 237            if let Some(share) = project.share {
 238                broadcast(
 239                    connection_id,
 240                    share.guests.keys().copied().collect(),
 241                    |conn_id| {
 242                        self.peer
 243                            .send(conn_id, proto::UnshareProject { project_id })
 244                    },
 245                )?;
 246            }
 247        }
 248
 249        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 250            broadcast(connection_id, peer_ids, |conn_id| {
 251                self.peer.send(
 252                    conn_id,
 253                    proto::RemoveProjectCollaborator {
 254                        project_id,
 255                        peer_id: connection_id.0,
 256                    },
 257                )
 258            })?;
 259        }
 260
 261        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 262        Ok(())
 263    }
 264
 265    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 266        Ok(proto::Ack {})
 267    }
 268
 269    async fn register_project(
 270        mut self: Arc<Server>,
 271        request: TypedEnvelope<proto::RegisterProject>,
 272    ) -> tide::Result<proto::RegisterProjectResponse> {
 273        let project_id = {
 274            let mut state = self.state_mut();
 275            let user_id = state.user_id_for_connection(request.sender_id)?;
 276            state.register_project(request.sender_id, user_id)
 277        };
 278        Ok(proto::RegisterProjectResponse { project_id })
 279    }
 280
 281    async fn unregister_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::UnregisterProject>,
 284    ) -> tide::Result<()> {
 285        let project = self
 286            .state_mut()
 287            .unregister_project(request.payload.project_id, request.sender_id)?;
 288        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 289        Ok(())
 290    }
 291
 292    async fn share_project(
 293        mut self: Arc<Server>,
 294        request: TypedEnvelope<proto::ShareProject>,
 295    ) -> tide::Result<proto::Ack> {
 296        self.state_mut()
 297            .share_project(request.payload.project_id, request.sender_id);
 298        Ok(proto::Ack {})
 299    }
 300
 301    async fn unshare_project(
 302        mut self: Arc<Server>,
 303        request: TypedEnvelope<proto::UnshareProject>,
 304    ) -> tide::Result<()> {
 305        let project_id = request.payload.project_id;
 306        let project = self
 307            .state_mut()
 308            .unshare_project(project_id, request.sender_id)?;
 309
 310        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 311            self.peer
 312                .send(conn_id, proto::UnshareProject { project_id })
 313        })?;
 314        self.update_contacts_for_users(&project.authorized_user_ids)?;
 315        Ok(())
 316    }
 317
 318    async fn join_project(
 319        mut self: Arc<Server>,
 320        request: TypedEnvelope<proto::JoinProject>,
 321    ) -> tide::Result<proto::JoinProjectResponse> {
 322        let project_id = request.payload.project_id;
 323
 324        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 325        let (response, connection_ids, contact_user_ids) = self
 326            .state_mut()
 327            .join_project(request.sender_id, user_id, project_id)
 328            .and_then(|joined| {
 329                let share = joined.project.share()?;
 330                let peer_count = share.guests.len();
 331                let mut collaborators = Vec::with_capacity(peer_count);
 332                collaborators.push(proto::Collaborator {
 333                    peer_id: joined.project.host_connection_id.0,
 334                    replica_id: 0,
 335                    user_id: joined.project.host_user_id.to_proto(),
 336                });
 337                let worktrees = joined
 338                    .project
 339                    .worktrees
 340                    .iter()
 341                    .filter_map(|(id, worktree)| {
 342                        worktree.share.as_ref().map(|share| proto::Worktree {
 343                            id: *id,
 344                            root_name: worktree.root_name.clone(),
 345                            entries: share.entries.values().cloned().collect(),
 346                            diagnostic_summaries: share
 347                                .diagnostic_summaries
 348                                .values()
 349                                .cloned()
 350                                .collect(),
 351                            weak: worktree.weak,
 352                            next_update_id: share.next_update_id as u64,
 353                        })
 354                    })
 355                    .collect();
 356                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 357                    if *peer_conn_id != request.sender_id {
 358                        collaborators.push(proto::Collaborator {
 359                            peer_id: peer_conn_id.0,
 360                            replica_id: *peer_replica_id as u32,
 361                            user_id: peer_user_id.to_proto(),
 362                        });
 363                    }
 364                }
 365                let response = proto::JoinProjectResponse {
 366                    worktrees,
 367                    replica_id: joined.replica_id as u32,
 368                    collaborators,
 369                };
 370                let connection_ids = joined.project.connection_ids();
 371                let contact_user_ids = joined.project.authorized_user_ids();
 372                Ok((response, connection_ids, contact_user_ids))
 373            })?;
 374
 375        broadcast(request.sender_id, connection_ids, |conn_id| {
 376            self.peer.send(
 377                conn_id,
 378                proto::AddProjectCollaborator {
 379                    project_id,
 380                    collaborator: Some(proto::Collaborator {
 381                        peer_id: request.sender_id.0,
 382                        replica_id: response.replica_id,
 383                        user_id: user_id.to_proto(),
 384                    }),
 385                },
 386            )
 387        })?;
 388        self.update_contacts_for_users(&contact_user_ids)?;
 389        Ok(response)
 390    }
 391
 392    async fn leave_project(
 393        mut self: Arc<Server>,
 394        request: TypedEnvelope<proto::LeaveProject>,
 395    ) -> tide::Result<()> {
 396        let sender_id = request.sender_id;
 397        let project_id = request.payload.project_id;
 398        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 399
 400        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 401            self.peer.send(
 402                conn_id,
 403                proto::RemoveProjectCollaborator {
 404                    project_id,
 405                    peer_id: sender_id.0,
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 410
 411        Ok(())
 412    }
 413
 414    async fn register_worktree(
 415        mut self: Arc<Server>,
 416        request: TypedEnvelope<proto::RegisterWorktree>,
 417    ) -> tide::Result<proto::Ack> {
 418        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 419
 420        let mut contact_user_ids = HashSet::default();
 421        contact_user_ids.insert(host_user_id);
 422        for github_login in request.payload.authorized_logins {
 423            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 424            contact_user_ids.insert(contact_user_id);
 425        }
 426
 427        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 428        self.state_mut().register_worktree(
 429            request.payload.project_id,
 430            request.payload.worktree_id,
 431            request.sender_id,
 432            Worktree {
 433                authorized_user_ids: contact_user_ids.clone(),
 434                root_name: request.payload.root_name,
 435                share: None,
 436                weak: false,
 437            },
 438        )?;
 439        self.update_contacts_for_users(&contact_user_ids)?;
 440        Ok(proto::Ack {})
 441    }
 442
 443    async fn unregister_worktree(
 444        mut self: Arc<Server>,
 445        request: TypedEnvelope<proto::UnregisterWorktree>,
 446    ) -> tide::Result<()> {
 447        let project_id = request.payload.project_id;
 448        let worktree_id = request.payload.worktree_id;
 449        let (worktree, guest_connection_ids) =
 450            self.state_mut()
 451                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 452        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 453            self.peer.send(
 454                conn_id,
 455                proto::UnregisterWorktree {
 456                    project_id,
 457                    worktree_id,
 458                },
 459            )
 460        })?;
 461        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 462        Ok(())
 463    }
 464
 465    async fn share_worktree(
 466        mut self: Arc<Server>,
 467        mut request: TypedEnvelope<proto::ShareWorktree>,
 468    ) -> tide::Result<proto::Ack> {
 469        let worktree = request
 470            .payload
 471            .worktree
 472            .as_mut()
 473            .ok_or_else(|| anyhow!("missing worktree"))?;
 474        let entries = worktree
 475            .entries
 476            .iter()
 477            .map(|entry| (entry.id, entry.clone()))
 478            .collect();
 479        let diagnostic_summaries = worktree
 480            .diagnostic_summaries
 481            .iter()
 482            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 483            .collect();
 484
 485        let shared_worktree = self.state_mut().share_worktree(
 486            request.payload.project_id,
 487            worktree.id,
 488            request.sender_id,
 489            entries,
 490            diagnostic_summaries,
 491            worktree.next_update_id,
 492        )?;
 493
 494        broadcast(
 495            request.sender_id,
 496            shared_worktree.connection_ids,
 497            |connection_id| {
 498                self.peer
 499                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 500            },
 501        )?;
 502        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 503
 504        Ok(proto::Ack {})
 505    }
 506
 507    async fn update_worktree(
 508        mut self: Arc<Server>,
 509        request: TypedEnvelope<proto::UpdateWorktree>,
 510    ) -> tide::Result<proto::Ack> {
 511        let connection_ids = self.state_mut().update_worktree(
 512            request.sender_id,
 513            request.payload.project_id,
 514            request.payload.worktree_id,
 515            request.payload.id,
 516            &request.payload.removed_entries,
 517            &request.payload.updated_entries,
 518        )?;
 519
 520        broadcast(request.sender_id, connection_ids, |connection_id| {
 521            self.peer
 522                .forward_send(request.sender_id, connection_id, request.payload.clone())
 523        })?;
 524
 525        Ok(proto::Ack {})
 526    }
 527
 528    async fn update_diagnostic_summary(
 529        mut self: Arc<Server>,
 530        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 531    ) -> tide::Result<()> {
 532        let summary = request
 533            .payload
 534            .summary
 535            .clone()
 536            .ok_or_else(|| anyhow!("invalid summary"))?;
 537        let receiver_ids = self.state_mut().update_diagnostic_summary(
 538            request.payload.project_id,
 539            request.payload.worktree_id,
 540            request.sender_id,
 541            summary,
 542        )?;
 543
 544        broadcast(request.sender_id, receiver_ids, |connection_id| {
 545            self.peer
 546                .forward_send(request.sender_id, connection_id, request.payload.clone())
 547        })?;
 548        Ok(())
 549    }
 550
 551    async fn disk_based_diagnostics_updating(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 554    ) -> tide::Result<()> {
 555        let receiver_ids = self
 556            .state()
 557            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 558        broadcast(request.sender_id, receiver_ids, |connection_id| {
 559            self.peer
 560                .forward_send(request.sender_id, connection_id, request.payload.clone())
 561        })?;
 562        Ok(())
 563    }
 564
 565    async fn disk_based_diagnostics_updated(
 566        self: Arc<Server>,
 567        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 568    ) -> tide::Result<()> {
 569        let receiver_ids = self
 570            .state()
 571            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 572        broadcast(request.sender_id, receiver_ids, |connection_id| {
 573            self.peer
 574                .forward_send(request.sender_id, connection_id, request.payload.clone())
 575        })?;
 576        Ok(())
 577    }
 578
 579    async fn get_definition(
 580        self: Arc<Server>,
 581        request: TypedEnvelope<proto::GetDefinition>,
 582    ) -> tide::Result<proto::GetDefinitionResponse> {
 583        let host_connection_id = self
 584            .state()
 585            .read_project(request.payload.project_id, request.sender_id)?
 586            .host_connection_id;
 587        Ok(self
 588            .peer
 589            .forward_request(request.sender_id, host_connection_id, request.payload)
 590            .await?)
 591    }
 592
 593    async fn get_references(
 594        self: Arc<Server>,
 595        request: TypedEnvelope<proto::GetReferences>,
 596    ) -> tide::Result<proto::GetReferencesResponse> {
 597        let host_connection_id = self
 598            .state()
 599            .read_project(request.payload.project_id, request.sender_id)?
 600            .host_connection_id;
 601        Ok(self
 602            .peer
 603            .forward_request(request.sender_id, host_connection_id, request.payload)
 604            .await?)
 605    }
 606
 607    async fn get_project_symbols(
 608        self: Arc<Server>,
 609        request: TypedEnvelope<proto::GetProjectSymbols>,
 610    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)?
 614            .host_connection_id;
 615        Ok(self
 616            .peer
 617            .forward_request(request.sender_id, host_connection_id, request.payload)
 618            .await?)
 619    }
 620
 621    async fn open_buffer_for_symbol(
 622        self: Arc<Server>,
 623        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 624    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 625        let host_connection_id = self
 626            .state()
 627            .read_project(request.payload.project_id, request.sender_id)?
 628            .host_connection_id;
 629        Ok(self
 630            .peer
 631            .forward_request(request.sender_id, host_connection_id, request.payload)
 632            .await?)
 633    }
 634
 635    async fn open_buffer(
 636        self: Arc<Server>,
 637        request: TypedEnvelope<proto::OpenBuffer>,
 638    ) -> tide::Result<proto::OpenBufferResponse> {
 639        let host_connection_id = self
 640            .state()
 641            .read_project(request.payload.project_id, request.sender_id)?
 642            .host_connection_id;
 643        Ok(self
 644            .peer
 645            .forward_request(request.sender_id, host_connection_id, request.payload)
 646            .await?)
 647    }
 648
 649    async fn close_buffer(
 650        self: Arc<Server>,
 651        request: TypedEnvelope<proto::CloseBuffer>,
 652    ) -> tide::Result<()> {
 653        let host_connection_id = self
 654            .state()
 655            .read_project(request.payload.project_id, request.sender_id)?
 656            .host_connection_id;
 657        self.peer
 658            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 659        Ok(())
 660    }
 661
 662    async fn save_buffer(
 663        self: Arc<Server>,
 664        request: TypedEnvelope<proto::SaveBuffer>,
 665    ) -> tide::Result<proto::BufferSaved> {
 666        let host;
 667        let mut guests;
 668        {
 669            let state = self.state();
 670            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 671            host = project.host_connection_id;
 672            guests = project.guest_connection_ids()
 673        }
 674
 675        let response = self
 676            .peer
 677            .forward_request(request.sender_id, host, request.payload.clone())
 678            .await?;
 679
 680        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 681        broadcast(host, guests, |conn_id| {
 682            self.peer.forward_send(host, conn_id, response.clone())
 683        })?;
 684
 685        Ok(response)
 686    }
 687
 688    async fn format_buffers(
 689        self: Arc<Server>,
 690        request: TypedEnvelope<proto::FormatBuffers>,
 691    ) -> tide::Result<proto::FormatBuffersResponse> {
 692        let host = self
 693            .state()
 694            .read_project(request.payload.project_id, request.sender_id)?
 695            .host_connection_id;
 696        Ok(self
 697            .peer
 698            .forward_request(request.sender_id, host, request.payload.clone())
 699            .await?)
 700    }
 701
 702    async fn get_completions(
 703        self: Arc<Server>,
 704        request: TypedEnvelope<proto::GetCompletions>,
 705    ) -> tide::Result<proto::GetCompletionsResponse> {
 706        let host = self
 707            .state()
 708            .read_project(request.payload.project_id, request.sender_id)?
 709            .host_connection_id;
 710        Ok(self
 711            .peer
 712            .forward_request(request.sender_id, host, request.payload.clone())
 713            .await?)
 714    }
 715
 716    async fn apply_additional_edits_for_completion(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 719    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 720        let host = self
 721            .state()
 722            .read_project(request.payload.project_id, request.sender_id)?
 723            .host_connection_id;
 724        Ok(self
 725            .peer
 726            .forward_request(request.sender_id, host, request.payload.clone())
 727            .await?)
 728    }
 729
 730    async fn get_code_actions(
 731        self: Arc<Server>,
 732        request: TypedEnvelope<proto::GetCodeActions>,
 733    ) -> tide::Result<proto::GetCodeActionsResponse> {
 734        let host = self
 735            .state()
 736            .read_project(request.payload.project_id, request.sender_id)?
 737            .host_connection_id;
 738        Ok(self
 739            .peer
 740            .forward_request(request.sender_id, host, request.payload.clone())
 741            .await?)
 742    }
 743
 744    async fn apply_code_action(
 745        self: Arc<Server>,
 746        request: TypedEnvelope<proto::ApplyCodeAction>,
 747    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 748        let host = self
 749            .state()
 750            .read_project(request.payload.project_id, request.sender_id)?
 751            .host_connection_id;
 752        Ok(self
 753            .peer
 754            .forward_request(request.sender_id, host, request.payload.clone())
 755            .await?)
 756    }
 757
 758    async fn prepare_rename(
 759        self: Arc<Server>,
 760        request: TypedEnvelope<proto::PrepareRename>,
 761    ) -> tide::Result<proto::PrepareRenameResponse> {
 762        let host = self
 763            .state()
 764            .read_project(request.payload.project_id, request.sender_id)?
 765            .host_connection_id;
 766        Ok(self
 767            .peer
 768            .forward_request(request.sender_id, host, request.payload.clone())
 769            .await?)
 770    }
 771
 772    async fn perform_rename(
 773        self: Arc<Server>,
 774        request: TypedEnvelope<proto::PerformRename>,
 775    ) -> tide::Result<proto::PerformRenameResponse> {
 776        let host = self
 777            .state()
 778            .read_project(request.payload.project_id, request.sender_id)?
 779            .host_connection_id;
 780        Ok(self
 781            .peer
 782            .forward_request(request.sender_id, host, request.payload.clone())
 783            .await?)
 784    }
 785
 786    async fn update_buffer(
 787        self: Arc<Server>,
 788        request: TypedEnvelope<proto::UpdateBuffer>,
 789    ) -> tide::Result<proto::Ack> {
 790        let receiver_ids = self
 791            .state()
 792            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 793        broadcast(request.sender_id, receiver_ids, |connection_id| {
 794            self.peer
 795                .forward_send(request.sender_id, connection_id, request.payload.clone())
 796        })?;
 797        Ok(proto::Ack {})
 798    }
 799
 800    async fn update_buffer_file(
 801        self: Arc<Server>,
 802        request: TypedEnvelope<proto::UpdateBufferFile>,
 803    ) -> tide::Result<()> {
 804        let receiver_ids = self
 805            .state()
 806            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 807        broadcast(request.sender_id, receiver_ids, |connection_id| {
 808            self.peer
 809                .forward_send(request.sender_id, connection_id, request.payload.clone())
 810        })?;
 811        Ok(())
 812    }
 813
 814    async fn buffer_reloaded(
 815        self: Arc<Server>,
 816        request: TypedEnvelope<proto::BufferReloaded>,
 817    ) -> tide::Result<()> {
 818        let receiver_ids = self
 819            .state()
 820            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 821        broadcast(request.sender_id, receiver_ids, |connection_id| {
 822            self.peer
 823                .forward_send(request.sender_id, connection_id, request.payload.clone())
 824        })?;
 825        Ok(())
 826    }
 827
 828    async fn buffer_saved(
 829        self: Arc<Server>,
 830        request: TypedEnvelope<proto::BufferSaved>,
 831    ) -> tide::Result<()> {
 832        let receiver_ids = self
 833            .state()
 834            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 835        broadcast(request.sender_id, receiver_ids, |connection_id| {
 836            self.peer
 837                .forward_send(request.sender_id, connection_id, request.payload.clone())
 838        })?;
 839        Ok(())
 840    }
 841
 842    async fn get_channels(
 843        self: Arc<Server>,
 844        request: TypedEnvelope<proto::GetChannels>,
 845    ) -> tide::Result<proto::GetChannelsResponse> {
 846        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 847        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 848        Ok(proto::GetChannelsResponse {
 849            channels: channels
 850                .into_iter()
 851                .map(|chan| proto::Channel {
 852                    id: chan.id.to_proto(),
 853                    name: chan.name,
 854                })
 855                .collect(),
 856        })
 857    }
 858
 859    async fn get_users(
 860        self: Arc<Server>,
 861        request: TypedEnvelope<proto::GetUsers>,
 862    ) -> tide::Result<proto::GetUsersResponse> {
 863        let user_ids = request
 864            .payload
 865            .user_ids
 866            .into_iter()
 867            .map(UserId::from_proto)
 868            .collect();
 869        let users = self
 870            .app_state
 871            .db
 872            .get_users_by_ids(user_ids)
 873            .await?
 874            .into_iter()
 875            .map(|user| proto::User {
 876                id: user.id.to_proto(),
 877                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 878                github_login: user.github_login,
 879            })
 880            .collect();
 881        Ok(proto::GetUsersResponse { users })
 882    }
 883
 884    fn update_contacts_for_users<'a>(
 885        self: &Arc<Server>,
 886        user_ids: impl IntoIterator<Item = &'a UserId>,
 887    ) -> anyhow::Result<()> {
 888        let mut result = Ok(());
 889        let state = self.state();
 890        for user_id in user_ids {
 891            let contacts = state.contacts_for_user(*user_id);
 892            for connection_id in state.connection_ids_for_user(*user_id) {
 893                if let Err(error) = self.peer.send(
 894                    connection_id,
 895                    proto::UpdateContacts {
 896                        contacts: contacts.clone(),
 897                    },
 898                ) {
 899                    result = Err(error);
 900                }
 901            }
 902        }
 903        result
 904    }
 905
 906    async fn join_channel(
 907        mut self: Arc<Self>,
 908        request: TypedEnvelope<proto::JoinChannel>,
 909    ) -> tide::Result<proto::JoinChannelResponse> {
 910        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 911        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 912        if !self
 913            .app_state
 914            .db
 915            .can_user_access_channel(user_id, channel_id)
 916            .await?
 917        {
 918            Err(anyhow!("access denied"))?;
 919        }
 920
 921        self.state_mut().join_channel(request.sender_id, channel_id);
 922        let messages = self
 923            .app_state
 924            .db
 925            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 926            .await?
 927            .into_iter()
 928            .map(|msg| proto::ChannelMessage {
 929                id: msg.id.to_proto(),
 930                body: msg.body,
 931                timestamp: msg.sent_at.unix_timestamp() as u64,
 932                sender_id: msg.sender_id.to_proto(),
 933                nonce: Some(msg.nonce.as_u128().into()),
 934            })
 935            .collect::<Vec<_>>();
 936        Ok(proto::JoinChannelResponse {
 937            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 938            messages,
 939        })
 940    }
 941
 942    async fn leave_channel(
 943        mut self: Arc<Self>,
 944        request: TypedEnvelope<proto::LeaveChannel>,
 945    ) -> tide::Result<()> {
 946        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 947        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 948        if !self
 949            .app_state
 950            .db
 951            .can_user_access_channel(user_id, channel_id)
 952            .await?
 953        {
 954            Err(anyhow!("access denied"))?;
 955        }
 956
 957        self.state_mut()
 958            .leave_channel(request.sender_id, channel_id);
 959
 960        Ok(())
 961    }
 962
 963    async fn send_channel_message(
 964        self: Arc<Self>,
 965        request: TypedEnvelope<proto::SendChannelMessage>,
 966    ) -> tide::Result<proto::SendChannelMessageResponse> {
 967        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 968        let user_id;
 969        let connection_ids;
 970        {
 971            let state = self.state();
 972            user_id = state.user_id_for_connection(request.sender_id)?;
 973            connection_ids = state.channel_connection_ids(channel_id)?;
 974        }
 975
 976        // Validate the message body.
 977        let body = request.payload.body.trim().to_string();
 978        if body.len() > MAX_MESSAGE_LEN {
 979            return Err(anyhow!("message is too long"))?;
 980        }
 981        if body.is_empty() {
 982            return Err(anyhow!("message can't be blank"))?;
 983        }
 984
 985        let timestamp = OffsetDateTime::now_utc();
 986        let nonce = request
 987            .payload
 988            .nonce
 989            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 990
 991        let message_id = self
 992            .app_state
 993            .db
 994            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 995            .await?
 996            .to_proto();
 997        let message = proto::ChannelMessage {
 998            sender_id: user_id.to_proto(),
 999            id: message_id,
1000            body,
1001            timestamp: timestamp.unix_timestamp() as u64,
1002            nonce: Some(nonce),
1003        };
1004        broadcast(request.sender_id, connection_ids, |conn_id| {
1005            self.peer.send(
1006                conn_id,
1007                proto::ChannelMessageSent {
1008                    channel_id: channel_id.to_proto(),
1009                    message: Some(message.clone()),
1010                },
1011            )
1012        })?;
1013        Ok(proto::SendChannelMessageResponse {
1014            message: Some(message),
1015        })
1016    }
1017
1018    async fn get_channel_messages(
1019        self: Arc<Self>,
1020        request: TypedEnvelope<proto::GetChannelMessages>,
1021    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1022        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1023        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1024        if !self
1025            .app_state
1026            .db
1027            .can_user_access_channel(user_id, channel_id)
1028            .await?
1029        {
1030            Err(anyhow!("access denied"))?;
1031        }
1032
1033        let messages = self
1034            .app_state
1035            .db
1036            .get_channel_messages(
1037                channel_id,
1038                MESSAGE_COUNT_PER_PAGE,
1039                Some(MessageId::from_proto(request.payload.before_message_id)),
1040            )
1041            .await?
1042            .into_iter()
1043            .map(|msg| proto::ChannelMessage {
1044                id: msg.id.to_proto(),
1045                body: msg.body,
1046                timestamp: msg.sent_at.unix_timestamp() as u64,
1047                sender_id: msg.sender_id.to_proto(),
1048                nonce: Some(msg.nonce.as_u128().into()),
1049            })
1050            .collect::<Vec<_>>();
1051
1052        Ok(proto::GetChannelMessagesResponse {
1053            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1054            messages,
1055        })
1056    }
1057
1058    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1059        self.store.read()
1060    }
1061
1062    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1063        self.store.write()
1064    }
1065}
1066
1067impl Executor for RealExecutor {
1068    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1069        task::spawn(future);
1070    }
1071}
1072
1073fn broadcast<F>(
1074    sender_id: ConnectionId,
1075    receiver_ids: Vec<ConnectionId>,
1076    mut f: F,
1077) -> anyhow::Result<()>
1078where
1079    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1080{
1081    let mut result = Ok(());
1082    for receiver_id in receiver_ids {
1083        if receiver_id != sender_id {
1084            if let Err(error) = f(receiver_id) {
1085                if result.is_ok() {
1086                    result = Err(error);
1087                }
1088            }
1089        }
1090    }
1091    result
1092}
1093
1094pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1095    let server = Server::new(app.state().clone(), rpc.clone(), None);
1096    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1097        let server = server.clone();
1098        async move {
1099            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1100
1101            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1102            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1103            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1104            let client_protocol_version: Option<u32> = request
1105                .header("X-Zed-Protocol-Version")
1106                .and_then(|v| v.as_str().parse().ok());
1107
1108            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1109                return Ok(Response::new(StatusCode::UpgradeRequired));
1110            }
1111
1112            let header = match request.header("Sec-Websocket-Key") {
1113                Some(h) => h.as_str(),
1114                None => return Err(anyhow!("expected sec-websocket-key"))?,
1115            };
1116
1117            let user_id = process_auth_header(&request).await?;
1118
1119            let mut response = Response::new(StatusCode::SwitchingProtocols);
1120            response.insert_header(UPGRADE, "websocket");
1121            response.insert_header(CONNECTION, "Upgrade");
1122            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1123            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1124            response.insert_header("Sec-Websocket-Version", "13");
1125
1126            let http_res: &mut tide::http::Response = response.as_mut();
1127            let upgrade_receiver = http_res.recv_upgrade().await;
1128            let addr = request.remote().unwrap_or("unknown").to_string();
1129            task::spawn(async move {
1130                if let Some(stream) = upgrade_receiver.await {
1131                    server
1132                        .handle_connection(
1133                            Connection::new(
1134                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1135                            ),
1136                            addr,
1137                            user_id,
1138                            None,
1139                            RealExecutor,
1140                        )
1141                        .await;
1142                }
1143            });
1144
1145            Ok(response)
1146        }
1147    });
1148}
1149
1150fn header_contains_ignore_case<T>(
1151    request: &tide::Request<T>,
1152    header_name: HeaderName,
1153    value: &str,
1154) -> bool {
1155    request
1156        .header(header_name)
1157        .map(|h| {
1158            h.as_str()
1159                .split(',')
1160                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1161        })
1162        .unwrap_or(false)
1163}
1164
1165#[cfg(test)]
1166mod tests {
1167    use super::*;
1168    use crate::{
1169        auth,
1170        db::{tests::TestDb, UserId},
1171        github, AppState, Config,
1172    };
1173    use ::rpc::Peer;
1174    use collections::BTreeMap;
1175    use gpui::{executor, ModelHandle, TestAppContext};
1176    use parking_lot::Mutex;
1177    use postage::{sink::Sink, watch};
1178    use rand::prelude::*;
1179    use rpc::PeerId;
1180    use serde_json::json;
1181    use sqlx::types::time::OffsetDateTime;
1182    use std::{
1183        cell::Cell,
1184        env,
1185        ops::Deref,
1186        path::Path,
1187        rc::Rc,
1188        sync::{
1189            atomic::{AtomicBool, Ordering::SeqCst},
1190            Arc,
1191        },
1192        time::Duration,
1193    };
1194    use zed::{
1195        client::{
1196            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1197            EstablishConnectionError, UserStore,
1198        },
1199        editor::{
1200            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1201            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1202        },
1203        fs::{FakeFs, Fs as _},
1204        language::{
1205            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1206            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1207        },
1208        lsp,
1209        project::{DiagnosticSummary, Project, ProjectPath},
1210        workspace::{Workspace, WorkspaceParams},
1211    };
1212
1213    #[cfg(test)]
1214    #[ctor::ctor]
1215    fn init_logger() {
1216        if std::env::var("RUST_LOG").is_ok() {
1217            env_logger::init();
1218        }
1219    }
1220
1221    #[gpui::test(iterations = 10)]
1222    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1223        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1224        let lang_registry = Arc::new(LanguageRegistry::new());
1225        let fs = FakeFs::new(cx_a.background());
1226        cx_a.foreground().forbid_parking();
1227
1228        // Connect to a server as 2 clients.
1229        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1230        let client_a = server.create_client(&mut cx_a, "user_a").await;
1231        let client_b = server.create_client(&mut cx_b, "user_b").await;
1232
1233        // Share a project as client A
1234        fs.insert_tree(
1235            "/a",
1236            json!({
1237                ".zed.toml": r#"collaborators = ["user_b"]"#,
1238                "a.txt": "a-contents",
1239                "b.txt": "b-contents",
1240            }),
1241        )
1242        .await;
1243        let project_a = cx_a.update(|cx| {
1244            Project::local(
1245                client_a.clone(),
1246                client_a.user_store.clone(),
1247                lang_registry.clone(),
1248                fs.clone(),
1249                cx,
1250            )
1251        });
1252        let (worktree_a, _) = project_a
1253            .update(&mut cx_a, |p, cx| {
1254                p.find_or_create_local_worktree("/a", false, cx)
1255            })
1256            .await
1257            .unwrap();
1258        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1259        worktree_a
1260            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1261            .await;
1262        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1263        project_a
1264            .update(&mut cx_a, |p, cx| p.share(cx))
1265            .await
1266            .unwrap();
1267
1268        // Join that project as client B
1269        let project_b = Project::remote(
1270            project_id,
1271            client_b.clone(),
1272            client_b.user_store.clone(),
1273            lang_registry.clone(),
1274            fs.clone(),
1275            &mut cx_b.to_async(),
1276        )
1277        .await
1278        .unwrap();
1279
1280        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1281            assert_eq!(
1282                project
1283                    .collaborators()
1284                    .get(&client_a.peer_id)
1285                    .unwrap()
1286                    .user
1287                    .github_login,
1288                "user_a"
1289            );
1290            project.replica_id()
1291        });
1292        project_a
1293            .condition(&cx_a, |tree, _| {
1294                tree.collaborators()
1295                    .get(&client_b.peer_id)
1296                    .map_or(false, |collaborator| {
1297                        collaborator.replica_id == replica_id_b
1298                            && collaborator.user.github_login == "user_b"
1299                    })
1300            })
1301            .await;
1302
1303        // Open the same file as client B and client A.
1304        let buffer_b = project_b
1305            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1306            .await
1307            .unwrap();
1308        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1309        buffer_b.read_with(&cx_b, |buf, cx| {
1310            assert_eq!(buf.read(cx).text(), "b-contents")
1311        });
1312        project_a.read_with(&cx_a, |project, cx| {
1313            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1314        });
1315        let buffer_a = project_a
1316            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1317            .await
1318            .unwrap();
1319
1320        let editor_b = cx_b.add_view(window_b, |cx| {
1321            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1322        });
1323
1324        // TODO
1325        // // Create a selection set as client B and see that selection set as client A.
1326        // buffer_a
1327        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1328        //     .await;
1329
1330        // Edit the buffer as client B and see that edit as client A.
1331        editor_b.update(&mut cx_b, |editor, cx| {
1332            editor.handle_input(&Input("ok, ".into()), cx)
1333        });
1334        buffer_a
1335            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1336            .await;
1337
1338        // TODO
1339        // // Remove the selection set as client B, see those selections disappear as client A.
1340        cx_b.update(move |_| drop(editor_b));
1341        // buffer_a
1342        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1343        //     .await;
1344
1345        // Close the buffer as client A, see that the buffer is closed.
1346        cx_a.update(move |_| drop(buffer_a));
1347        project_a
1348            .condition(&cx_a, |project, cx| {
1349                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1350            })
1351            .await;
1352
1353        // Dropping the client B's project removes client B from client A's collaborators.
1354        cx_b.update(move |_| drop(project_b));
1355        project_a
1356            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1357            .await;
1358    }
1359
1360    #[gpui::test(iterations = 10)]
1361    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1362        let lang_registry = Arc::new(LanguageRegistry::new());
1363        let fs = FakeFs::new(cx_a.background());
1364        cx_a.foreground().forbid_parking();
1365
1366        // Connect to a server as 2 clients.
1367        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1368        let client_a = server.create_client(&mut cx_a, "user_a").await;
1369        let client_b = server.create_client(&mut cx_b, "user_b").await;
1370
1371        // Share a project as client A
1372        fs.insert_tree(
1373            "/a",
1374            json!({
1375                ".zed.toml": r#"collaborators = ["user_b"]"#,
1376                "a.txt": "a-contents",
1377                "b.txt": "b-contents",
1378            }),
1379        )
1380        .await;
1381        let project_a = cx_a.update(|cx| {
1382            Project::local(
1383                client_a.clone(),
1384                client_a.user_store.clone(),
1385                lang_registry.clone(),
1386                fs.clone(),
1387                cx,
1388            )
1389        });
1390        let (worktree_a, _) = project_a
1391            .update(&mut cx_a, |p, cx| {
1392                p.find_or_create_local_worktree("/a", false, cx)
1393            })
1394            .await
1395            .unwrap();
1396        worktree_a
1397            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1398            .await;
1399        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1400        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1401        project_a
1402            .update(&mut cx_a, |p, cx| p.share(cx))
1403            .await
1404            .unwrap();
1405        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1406
1407        // Join that project as client B
1408        let project_b = Project::remote(
1409            project_id,
1410            client_b.clone(),
1411            client_b.user_store.clone(),
1412            lang_registry.clone(),
1413            fs.clone(),
1414            &mut cx_b.to_async(),
1415        )
1416        .await
1417        .unwrap();
1418        project_b
1419            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1420            .await
1421            .unwrap();
1422
1423        // Unshare the project as client A
1424        project_a
1425            .update(&mut cx_a, |project, cx| project.unshare(cx))
1426            .await
1427            .unwrap();
1428        project_b
1429            .condition(&mut cx_b, |project, _| project.is_read_only())
1430            .await;
1431        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1432        drop(project_b);
1433
1434        // Share the project again and ensure guests can still join.
1435        project_a
1436            .update(&mut cx_a, |project, cx| project.share(cx))
1437            .await
1438            .unwrap();
1439        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1440
1441        let project_c = Project::remote(
1442            project_id,
1443            client_b.clone(),
1444            client_b.user_store.clone(),
1445            lang_registry.clone(),
1446            fs.clone(),
1447            &mut cx_b.to_async(),
1448        )
1449        .await
1450        .unwrap();
1451        project_c
1452            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1453            .await
1454            .unwrap();
1455    }
1456
1457    #[gpui::test(iterations = 10)]
1458    async fn test_propagate_saves_and_fs_changes(
1459        mut cx_a: TestAppContext,
1460        mut cx_b: TestAppContext,
1461        mut cx_c: TestAppContext,
1462    ) {
1463        let lang_registry = Arc::new(LanguageRegistry::new());
1464        let fs = FakeFs::new(cx_a.background());
1465        cx_a.foreground().forbid_parking();
1466
1467        // Connect to a server as 3 clients.
1468        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1469        let client_a = server.create_client(&mut cx_a, "user_a").await;
1470        let client_b = server.create_client(&mut cx_b, "user_b").await;
1471        let client_c = server.create_client(&mut cx_c, "user_c").await;
1472
1473        // Share a worktree as client A.
1474        fs.insert_tree(
1475            "/a",
1476            json!({
1477                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1478                "file1": "",
1479                "file2": ""
1480            }),
1481        )
1482        .await;
1483        let project_a = cx_a.update(|cx| {
1484            Project::local(
1485                client_a.clone(),
1486                client_a.user_store.clone(),
1487                lang_registry.clone(),
1488                fs.clone(),
1489                cx,
1490            )
1491        });
1492        let (worktree_a, _) = project_a
1493            .update(&mut cx_a, |p, cx| {
1494                p.find_or_create_local_worktree("/a", false, cx)
1495            })
1496            .await
1497            .unwrap();
1498        worktree_a
1499            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1500            .await;
1501        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1502        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1503        project_a
1504            .update(&mut cx_a, |p, cx| p.share(cx))
1505            .await
1506            .unwrap();
1507
1508        // Join that worktree as clients B and C.
1509        let project_b = Project::remote(
1510            project_id,
1511            client_b.clone(),
1512            client_b.user_store.clone(),
1513            lang_registry.clone(),
1514            fs.clone(),
1515            &mut cx_b.to_async(),
1516        )
1517        .await
1518        .unwrap();
1519        let project_c = Project::remote(
1520            project_id,
1521            client_c.clone(),
1522            client_c.user_store.clone(),
1523            lang_registry.clone(),
1524            fs.clone(),
1525            &mut cx_c.to_async(),
1526        )
1527        .await
1528        .unwrap();
1529        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1530        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1531
1532        // Open and edit a buffer as both guests B and C.
1533        let buffer_b = project_b
1534            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1535            .await
1536            .unwrap();
1537        let buffer_c = project_c
1538            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539            .await
1540            .unwrap();
1541        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1542        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1543
1544        // Open and edit that buffer as the host.
1545        let buffer_a = project_a
1546            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1547            .await
1548            .unwrap();
1549
1550        buffer_a
1551            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1552            .await;
1553        buffer_a.update(&mut cx_a, |buf, cx| {
1554            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1555        });
1556
1557        // Wait for edits to propagate
1558        buffer_a
1559            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1560            .await;
1561        buffer_b
1562            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1563            .await;
1564        buffer_c
1565            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1566            .await;
1567
1568        // Edit the buffer as the host and concurrently save as guest B.
1569        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1570        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1571        save_b.await.unwrap();
1572        assert_eq!(
1573            fs.load("/a/file1".as_ref()).await.unwrap(),
1574            "hi-a, i-am-c, i-am-b, i-am-a"
1575        );
1576        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1577        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1578        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1579
1580        // Make changes on host's file system, see those changes on guest worktrees.
1581        fs.rename(
1582            "/a/file1".as_ref(),
1583            "/a/file1-renamed".as_ref(),
1584            Default::default(),
1585        )
1586        .await
1587        .unwrap();
1588
1589        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1590            .await
1591            .unwrap();
1592        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1593
1594        worktree_a
1595            .condition(&cx_a, |tree, _| {
1596                tree.paths()
1597                    .map(|p| p.to_string_lossy())
1598                    .collect::<Vec<_>>()
1599                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1600            })
1601            .await;
1602        worktree_b
1603            .condition(&cx_b, |tree, _| {
1604                tree.paths()
1605                    .map(|p| p.to_string_lossy())
1606                    .collect::<Vec<_>>()
1607                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1608            })
1609            .await;
1610        worktree_c
1611            .condition(&cx_c, |tree, _| {
1612                tree.paths()
1613                    .map(|p| p.to_string_lossy())
1614                    .collect::<Vec<_>>()
1615                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1616            })
1617            .await;
1618
1619        // Ensure buffer files are updated as well.
1620        buffer_a
1621            .condition(&cx_a, |buf, _| {
1622                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1623            })
1624            .await;
1625        buffer_b
1626            .condition(&cx_b, |buf, _| {
1627                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1628            })
1629            .await;
1630        buffer_c
1631            .condition(&cx_c, |buf, _| {
1632                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633            })
1634            .await;
1635    }
1636
1637    #[gpui::test(iterations = 10)]
1638    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1639        cx_a.foreground().forbid_parking();
1640        let lang_registry = Arc::new(LanguageRegistry::new());
1641        let fs = FakeFs::new(cx_a.background());
1642
1643        // Connect to a server as 2 clients.
1644        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1645        let client_a = server.create_client(&mut cx_a, "user_a").await;
1646        let client_b = server.create_client(&mut cx_b, "user_b").await;
1647
1648        // Share a project as client A
1649        fs.insert_tree(
1650            "/dir",
1651            json!({
1652                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653                "a.txt": "a-contents",
1654            }),
1655        )
1656        .await;
1657
1658        let project_a = cx_a.update(|cx| {
1659            Project::local(
1660                client_a.clone(),
1661                client_a.user_store.clone(),
1662                lang_registry.clone(),
1663                fs.clone(),
1664                cx,
1665            )
1666        });
1667        let (worktree_a, _) = project_a
1668            .update(&mut cx_a, |p, cx| {
1669                p.find_or_create_local_worktree("/dir", false, cx)
1670            })
1671            .await
1672            .unwrap();
1673        worktree_a
1674            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675            .await;
1676        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1677        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1678        project_a
1679            .update(&mut cx_a, |p, cx| p.share(cx))
1680            .await
1681            .unwrap();
1682
1683        // Join that project as client B
1684        let project_b = Project::remote(
1685            project_id,
1686            client_b.clone(),
1687            client_b.user_store.clone(),
1688            lang_registry.clone(),
1689            fs.clone(),
1690            &mut cx_b.to_async(),
1691        )
1692        .await
1693        .unwrap();
1694
1695        // Open a buffer as client B
1696        let buffer_b = project_b
1697            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1698            .await
1699            .unwrap();
1700
1701        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1702        buffer_b.read_with(&cx_b, |buf, _| {
1703            assert!(buf.is_dirty());
1704            assert!(!buf.has_conflict());
1705        });
1706
1707        buffer_b
1708            .update(&mut cx_b, |buf, cx| buf.save(cx))
1709            .await
1710            .unwrap();
1711        buffer_b
1712            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1713            .await;
1714        buffer_b.read_with(&cx_b, |buf, _| {
1715            assert!(!buf.has_conflict());
1716        });
1717
1718        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1719        buffer_b.read_with(&cx_b, |buf, _| {
1720            assert!(buf.is_dirty());
1721            assert!(!buf.has_conflict());
1722        });
1723    }
1724
1725    #[gpui::test(iterations = 10)]
1726    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1727        cx_a.foreground().forbid_parking();
1728        let lang_registry = Arc::new(LanguageRegistry::new());
1729        let fs = FakeFs::new(cx_a.background());
1730
1731        // Connect to a server as 2 clients.
1732        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1733        let client_a = server.create_client(&mut cx_a, "user_a").await;
1734        let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736        // Share a project as client A
1737        fs.insert_tree(
1738            "/dir",
1739            json!({
1740                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1741                "a.txt": "a-contents",
1742            }),
1743        )
1744        .await;
1745
1746        let project_a = cx_a.update(|cx| {
1747            Project::local(
1748                client_a.clone(),
1749                client_a.user_store.clone(),
1750                lang_registry.clone(),
1751                fs.clone(),
1752                cx,
1753            )
1754        });
1755        let (worktree_a, _) = project_a
1756            .update(&mut cx_a, |p, cx| {
1757                p.find_or_create_local_worktree("/dir", false, cx)
1758            })
1759            .await
1760            .unwrap();
1761        worktree_a
1762            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763            .await;
1764        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1765        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1766        project_a
1767            .update(&mut cx_a, |p, cx| p.share(cx))
1768            .await
1769            .unwrap();
1770
1771        // Join that project as client B
1772        let project_b = Project::remote(
1773            project_id,
1774            client_b.clone(),
1775            client_b.user_store.clone(),
1776            lang_registry.clone(),
1777            fs.clone(),
1778            &mut cx_b.to_async(),
1779        )
1780        .await
1781        .unwrap();
1782        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1783
1784        // Open a buffer as client B
1785        let buffer_b = project_b
1786            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787            .await
1788            .unwrap();
1789        buffer_b.read_with(&cx_b, |buf, _| {
1790            assert!(!buf.is_dirty());
1791            assert!(!buf.has_conflict());
1792        });
1793
1794        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1795            .await
1796            .unwrap();
1797        buffer_b
1798            .condition(&cx_b, |buf, _| {
1799                buf.text() == "new contents" && !buf.is_dirty()
1800            })
1801            .await;
1802        buffer_b.read_with(&cx_b, |buf, _| {
1803            assert!(!buf.has_conflict());
1804        });
1805    }
1806
1807    #[gpui::test(iterations = 10)]
1808    async fn test_editing_while_guest_opens_buffer(
1809        mut cx_a: TestAppContext,
1810        mut cx_b: TestAppContext,
1811    ) {
1812        cx_a.foreground().forbid_parking();
1813        let lang_registry = Arc::new(LanguageRegistry::new());
1814        let fs = FakeFs::new(cx_a.background());
1815
1816        // Connect to a server as 2 clients.
1817        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1818        let client_a = server.create_client(&mut cx_a, "user_a").await;
1819        let client_b = server.create_client(&mut cx_b, "user_b").await;
1820
1821        // Share a project as client A
1822        fs.insert_tree(
1823            "/dir",
1824            json!({
1825                ".zed.toml": r#"collaborators = ["user_b"]"#,
1826                "a.txt": "a-contents",
1827            }),
1828        )
1829        .await;
1830        let project_a = cx_a.update(|cx| {
1831            Project::local(
1832                client_a.clone(),
1833                client_a.user_store.clone(),
1834                lang_registry.clone(),
1835                fs.clone(),
1836                cx,
1837            )
1838        });
1839        let (worktree_a, _) = project_a
1840            .update(&mut cx_a, |p, cx| {
1841                p.find_or_create_local_worktree("/dir", false, cx)
1842            })
1843            .await
1844            .unwrap();
1845        worktree_a
1846            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847            .await;
1848        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1849        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1850        project_a
1851            .update(&mut cx_a, |p, cx| p.share(cx))
1852            .await
1853            .unwrap();
1854
1855        // Join that project as client B
1856        let project_b = Project::remote(
1857            project_id,
1858            client_b.clone(),
1859            client_b.user_store.clone(),
1860            lang_registry.clone(),
1861            fs.clone(),
1862            &mut cx_b.to_async(),
1863        )
1864        .await
1865        .unwrap();
1866
1867        // Open a buffer as client A
1868        let buffer_a = project_a
1869            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1870            .await
1871            .unwrap();
1872
1873        // Start opening the same buffer as client B
1874        let buffer_b = cx_b
1875            .background()
1876            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1877
1878        // Edit the buffer as client A while client B is still opening it.
1879        cx_b.background().simulate_random_delay().await;
1880        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1881        cx_b.background().simulate_random_delay().await;
1882        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1883
1884        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1885        let buffer_b = buffer_b.await.unwrap();
1886        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1887    }
1888
1889    #[gpui::test(iterations = 10)]
1890    async fn test_leaving_worktree_while_opening_buffer(
1891        mut cx_a: TestAppContext,
1892        mut cx_b: TestAppContext,
1893    ) {
1894        cx_a.foreground().forbid_parking();
1895        let lang_registry = Arc::new(LanguageRegistry::new());
1896        let fs = FakeFs::new(cx_a.background());
1897
1898        // Connect to a server as 2 clients.
1899        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900        let client_a = server.create_client(&mut cx_a, "user_a").await;
1901        let client_b = server.create_client(&mut cx_b, "user_b").await;
1902
1903        // Share a project as client A
1904        fs.insert_tree(
1905            "/dir",
1906            json!({
1907                ".zed.toml": r#"collaborators = ["user_b"]"#,
1908                "a.txt": "a-contents",
1909            }),
1910        )
1911        .await;
1912        let project_a = cx_a.update(|cx| {
1913            Project::local(
1914                client_a.clone(),
1915                client_a.user_store.clone(),
1916                lang_registry.clone(),
1917                fs.clone(),
1918                cx,
1919            )
1920        });
1921        let (worktree_a, _) = project_a
1922            .update(&mut cx_a, |p, cx| {
1923                p.find_or_create_local_worktree("/dir", false, cx)
1924            })
1925            .await
1926            .unwrap();
1927        worktree_a
1928            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929            .await;
1930        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1931        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1932        project_a
1933            .update(&mut cx_a, |p, cx| p.share(cx))
1934            .await
1935            .unwrap();
1936
1937        // Join that project as client B
1938        let project_b = Project::remote(
1939            project_id,
1940            client_b.clone(),
1941            client_b.user_store.clone(),
1942            lang_registry.clone(),
1943            fs.clone(),
1944            &mut cx_b.to_async(),
1945        )
1946        .await
1947        .unwrap();
1948
1949        // See that a guest has joined as client A.
1950        project_a
1951            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1952            .await;
1953
1954        // Begin opening a buffer as client B, but leave the project before the open completes.
1955        let buffer_b = cx_b
1956            .background()
1957            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1958        cx_b.update(|_| drop(project_b));
1959        drop(buffer_b);
1960
1961        // See that the guest has left.
1962        project_a
1963            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1964            .await;
1965    }
1966
1967    #[gpui::test(iterations = 10)]
1968    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1969        cx_a.foreground().forbid_parking();
1970        let lang_registry = Arc::new(LanguageRegistry::new());
1971        let fs = FakeFs::new(cx_a.background());
1972
1973        // Connect to a server as 2 clients.
1974        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1975        let client_a = server.create_client(&mut cx_a, "user_a").await;
1976        let client_b = server.create_client(&mut cx_b, "user_b").await;
1977
1978        // Share a project as client A
1979        fs.insert_tree(
1980            "/a",
1981            json!({
1982                ".zed.toml": r#"collaborators = ["user_b"]"#,
1983                "a.txt": "a-contents",
1984                "b.txt": "b-contents",
1985            }),
1986        )
1987        .await;
1988        let project_a = cx_a.update(|cx| {
1989            Project::local(
1990                client_a.clone(),
1991                client_a.user_store.clone(),
1992                lang_registry.clone(),
1993                fs.clone(),
1994                cx,
1995            )
1996        });
1997        let (worktree_a, _) = project_a
1998            .update(&mut cx_a, |p, cx| {
1999                p.find_or_create_local_worktree("/a", false, cx)
2000            })
2001            .await
2002            .unwrap();
2003        worktree_a
2004            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2005            .await;
2006        let project_id = project_a
2007            .update(&mut cx_a, |project, _| project.next_remote_id())
2008            .await;
2009        project_a
2010            .update(&mut cx_a, |project, cx| project.share(cx))
2011            .await
2012            .unwrap();
2013
2014        // Join that project as client B
2015        let _project_b = Project::remote(
2016            project_id,
2017            client_b.clone(),
2018            client_b.user_store.clone(),
2019            lang_registry.clone(),
2020            fs.clone(),
2021            &mut cx_b.to_async(),
2022        )
2023        .await
2024        .unwrap();
2025
2026        // See that a guest has joined as client A.
2027        project_a
2028            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2029            .await;
2030
2031        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2032        client_b.disconnect(&cx_b.to_async()).unwrap();
2033        project_a
2034            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035            .await;
2036    }
2037
2038    #[gpui::test(iterations = 10)]
2039    async fn test_collaborating_with_diagnostics(
2040        mut cx_a: TestAppContext,
2041        mut cx_b: TestAppContext,
2042    ) {
2043        cx_a.foreground().forbid_parking();
2044        let mut lang_registry = Arc::new(LanguageRegistry::new());
2045        let fs = FakeFs::new(cx_a.background());
2046
2047        // Set up a fake language server.
2048        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2049        Arc::get_mut(&mut lang_registry)
2050            .unwrap()
2051            .add(Arc::new(Language::new(
2052                LanguageConfig {
2053                    name: "Rust".into(),
2054                    path_suffixes: vec!["rs".to_string()],
2055                    language_server: Some(language_server_config),
2056                    ..Default::default()
2057                },
2058                Some(tree_sitter_rust::language()),
2059            )));
2060
2061        // Connect to a server as 2 clients.
2062        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2063        let client_a = server.create_client(&mut cx_a, "user_a").await;
2064        let client_b = server.create_client(&mut cx_b, "user_b").await;
2065
2066        // Share a project as client A
2067        fs.insert_tree(
2068            "/a",
2069            json!({
2070                ".zed.toml": r#"collaborators = ["user_b"]"#,
2071                "a.rs": "let one = two",
2072                "other.rs": "",
2073            }),
2074        )
2075        .await;
2076        let project_a = cx_a.update(|cx| {
2077            Project::local(
2078                client_a.clone(),
2079                client_a.user_store.clone(),
2080                lang_registry.clone(),
2081                fs.clone(),
2082                cx,
2083            )
2084        });
2085        let (worktree_a, _) = project_a
2086            .update(&mut cx_a, |p, cx| {
2087                p.find_or_create_local_worktree("/a", false, cx)
2088            })
2089            .await
2090            .unwrap();
2091        worktree_a
2092            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2093            .await;
2094        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2095        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2096        project_a
2097            .update(&mut cx_a, |p, cx| p.share(cx))
2098            .await
2099            .unwrap();
2100
2101        // Cause the language server to start.
2102        let _ = cx_a
2103            .background()
2104            .spawn(project_a.update(&mut cx_a, |project, cx| {
2105                project.open_buffer(
2106                    ProjectPath {
2107                        worktree_id,
2108                        path: Path::new("other.rs").into(),
2109                    },
2110                    cx,
2111                )
2112            }))
2113            .await
2114            .unwrap();
2115
2116        // Simulate a language server reporting errors for a file.
2117        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2118        fake_language_server
2119            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2120                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2121                version: None,
2122                diagnostics: vec![lsp::Diagnostic {
2123                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2124                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2125                    message: "message 1".to_string(),
2126                    ..Default::default()
2127                }],
2128            })
2129            .await;
2130
2131        // Wait for server to see the diagnostics update.
2132        server
2133            .condition(|store| {
2134                let worktree = store
2135                    .project(project_id)
2136                    .unwrap()
2137                    .worktrees
2138                    .get(&worktree_id.to_proto())
2139                    .unwrap();
2140
2141                !worktree
2142                    .share
2143                    .as_ref()
2144                    .unwrap()
2145                    .diagnostic_summaries
2146                    .is_empty()
2147            })
2148            .await;
2149
2150        // Join the worktree as client B.
2151        let project_b = Project::remote(
2152            project_id,
2153            client_b.clone(),
2154            client_b.user_store.clone(),
2155            lang_registry.clone(),
2156            fs.clone(),
2157            &mut cx_b.to_async(),
2158        )
2159        .await
2160        .unwrap();
2161
2162        project_b.read_with(&cx_b, |project, cx| {
2163            assert_eq!(
2164                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2165                &[(
2166                    ProjectPath {
2167                        worktree_id,
2168                        path: Arc::from(Path::new("a.rs")),
2169                    },
2170                    DiagnosticSummary {
2171                        error_count: 1,
2172                        warning_count: 0,
2173                        ..Default::default()
2174                    },
2175                )]
2176            )
2177        });
2178
2179        // Simulate a language server reporting more errors for a file.
2180        fake_language_server
2181            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2182                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2183                version: None,
2184                diagnostics: vec![
2185                    lsp::Diagnostic {
2186                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2187                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2188                        message: "message 1".to_string(),
2189                        ..Default::default()
2190                    },
2191                    lsp::Diagnostic {
2192                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2193                        range: lsp::Range::new(
2194                            lsp::Position::new(0, 10),
2195                            lsp::Position::new(0, 13),
2196                        ),
2197                        message: "message 2".to_string(),
2198                        ..Default::default()
2199                    },
2200                ],
2201            })
2202            .await;
2203
2204        // Client b gets the updated summaries
2205        project_b
2206            .condition(&cx_b, |project, cx| {
2207                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2208                    == &[(
2209                        ProjectPath {
2210                            worktree_id,
2211                            path: Arc::from(Path::new("a.rs")),
2212                        },
2213                        DiagnosticSummary {
2214                            error_count: 1,
2215                            warning_count: 1,
2216                            ..Default::default()
2217                        },
2218                    )]
2219            })
2220            .await;
2221
2222        // Open the file with the errors on client B. They should be present.
2223        let buffer_b = cx_b
2224            .background()
2225            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2226            .await
2227            .unwrap();
2228
2229        buffer_b.read_with(&cx_b, |buffer, _| {
2230            assert_eq!(
2231                buffer
2232                    .snapshot()
2233                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2234                    .map(|entry| entry)
2235                    .collect::<Vec<_>>(),
2236                &[
2237                    DiagnosticEntry {
2238                        range: Point::new(0, 4)..Point::new(0, 7),
2239                        diagnostic: Diagnostic {
2240                            group_id: 0,
2241                            message: "message 1".to_string(),
2242                            severity: lsp::DiagnosticSeverity::ERROR,
2243                            is_primary: true,
2244                            ..Default::default()
2245                        }
2246                    },
2247                    DiagnosticEntry {
2248                        range: Point::new(0, 10)..Point::new(0, 13),
2249                        diagnostic: Diagnostic {
2250                            group_id: 1,
2251                            severity: lsp::DiagnosticSeverity::WARNING,
2252                            message: "message 2".to_string(),
2253                            is_primary: true,
2254                            ..Default::default()
2255                        }
2256                    }
2257                ]
2258            );
2259        });
2260    }
2261
2262    #[gpui::test(iterations = 10)]
2263    async fn test_collaborating_with_completion(
2264        mut cx_a: TestAppContext,
2265        mut cx_b: TestAppContext,
2266    ) {
2267        cx_a.foreground().forbid_parking();
2268        let mut lang_registry = Arc::new(LanguageRegistry::new());
2269        let fs = FakeFs::new(cx_a.background());
2270
2271        // Set up a fake language server.
2272        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2273        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2274            completion_provider: Some(lsp::CompletionOptions {
2275                trigger_characters: Some(vec![".".to_string()]),
2276                ..Default::default()
2277            }),
2278            ..Default::default()
2279        });
2280        Arc::get_mut(&mut lang_registry)
2281            .unwrap()
2282            .add(Arc::new(Language::new(
2283                LanguageConfig {
2284                    name: "Rust".into(),
2285                    path_suffixes: vec!["rs".to_string()],
2286                    language_server: Some(language_server_config),
2287                    ..Default::default()
2288                },
2289                Some(tree_sitter_rust::language()),
2290            )));
2291
2292        // Connect to a server as 2 clients.
2293        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2294        let client_a = server.create_client(&mut cx_a, "user_a").await;
2295        let client_b = server.create_client(&mut cx_b, "user_b").await;
2296
2297        // Share a project as client A
2298        fs.insert_tree(
2299            "/a",
2300            json!({
2301                ".zed.toml": r#"collaborators = ["user_b"]"#,
2302                "main.rs": "fn main() { a }",
2303                "other.rs": "",
2304            }),
2305        )
2306        .await;
2307        let project_a = cx_a.update(|cx| {
2308            Project::local(
2309                client_a.clone(),
2310                client_a.user_store.clone(),
2311                lang_registry.clone(),
2312                fs.clone(),
2313                cx,
2314            )
2315        });
2316        let (worktree_a, _) = project_a
2317            .update(&mut cx_a, |p, cx| {
2318                p.find_or_create_local_worktree("/a", false, cx)
2319            })
2320            .await
2321            .unwrap();
2322        worktree_a
2323            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2324            .await;
2325        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2326        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2327        project_a
2328            .update(&mut cx_a, |p, cx| p.share(cx))
2329            .await
2330            .unwrap();
2331
2332        // Join the worktree as client B.
2333        let project_b = Project::remote(
2334            project_id,
2335            client_b.clone(),
2336            client_b.user_store.clone(),
2337            lang_registry.clone(),
2338            fs.clone(),
2339            &mut cx_b.to_async(),
2340        )
2341        .await
2342        .unwrap();
2343
2344        // Open a file in an editor as the guest.
2345        let buffer_b = project_b
2346            .update(&mut cx_b, |p, cx| {
2347                p.open_buffer((worktree_id, "main.rs"), cx)
2348            })
2349            .await
2350            .unwrap();
2351        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2352        let editor_b = cx_b.add_view(window_b, |cx| {
2353            Editor::for_buffer(
2354                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2355                Arc::new(|cx| EditorSettings::test(cx)),
2356                Some(project_b.clone()),
2357                cx,
2358            )
2359        });
2360
2361        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2362        buffer_b
2363            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2364            .await;
2365
2366        // Type a completion trigger character as the guest.
2367        editor_b.update(&mut cx_b, |editor, cx| {
2368            editor.select_ranges([13..13], None, cx);
2369            editor.handle_input(&Input(".".into()), cx);
2370            cx.focus(&editor_b);
2371        });
2372
2373        // Receive a completion request as the host's language server.
2374        // Return some completions from the host's language server.
2375        cx_a.foreground().start_waiting();
2376        fake_language_server
2377            .handle_request::<lsp::request::Completion, _>(|params| {
2378                assert_eq!(
2379                    params.text_document_position.text_document.uri,
2380                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2381                );
2382                assert_eq!(
2383                    params.text_document_position.position,
2384                    lsp::Position::new(0, 14),
2385                );
2386
2387                Some(lsp::CompletionResponse::Array(vec![
2388                    lsp::CompletionItem {
2389                        label: "first_method(…)".into(),
2390                        detail: Some("fn(&mut self, B) -> C".into()),
2391                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2392                            new_text: "first_method($1)".to_string(),
2393                            range: lsp::Range::new(
2394                                lsp::Position::new(0, 14),
2395                                lsp::Position::new(0, 14),
2396                            ),
2397                        })),
2398                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2399                        ..Default::default()
2400                    },
2401                    lsp::CompletionItem {
2402                        label: "second_method(…)".into(),
2403                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2404                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2405                            new_text: "second_method()".to_string(),
2406                            range: lsp::Range::new(
2407                                lsp::Position::new(0, 14),
2408                                lsp::Position::new(0, 14),
2409                            ),
2410                        })),
2411                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2412                        ..Default::default()
2413                    },
2414                ]))
2415            })
2416            .next()
2417            .await
2418            .unwrap();
2419        cx_a.foreground().finish_waiting();
2420
2421        // Open the buffer on the host.
2422        let buffer_a = project_a
2423            .update(&mut cx_a, |p, cx| {
2424                p.open_buffer((worktree_id, "main.rs"), cx)
2425            })
2426            .await
2427            .unwrap();
2428        buffer_a
2429            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2430            .await;
2431
2432        // Confirm a completion on the guest.
2433        editor_b
2434            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2435            .await;
2436        editor_b.update(&mut cx_b, |editor, cx| {
2437            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2438            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2439        });
2440
2441        // Return a resolved completion from the host's language server.
2442        // The resolved completion has an additional text edit.
2443        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2444            assert_eq!(params.label, "first_method(…)");
2445            lsp::CompletionItem {
2446                label: "first_method(…)".into(),
2447                detail: Some("fn(&mut self, B) -> C".into()),
2448                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2449                    new_text: "first_method($1)".to_string(),
2450                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2451                })),
2452                additional_text_edits: Some(vec![lsp::TextEdit {
2453                    new_text: "use d::SomeTrait;\n".to_string(),
2454                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2455                }]),
2456                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2457                ..Default::default()
2458            }
2459        });
2460
2461        // The additional edit is applied.
2462        buffer_a
2463            .condition(&cx_a, |buffer, _| {
2464                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2465            })
2466            .await;
2467        buffer_b
2468            .condition(&cx_b, |buffer, _| {
2469                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2470            })
2471            .await;
2472    }
2473
2474    #[gpui::test(iterations = 10)]
2475    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2476        cx_a.foreground().forbid_parking();
2477        let mut lang_registry = Arc::new(LanguageRegistry::new());
2478        let fs = FakeFs::new(cx_a.background());
2479
2480        // Set up a fake language server.
2481        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2482        Arc::get_mut(&mut lang_registry)
2483            .unwrap()
2484            .add(Arc::new(Language::new(
2485                LanguageConfig {
2486                    name: "Rust".into(),
2487                    path_suffixes: vec!["rs".to_string()],
2488                    language_server: Some(language_server_config),
2489                    ..Default::default()
2490                },
2491                Some(tree_sitter_rust::language()),
2492            )));
2493
2494        // Connect to a server as 2 clients.
2495        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2496        let client_a = server.create_client(&mut cx_a, "user_a").await;
2497        let client_b = server.create_client(&mut cx_b, "user_b").await;
2498
2499        // Share a project as client A
2500        fs.insert_tree(
2501            "/a",
2502            json!({
2503                ".zed.toml": r#"collaborators = ["user_b"]"#,
2504                "a.rs": "let one = two",
2505            }),
2506        )
2507        .await;
2508        let project_a = cx_a.update(|cx| {
2509            Project::local(
2510                client_a.clone(),
2511                client_a.user_store.clone(),
2512                lang_registry.clone(),
2513                fs.clone(),
2514                cx,
2515            )
2516        });
2517        let (worktree_a, _) = project_a
2518            .update(&mut cx_a, |p, cx| {
2519                p.find_or_create_local_worktree("/a", false, cx)
2520            })
2521            .await
2522            .unwrap();
2523        worktree_a
2524            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2525            .await;
2526        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2527        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2528        project_a
2529            .update(&mut cx_a, |p, cx| p.share(cx))
2530            .await
2531            .unwrap();
2532
2533        // Join the worktree as client B.
2534        let project_b = Project::remote(
2535            project_id,
2536            client_b.clone(),
2537            client_b.user_store.clone(),
2538            lang_registry.clone(),
2539            fs.clone(),
2540            &mut cx_b.to_async(),
2541        )
2542        .await
2543        .unwrap();
2544
2545        let buffer_b = cx_b
2546            .background()
2547            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2548            .await
2549            .unwrap();
2550
2551        let format = project_b.update(&mut cx_b, |project, cx| {
2552            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2553        });
2554
2555        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2556        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2557            Some(vec![
2558                lsp::TextEdit {
2559                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2560                    new_text: "h".to_string(),
2561                },
2562                lsp::TextEdit {
2563                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2564                    new_text: "y".to_string(),
2565                },
2566            ])
2567        });
2568
2569        format.await.unwrap();
2570        assert_eq!(
2571            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2572            "let honey = two"
2573        );
2574    }
2575
2576    #[gpui::test(iterations = 10)]
2577    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2578        cx_a.foreground().forbid_parking();
2579        let mut lang_registry = Arc::new(LanguageRegistry::new());
2580        let fs = FakeFs::new(cx_a.background());
2581        fs.insert_tree(
2582            "/root-1",
2583            json!({
2584                ".zed.toml": r#"collaborators = ["user_b"]"#,
2585                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2586            }),
2587        )
2588        .await;
2589        fs.insert_tree(
2590            "/root-2",
2591            json!({
2592                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2593            }),
2594        )
2595        .await;
2596
2597        // Set up a fake language server.
2598        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2599        Arc::get_mut(&mut lang_registry)
2600            .unwrap()
2601            .add(Arc::new(Language::new(
2602                LanguageConfig {
2603                    name: "Rust".into(),
2604                    path_suffixes: vec!["rs".to_string()],
2605                    language_server: Some(language_server_config),
2606                    ..Default::default()
2607                },
2608                Some(tree_sitter_rust::language()),
2609            )));
2610
2611        // Connect to a server as 2 clients.
2612        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2613        let client_a = server.create_client(&mut cx_a, "user_a").await;
2614        let client_b = server.create_client(&mut cx_b, "user_b").await;
2615
2616        // Share a project as client A
2617        let project_a = cx_a.update(|cx| {
2618            Project::local(
2619                client_a.clone(),
2620                client_a.user_store.clone(),
2621                lang_registry.clone(),
2622                fs.clone(),
2623                cx,
2624            )
2625        });
2626        let (worktree_a, _) = project_a
2627            .update(&mut cx_a, |p, cx| {
2628                p.find_or_create_local_worktree("/root-1", false, cx)
2629            })
2630            .await
2631            .unwrap();
2632        worktree_a
2633            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2634            .await;
2635        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2636        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2637        project_a
2638            .update(&mut cx_a, |p, cx| p.share(cx))
2639            .await
2640            .unwrap();
2641
2642        // Join the worktree as client B.
2643        let project_b = Project::remote(
2644            project_id,
2645            client_b.clone(),
2646            client_b.user_store.clone(),
2647            lang_registry.clone(),
2648            fs.clone(),
2649            &mut cx_b.to_async(),
2650        )
2651        .await
2652        .unwrap();
2653
2654        // Open the file on client B.
2655        let buffer_b = cx_b
2656            .background()
2657            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2658            .await
2659            .unwrap();
2660
2661        // Request the definition of a symbol as the guest.
2662        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2663
2664        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2665        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2666            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2667                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2668                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2669            )))
2670        });
2671
2672        let definitions_1 = definitions_1.await.unwrap();
2673        cx_b.read(|cx| {
2674            assert_eq!(definitions_1.len(), 1);
2675            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2676            let target_buffer = definitions_1[0].buffer.read(cx);
2677            assert_eq!(
2678                target_buffer.text(),
2679                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2680            );
2681            assert_eq!(
2682                definitions_1[0].range.to_point(target_buffer),
2683                Point::new(0, 6)..Point::new(0, 9)
2684            );
2685        });
2686
2687        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2688        // the previous call to `definition`.
2689        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2690        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2691            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2692                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2693                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2694            )))
2695        });
2696
2697        let definitions_2 = definitions_2.await.unwrap();
2698        cx_b.read(|cx| {
2699            assert_eq!(definitions_2.len(), 1);
2700            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2701            let target_buffer = definitions_2[0].buffer.read(cx);
2702            assert_eq!(
2703                target_buffer.text(),
2704                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2705            );
2706            assert_eq!(
2707                definitions_2[0].range.to_point(target_buffer),
2708                Point::new(1, 6)..Point::new(1, 11)
2709            );
2710        });
2711        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2712
2713        cx_b.update(|_| {
2714            drop(definitions_1);
2715            drop(definitions_2);
2716        });
2717        project_b
2718            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2719            .await;
2720    }
2721
2722    #[gpui::test(iterations = 10)]
2723    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2724        cx_a.foreground().forbid_parking();
2725        let mut lang_registry = Arc::new(LanguageRegistry::new());
2726        let fs = FakeFs::new(cx_a.background());
2727        fs.insert_tree(
2728            "/root-1",
2729            json!({
2730                ".zed.toml": r#"collaborators = ["user_b"]"#,
2731                "one.rs": "const ONE: usize = 1;",
2732                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2733            }),
2734        )
2735        .await;
2736        fs.insert_tree(
2737            "/root-2",
2738            json!({
2739                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2740            }),
2741        )
2742        .await;
2743
2744        // Set up a fake language server.
2745        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2746        Arc::get_mut(&mut lang_registry)
2747            .unwrap()
2748            .add(Arc::new(Language::new(
2749                LanguageConfig {
2750                    name: "Rust".into(),
2751                    path_suffixes: vec!["rs".to_string()],
2752                    language_server: Some(language_server_config),
2753                    ..Default::default()
2754                },
2755                Some(tree_sitter_rust::language()),
2756            )));
2757
2758        // Connect to a server as 2 clients.
2759        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2760        let client_a = server.create_client(&mut cx_a, "user_a").await;
2761        let client_b = server.create_client(&mut cx_b, "user_b").await;
2762
2763        // Share a project as client A
2764        let project_a = cx_a.update(|cx| {
2765            Project::local(
2766                client_a.clone(),
2767                client_a.user_store.clone(),
2768                lang_registry.clone(),
2769                fs.clone(),
2770                cx,
2771            )
2772        });
2773        let (worktree_a, _) = project_a
2774            .update(&mut cx_a, |p, cx| {
2775                p.find_or_create_local_worktree("/root-1", false, cx)
2776            })
2777            .await
2778            .unwrap();
2779        worktree_a
2780            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2781            .await;
2782        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2783        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2784        project_a
2785            .update(&mut cx_a, |p, cx| p.share(cx))
2786            .await
2787            .unwrap();
2788
2789        // Join the worktree as client B.
2790        let project_b = Project::remote(
2791            project_id,
2792            client_b.clone(),
2793            client_b.user_store.clone(),
2794            lang_registry.clone(),
2795            fs.clone(),
2796            &mut cx_b.to_async(),
2797        )
2798        .await
2799        .unwrap();
2800
2801        // Open the file on client B.
2802        let buffer_b = cx_b
2803            .background()
2804            .spawn(project_b.update(&mut cx_b, |p, cx| {
2805                p.open_buffer((worktree_id, "one.rs"), cx)
2806            }))
2807            .await
2808            .unwrap();
2809
2810        // Request references to a symbol as the guest.
2811        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2812
2813        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2814        fake_language_server.handle_request::<lsp::request::References, _>(|params| {
2815            assert_eq!(
2816                params.text_document_position.text_document.uri.as_str(),
2817                "file:///root-1/one.rs"
2818            );
2819            Some(vec![
2820                lsp::Location {
2821                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2822                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2823                },
2824                lsp::Location {
2825                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2826                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2827                },
2828                lsp::Location {
2829                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2830                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2831                },
2832            ])
2833        });
2834
2835        let references = references.await.unwrap();
2836        cx_b.read(|cx| {
2837            assert_eq!(references.len(), 3);
2838            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2839
2840            let two_buffer = references[0].buffer.read(cx);
2841            let three_buffer = references[2].buffer.read(cx);
2842            assert_eq!(
2843                two_buffer.file().unwrap().path().as_ref(),
2844                Path::new("two.rs")
2845            );
2846            assert_eq!(references[1].buffer, references[0].buffer);
2847            assert_eq!(
2848                three_buffer.file().unwrap().full_path(cx),
2849                Path::new("three.rs")
2850            );
2851
2852            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2853            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2854            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2855        });
2856    }
2857
2858    #[gpui::test(iterations = 10)]
2859    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2860        cx_a.foreground().forbid_parking();
2861        let mut lang_registry = Arc::new(LanguageRegistry::new());
2862        let fs = FakeFs::new(cx_a.background());
2863        fs.insert_tree(
2864            "/code",
2865            json!({
2866                "crate-1": {
2867                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2868                    "one.rs": "const ONE: usize = 1;",
2869                },
2870                "crate-2": {
2871                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2872                },
2873                "private": {
2874                    "passwords.txt": "the-password",
2875                }
2876            }),
2877        )
2878        .await;
2879
2880        // Set up a fake language server.
2881        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2882        Arc::get_mut(&mut lang_registry)
2883            .unwrap()
2884            .add(Arc::new(Language::new(
2885                LanguageConfig {
2886                    name: "Rust".into(),
2887                    path_suffixes: vec!["rs".to_string()],
2888                    language_server: Some(language_server_config),
2889                    ..Default::default()
2890                },
2891                Some(tree_sitter_rust::language()),
2892            )));
2893
2894        // Connect to a server as 2 clients.
2895        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2896        let client_a = server.create_client(&mut cx_a, "user_a").await;
2897        let client_b = server.create_client(&mut cx_b, "user_b").await;
2898
2899        // Share a project as client A
2900        let project_a = cx_a.update(|cx| {
2901            Project::local(
2902                client_a.clone(),
2903                client_a.user_store.clone(),
2904                lang_registry.clone(),
2905                fs.clone(),
2906                cx,
2907            )
2908        });
2909        let (worktree_a, _) = project_a
2910            .update(&mut cx_a, |p, cx| {
2911                p.find_or_create_local_worktree("/code/crate-1", false, cx)
2912            })
2913            .await
2914            .unwrap();
2915        worktree_a
2916            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2917            .await;
2918        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2919        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2920        project_a
2921            .update(&mut cx_a, |p, cx| p.share(cx))
2922            .await
2923            .unwrap();
2924
2925        // Join the worktree as client B.
2926        let project_b = Project::remote(
2927            project_id,
2928            client_b.clone(),
2929            client_b.user_store.clone(),
2930            lang_registry.clone(),
2931            fs.clone(),
2932            &mut cx_b.to_async(),
2933        )
2934        .await
2935        .unwrap();
2936
2937        // Cause the language server to start.
2938        let _buffer = cx_b
2939            .background()
2940            .spawn(project_b.update(&mut cx_b, |p, cx| {
2941                p.open_buffer((worktree_id, "one.rs"), cx)
2942            }))
2943            .await
2944            .unwrap();
2945
2946        // Request the definition of a symbol as the guest.
2947        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
2948        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2949        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_| {
2950            #[allow(deprecated)]
2951            Some(vec![lsp::SymbolInformation {
2952                name: "TWO".into(),
2953                location: lsp::Location {
2954                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2955                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2956                },
2957                kind: lsp::SymbolKind::CONSTANT,
2958                tags: None,
2959                container_name: None,
2960                deprecated: None,
2961            }])
2962        });
2963
2964        let symbols = symbols.await.unwrap();
2965        assert_eq!(symbols.len(), 1);
2966        assert_eq!(symbols[0].name, "TWO");
2967
2968        // Open one of the returned symbols.
2969        let buffer_b_2 = project_b
2970            .update(&mut cx_b, |project, cx| {
2971                project.open_buffer_for_symbol(&symbols[0], cx)
2972            })
2973            .await
2974            .unwrap();
2975        buffer_b_2.read_with(&cx_b, |buffer, _| {
2976            assert_eq!(
2977                buffer.file().unwrap().path().as_ref(),
2978                Path::new("../crate-2/two.rs")
2979            );
2980        });
2981
2982        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
2983        let mut fake_symbol = symbols[0].clone();
2984        fake_symbol.path = Path::new("/code/secrets").into();
2985        let error = project_b
2986            .update(&mut cx_b, |project, cx| {
2987                project.open_buffer_for_symbol(&fake_symbol, cx)
2988            })
2989            .await
2990            .unwrap_err();
2991        assert!(error.to_string().contains("invalid symbol signature"));
2992    }
2993
2994    #[gpui::test(iterations = 10)]
2995    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2996        mut cx_a: TestAppContext,
2997        mut cx_b: TestAppContext,
2998        mut rng: StdRng,
2999    ) {
3000        cx_a.foreground().forbid_parking();
3001        let mut lang_registry = Arc::new(LanguageRegistry::new());
3002        let fs = FakeFs::new(cx_a.background());
3003        fs.insert_tree(
3004            "/root",
3005            json!({
3006                ".zed.toml": r#"collaborators = ["user_b"]"#,
3007                "a.rs": "const ONE: usize = b::TWO;",
3008                "b.rs": "const TWO: usize = 2",
3009            }),
3010        )
3011        .await;
3012
3013        // Set up a fake language server.
3014        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3015
3016        Arc::get_mut(&mut lang_registry)
3017            .unwrap()
3018            .add(Arc::new(Language::new(
3019                LanguageConfig {
3020                    name: "Rust".into(),
3021                    path_suffixes: vec!["rs".to_string()],
3022                    language_server: Some(language_server_config),
3023                    ..Default::default()
3024                },
3025                Some(tree_sitter_rust::language()),
3026            )));
3027
3028        // Connect to a server as 2 clients.
3029        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3030        let client_a = server.create_client(&mut cx_a, "user_a").await;
3031        let client_b = server.create_client(&mut cx_b, "user_b").await;
3032
3033        // Share a project as client A
3034        let project_a = cx_a.update(|cx| {
3035            Project::local(
3036                client_a.clone(),
3037                client_a.user_store.clone(),
3038                lang_registry.clone(),
3039                fs.clone(),
3040                cx,
3041            )
3042        });
3043
3044        let (worktree_a, _) = project_a
3045            .update(&mut cx_a, |p, cx| {
3046                p.find_or_create_local_worktree("/root", false, cx)
3047            })
3048            .await
3049            .unwrap();
3050        worktree_a
3051            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3052            .await;
3053        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3054        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3055        project_a
3056            .update(&mut cx_a, |p, cx| p.share(cx))
3057            .await
3058            .unwrap();
3059
3060        // Join the worktree as client B.
3061        let project_b = Project::remote(
3062            project_id,
3063            client_b.clone(),
3064            client_b.user_store.clone(),
3065            lang_registry.clone(),
3066            fs.clone(),
3067            &mut cx_b.to_async(),
3068        )
3069        .await
3070        .unwrap();
3071
3072        let buffer_b1 = cx_b
3073            .background()
3074            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3075            .await
3076            .unwrap();
3077
3078        let definitions;
3079        let buffer_b2;
3080        if rng.gen() {
3081            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3082            buffer_b2 =
3083                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3084        } else {
3085            buffer_b2 =
3086                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3087            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3088        }
3089
3090        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3091        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
3092            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3093                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3094                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3095            )))
3096        });
3097
3098        let buffer_b2 = buffer_b2.await.unwrap();
3099        let definitions = definitions.await.unwrap();
3100        assert_eq!(definitions.len(), 1);
3101        assert_eq!(definitions[0].buffer, buffer_b2);
3102    }
3103
3104    #[gpui::test(iterations = 10)]
3105    async fn test_collaborating_with_code_actions(
3106        mut cx_a: TestAppContext,
3107        mut cx_b: TestAppContext,
3108    ) {
3109        cx_a.foreground().forbid_parking();
3110        let mut lang_registry = Arc::new(LanguageRegistry::new());
3111        let fs = FakeFs::new(cx_a.background());
3112        let mut path_openers_b = Vec::new();
3113        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3114
3115        // Set up a fake language server.
3116        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3117        Arc::get_mut(&mut lang_registry)
3118            .unwrap()
3119            .add(Arc::new(Language::new(
3120                LanguageConfig {
3121                    name: "Rust".into(),
3122                    path_suffixes: vec!["rs".to_string()],
3123                    language_server: Some(language_server_config),
3124                    ..Default::default()
3125                },
3126                Some(tree_sitter_rust::language()),
3127            )));
3128
3129        // Connect to a server as 2 clients.
3130        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3131        let client_a = server.create_client(&mut cx_a, "user_a").await;
3132        let client_b = server.create_client(&mut cx_b, "user_b").await;
3133
3134        // Share a project as client A
3135        fs.insert_tree(
3136            "/a",
3137            json!({
3138                ".zed.toml": r#"collaborators = ["user_b"]"#,
3139                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3140                "other.rs": "pub fn foo() -> usize { 4 }",
3141            }),
3142        )
3143        .await;
3144        let project_a = cx_a.update(|cx| {
3145            Project::local(
3146                client_a.clone(),
3147                client_a.user_store.clone(),
3148                lang_registry.clone(),
3149                fs.clone(),
3150                cx,
3151            )
3152        });
3153        let (worktree_a, _) = project_a
3154            .update(&mut cx_a, |p, cx| {
3155                p.find_or_create_local_worktree("/a", false, cx)
3156            })
3157            .await
3158            .unwrap();
3159        worktree_a
3160            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3161            .await;
3162        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3163        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3164        project_a
3165            .update(&mut cx_a, |p, cx| p.share(cx))
3166            .await
3167            .unwrap();
3168
3169        // Join the worktree as client B.
3170        let project_b = Project::remote(
3171            project_id,
3172            client_b.clone(),
3173            client_b.user_store.clone(),
3174            lang_registry.clone(),
3175            fs.clone(),
3176            &mut cx_b.to_async(),
3177        )
3178        .await
3179        .unwrap();
3180        let mut params = cx_b.update(WorkspaceParams::test);
3181        params.languages = lang_registry.clone();
3182        params.client = client_b.client.clone();
3183        params.user_store = client_b.user_store.clone();
3184        params.project = project_b;
3185        params.path_openers = path_openers_b.into();
3186
3187        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3188        let editor_b = workspace_b
3189            .update(&mut cx_b, |workspace, cx| {
3190                workspace.open_path((worktree_id, "main.rs").into(), cx)
3191            })
3192            .await
3193            .unwrap()
3194            .downcast::<Editor>()
3195            .unwrap();
3196
3197        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3198        fake_language_server
3199            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3200                assert_eq!(
3201                    params.text_document.uri,
3202                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3203                );
3204                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3205                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3206                None
3207            })
3208            .next()
3209            .await;
3210
3211        // Move cursor to a location that contains code actions.
3212        editor_b.update(&mut cx_b, |editor, cx| {
3213            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3214            cx.focus(&editor_b);
3215        });
3216
3217        fake_language_server
3218            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
3219                assert_eq!(
3220                    params.text_document.uri,
3221                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3222                );
3223                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3224                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3225
3226                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3227                    lsp::CodeAction {
3228                        title: "Inline into all callers".to_string(),
3229                        edit: Some(lsp::WorkspaceEdit {
3230                            changes: Some(
3231                                [
3232                                    (
3233                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3234                                        vec![lsp::TextEdit::new(
3235                                            lsp::Range::new(
3236                                                lsp::Position::new(1, 22),
3237                                                lsp::Position::new(1, 34),
3238                                            ),
3239                                            "4".to_string(),
3240                                        )],
3241                                    ),
3242                                    (
3243                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3244                                        vec![lsp::TextEdit::new(
3245                                            lsp::Range::new(
3246                                                lsp::Position::new(0, 0),
3247                                                lsp::Position::new(0, 27),
3248                                            ),
3249                                            "".to_string(),
3250                                        )],
3251                                    ),
3252                                ]
3253                                .into_iter()
3254                                .collect(),
3255                            ),
3256                            ..Default::default()
3257                        }),
3258                        data: Some(json!({
3259                            "codeActionParams": {
3260                                "range": {
3261                                    "start": {"line": 1, "column": 31},
3262                                    "end": {"line": 1, "column": 31},
3263                                }
3264                            }
3265                        })),
3266                        ..Default::default()
3267                    },
3268                )])
3269            })
3270            .next()
3271            .await;
3272
3273        // Toggle code actions and wait for them to display.
3274        editor_b.update(&mut cx_b, |editor, cx| {
3275            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3276        });
3277        editor_b
3278            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3279            .await;
3280
3281        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3282
3283        // Confirming the code action will trigger a resolve request.
3284        let confirm_action = workspace_b
3285            .update(&mut cx_b, |workspace, cx| {
3286                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3287            })
3288            .unwrap();
3289        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
3290            lsp::CodeAction {
3291                title: "Inline into all callers".to_string(),
3292                edit: Some(lsp::WorkspaceEdit {
3293                    changes: Some(
3294                        [
3295                            (
3296                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3297                                vec![lsp::TextEdit::new(
3298                                    lsp::Range::new(
3299                                        lsp::Position::new(1, 22),
3300                                        lsp::Position::new(1, 34),
3301                                    ),
3302                                    "4".to_string(),
3303                                )],
3304                            ),
3305                            (
3306                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3307                                vec![lsp::TextEdit::new(
3308                                    lsp::Range::new(
3309                                        lsp::Position::new(0, 0),
3310                                        lsp::Position::new(0, 27),
3311                                    ),
3312                                    "".to_string(),
3313                                )],
3314                            ),
3315                        ]
3316                        .into_iter()
3317                        .collect(),
3318                    ),
3319                    ..Default::default()
3320                }),
3321                ..Default::default()
3322            }
3323        });
3324
3325        // After the action is confirmed, an editor containing both modified files is opened.
3326        confirm_action.await.unwrap();
3327        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3328            workspace
3329                .active_item(cx)
3330                .unwrap()
3331                .downcast::<Editor>()
3332                .unwrap()
3333        });
3334        code_action_editor.update(&mut cx_b, |editor, cx| {
3335            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3336            editor.undo(&Undo, cx);
3337            assert_eq!(
3338                editor.text(cx),
3339                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3340            );
3341            editor.redo(&Redo, cx);
3342            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3343        });
3344    }
3345
3346    #[gpui::test(iterations = 10)]
3347    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3348        cx_a.foreground().forbid_parking();
3349        let mut lang_registry = Arc::new(LanguageRegistry::new());
3350        let fs = FakeFs::new(cx_a.background());
3351        let mut path_openers_b = Vec::new();
3352        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3353
3354        // Set up a fake language server.
3355        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3356        Arc::get_mut(&mut lang_registry)
3357            .unwrap()
3358            .add(Arc::new(Language::new(
3359                LanguageConfig {
3360                    name: "Rust".into(),
3361                    path_suffixes: vec!["rs".to_string()],
3362                    language_server: Some(language_server_config),
3363                    ..Default::default()
3364                },
3365                Some(tree_sitter_rust::language()),
3366            )));
3367
3368        // Connect to a server as 2 clients.
3369        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3370        let client_a = server.create_client(&mut cx_a, "user_a").await;
3371        let client_b = server.create_client(&mut cx_b, "user_b").await;
3372
3373        // Share a project as client A
3374        fs.insert_tree(
3375            "/dir",
3376            json!({
3377                ".zed.toml": r#"collaborators = ["user_b"]"#,
3378                "one.rs": "const ONE: usize = 1;",
3379                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3380            }),
3381        )
3382        .await;
3383        let project_a = cx_a.update(|cx| {
3384            Project::local(
3385                client_a.clone(),
3386                client_a.user_store.clone(),
3387                lang_registry.clone(),
3388                fs.clone(),
3389                cx,
3390            )
3391        });
3392        let (worktree_a, _) = project_a
3393            .update(&mut cx_a, |p, cx| {
3394                p.find_or_create_local_worktree("/dir", false, cx)
3395            })
3396            .await
3397            .unwrap();
3398        worktree_a
3399            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3400            .await;
3401        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3402        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3403        project_a
3404            .update(&mut cx_a, |p, cx| p.share(cx))
3405            .await
3406            .unwrap();
3407
3408        // Join the worktree as client B.
3409        let project_b = Project::remote(
3410            project_id,
3411            client_b.clone(),
3412            client_b.user_store.clone(),
3413            lang_registry.clone(),
3414            fs.clone(),
3415            &mut cx_b.to_async(),
3416        )
3417        .await
3418        .unwrap();
3419        let mut params = cx_b.update(WorkspaceParams::test);
3420        params.languages = lang_registry.clone();
3421        params.client = client_b.client.clone();
3422        params.user_store = client_b.user_store.clone();
3423        params.project = project_b;
3424        params.path_openers = path_openers_b.into();
3425
3426        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3427        let editor_b = workspace_b
3428            .update(&mut cx_b, |workspace, cx| {
3429                workspace.open_path((worktree_id, "one.rs").into(), cx)
3430            })
3431            .await
3432            .unwrap()
3433            .downcast::<Editor>()
3434            .unwrap();
3435        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3436
3437        // Move cursor to a location that can be renamed.
3438        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3439            editor.select_ranges([7..7], None, cx);
3440            editor.rename(&Rename, cx).unwrap()
3441        });
3442
3443        fake_language_server
3444            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
3445                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3446                assert_eq!(params.position, lsp::Position::new(0, 7));
3447                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3448                    lsp::Position::new(0, 6),
3449                    lsp::Position::new(0, 9),
3450                )))
3451            })
3452            .next()
3453            .await
3454            .unwrap();
3455        prepare_rename.await.unwrap();
3456        editor_b.update(&mut cx_b, |editor, cx| {
3457            let rename = editor.pending_rename().unwrap();
3458            let buffer = editor.buffer().read(cx).snapshot(cx);
3459            assert_eq!(
3460                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3461                6..9
3462            );
3463            rename.editor.update(cx, |rename_editor, cx| {
3464                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3465                    rename_buffer.edit([0..3], "THREE", cx);
3466                });
3467            });
3468        });
3469
3470        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3471            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3472        });
3473        fake_language_server
3474            .handle_request::<lsp::request::Rename, _>(|params| {
3475                assert_eq!(
3476                    params.text_document_position.text_document.uri.as_str(),
3477                    "file:///dir/one.rs"
3478                );
3479                assert_eq!(
3480                    params.text_document_position.position,
3481                    lsp::Position::new(0, 6)
3482                );
3483                assert_eq!(params.new_name, "THREE");
3484                Some(lsp::WorkspaceEdit {
3485                    changes: Some(
3486                        [
3487                            (
3488                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3489                                vec![lsp::TextEdit::new(
3490                                    lsp::Range::new(
3491                                        lsp::Position::new(0, 6),
3492                                        lsp::Position::new(0, 9),
3493                                    ),
3494                                    "THREE".to_string(),
3495                                )],
3496                            ),
3497                            (
3498                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3499                                vec![
3500                                    lsp::TextEdit::new(
3501                                        lsp::Range::new(
3502                                            lsp::Position::new(0, 24),
3503                                            lsp::Position::new(0, 27),
3504                                        ),
3505                                        "THREE".to_string(),
3506                                    ),
3507                                    lsp::TextEdit::new(
3508                                        lsp::Range::new(
3509                                            lsp::Position::new(0, 35),
3510                                            lsp::Position::new(0, 38),
3511                                        ),
3512                                        "THREE".to_string(),
3513                                    ),
3514                                ],
3515                            ),
3516                        ]
3517                        .into_iter()
3518                        .collect(),
3519                    ),
3520                    ..Default::default()
3521                })
3522            })
3523            .next()
3524            .await
3525            .unwrap();
3526        confirm_rename.await.unwrap();
3527
3528        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3529            workspace
3530                .active_item(cx)
3531                .unwrap()
3532                .downcast::<Editor>()
3533                .unwrap()
3534        });
3535        rename_editor.update(&mut cx_b, |editor, cx| {
3536            assert_eq!(
3537                editor.text(cx),
3538                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3539            );
3540            editor.undo(&Undo, cx);
3541            assert_eq!(
3542                editor.text(cx),
3543                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3544            );
3545            editor.redo(&Redo, cx);
3546            assert_eq!(
3547                editor.text(cx),
3548                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3549            );
3550        });
3551
3552        // Ensure temporary rename edits cannot be undone/redone.
3553        editor_b.update(&mut cx_b, |editor, cx| {
3554            editor.undo(&Undo, cx);
3555            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3556            editor.undo(&Undo, cx);
3557            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3558            editor.redo(&Redo, cx);
3559            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3560        })
3561    }
3562
3563    #[gpui::test(iterations = 10)]
3564    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3565        cx_a.foreground().forbid_parking();
3566
3567        // Connect to a server as 2 clients.
3568        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3569        let client_a = server.create_client(&mut cx_a, "user_a").await;
3570        let client_b = server.create_client(&mut cx_b, "user_b").await;
3571
3572        // Create an org that includes these 2 users.
3573        let db = &server.app_state.db;
3574        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3575        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3576            .await
3577            .unwrap();
3578        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3579            .await
3580            .unwrap();
3581
3582        // Create a channel that includes all the users.
3583        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3584        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3585            .await
3586            .unwrap();
3587        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3588            .await
3589            .unwrap();
3590        db.create_channel_message(
3591            channel_id,
3592            client_b.current_user_id(&cx_b),
3593            "hello A, it's B.",
3594            OffsetDateTime::now_utc(),
3595            1,
3596        )
3597        .await
3598        .unwrap();
3599
3600        let channels_a = cx_a
3601            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3602        channels_a
3603            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3604            .await;
3605        channels_a.read_with(&cx_a, |list, _| {
3606            assert_eq!(
3607                list.available_channels().unwrap(),
3608                &[ChannelDetails {
3609                    id: channel_id.to_proto(),
3610                    name: "test-channel".to_string()
3611                }]
3612            )
3613        });
3614        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3615            this.get_channel(channel_id.to_proto(), cx).unwrap()
3616        });
3617        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3618        channel_a
3619            .condition(&cx_a, |channel, _| {
3620                channel_messages(channel)
3621                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3622            })
3623            .await;
3624
3625        let channels_b = cx_b
3626            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3627        channels_b
3628            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3629            .await;
3630        channels_b.read_with(&cx_b, |list, _| {
3631            assert_eq!(
3632                list.available_channels().unwrap(),
3633                &[ChannelDetails {
3634                    id: channel_id.to_proto(),
3635                    name: "test-channel".to_string()
3636                }]
3637            )
3638        });
3639
3640        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3641            this.get_channel(channel_id.to_proto(), cx).unwrap()
3642        });
3643        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3644        channel_b
3645            .condition(&cx_b, |channel, _| {
3646                channel_messages(channel)
3647                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3648            })
3649            .await;
3650
3651        channel_a
3652            .update(&mut cx_a, |channel, cx| {
3653                channel
3654                    .send_message("oh, hi B.".to_string(), cx)
3655                    .unwrap()
3656                    .detach();
3657                let task = channel.send_message("sup".to_string(), cx).unwrap();
3658                assert_eq!(
3659                    channel_messages(channel),
3660                    &[
3661                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3662                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3663                        ("user_a".to_string(), "sup".to_string(), true)
3664                    ]
3665                );
3666                task
3667            })
3668            .await
3669            .unwrap();
3670
3671        channel_b
3672            .condition(&cx_b, |channel, _| {
3673                channel_messages(channel)
3674                    == [
3675                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3676                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3677                        ("user_a".to_string(), "sup".to_string(), false),
3678                    ]
3679            })
3680            .await;
3681
3682        assert_eq!(
3683            server
3684                .state()
3685                .await
3686                .channel(channel_id)
3687                .unwrap()
3688                .connection_ids
3689                .len(),
3690            2
3691        );
3692        cx_b.update(|_| drop(channel_b));
3693        server
3694            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3695            .await;
3696
3697        cx_a.update(|_| drop(channel_a));
3698        server
3699            .condition(|state| state.channel(channel_id).is_none())
3700            .await;
3701    }
3702
3703    #[gpui::test(iterations = 10)]
3704    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3705        cx_a.foreground().forbid_parking();
3706
3707        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3708        let client_a = server.create_client(&mut cx_a, "user_a").await;
3709
3710        let db = &server.app_state.db;
3711        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3712        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3713        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3714            .await
3715            .unwrap();
3716        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3717            .await
3718            .unwrap();
3719
3720        let channels_a = cx_a
3721            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3722        channels_a
3723            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3724            .await;
3725        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3726            this.get_channel(channel_id.to_proto(), cx).unwrap()
3727        });
3728
3729        // Messages aren't allowed to be too long.
3730        channel_a
3731            .update(&mut cx_a, |channel, cx| {
3732                let long_body = "this is long.\n".repeat(1024);
3733                channel.send_message(long_body, cx).unwrap()
3734            })
3735            .await
3736            .unwrap_err();
3737
3738        // Messages aren't allowed to be blank.
3739        channel_a.update(&mut cx_a, |channel, cx| {
3740            channel.send_message(String::new(), cx).unwrap_err()
3741        });
3742
3743        // Leading and trailing whitespace are trimmed.
3744        channel_a
3745            .update(&mut cx_a, |channel, cx| {
3746                channel
3747                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3748                    .unwrap()
3749            })
3750            .await
3751            .unwrap();
3752        assert_eq!(
3753            db.get_channel_messages(channel_id, 10, None)
3754                .await
3755                .unwrap()
3756                .iter()
3757                .map(|m| &m.body)
3758                .collect::<Vec<_>>(),
3759            &["surrounded by whitespace"]
3760        );
3761    }
3762
3763    #[gpui::test(iterations = 10)]
3764    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3765        cx_a.foreground().forbid_parking();
3766
3767        // Connect to a server as 2 clients.
3768        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3769        let client_a = server.create_client(&mut cx_a, "user_a").await;
3770        let client_b = server.create_client(&mut cx_b, "user_b").await;
3771        let mut status_b = client_b.status();
3772
3773        // Create an org that includes these 2 users.
3774        let db = &server.app_state.db;
3775        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3776        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3777            .await
3778            .unwrap();
3779        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3780            .await
3781            .unwrap();
3782
3783        // Create a channel that includes all the users.
3784        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3785        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3786            .await
3787            .unwrap();
3788        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3789            .await
3790            .unwrap();
3791        db.create_channel_message(
3792            channel_id,
3793            client_b.current_user_id(&cx_b),
3794            "hello A, it's B.",
3795            OffsetDateTime::now_utc(),
3796            2,
3797        )
3798        .await
3799        .unwrap();
3800
3801        let channels_a = cx_a
3802            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3803        channels_a
3804            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3805            .await;
3806
3807        channels_a.read_with(&cx_a, |list, _| {
3808            assert_eq!(
3809                list.available_channels().unwrap(),
3810                &[ChannelDetails {
3811                    id: channel_id.to_proto(),
3812                    name: "test-channel".to_string()
3813                }]
3814            )
3815        });
3816        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3817            this.get_channel(channel_id.to_proto(), cx).unwrap()
3818        });
3819        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3820        channel_a
3821            .condition(&cx_a, |channel, _| {
3822                channel_messages(channel)
3823                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3824            })
3825            .await;
3826
3827        let channels_b = cx_b
3828            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3829        channels_b
3830            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3831            .await;
3832        channels_b.read_with(&cx_b, |list, _| {
3833            assert_eq!(
3834                list.available_channels().unwrap(),
3835                &[ChannelDetails {
3836                    id: channel_id.to_proto(),
3837                    name: "test-channel".to_string()
3838                }]
3839            )
3840        });
3841
3842        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3843            this.get_channel(channel_id.to_proto(), cx).unwrap()
3844        });
3845        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3846        channel_b
3847            .condition(&cx_b, |channel, _| {
3848                channel_messages(channel)
3849                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3850            })
3851            .await;
3852
3853        // Disconnect client B, ensuring we can still access its cached channel data.
3854        server.forbid_connections();
3855        server.disconnect_client(client_b.current_user_id(&cx_b));
3856        while !matches!(
3857            status_b.next().await,
3858            Some(client::Status::ReconnectionError { .. })
3859        ) {}
3860
3861        channels_b.read_with(&cx_b, |channels, _| {
3862            assert_eq!(
3863                channels.available_channels().unwrap(),
3864                [ChannelDetails {
3865                    id: channel_id.to_proto(),
3866                    name: "test-channel".to_string()
3867                }]
3868            )
3869        });
3870        channel_b.read_with(&cx_b, |channel, _| {
3871            assert_eq!(
3872                channel_messages(channel),
3873                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3874            )
3875        });
3876
3877        // Send a message from client B while it is disconnected.
3878        channel_b
3879            .update(&mut cx_b, |channel, cx| {
3880                let task = channel
3881                    .send_message("can you see this?".to_string(), cx)
3882                    .unwrap();
3883                assert_eq!(
3884                    channel_messages(channel),
3885                    &[
3886                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3887                        ("user_b".to_string(), "can you see this?".to_string(), true)
3888                    ]
3889                );
3890                task
3891            })
3892            .await
3893            .unwrap_err();
3894
3895        // Send a message from client A while B is disconnected.
3896        channel_a
3897            .update(&mut cx_a, |channel, cx| {
3898                channel
3899                    .send_message("oh, hi B.".to_string(), cx)
3900                    .unwrap()
3901                    .detach();
3902                let task = channel.send_message("sup".to_string(), cx).unwrap();
3903                assert_eq!(
3904                    channel_messages(channel),
3905                    &[
3906                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3907                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3908                        ("user_a".to_string(), "sup".to_string(), true)
3909                    ]
3910                );
3911                task
3912            })
3913            .await
3914            .unwrap();
3915
3916        // Give client B a chance to reconnect.
3917        server.allow_connections();
3918        cx_b.foreground().advance_clock(Duration::from_secs(10));
3919
3920        // Verify that B sees the new messages upon reconnection, as well as the message client B
3921        // sent while offline.
3922        channel_b
3923            .condition(&cx_b, |channel, _| {
3924                channel_messages(channel)
3925                    == [
3926                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3927                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3928                        ("user_a".to_string(), "sup".to_string(), false),
3929                        ("user_b".to_string(), "can you see this?".to_string(), false),
3930                    ]
3931            })
3932            .await;
3933
3934        // Ensure client A and B can communicate normally after reconnection.
3935        channel_a
3936            .update(&mut cx_a, |channel, cx| {
3937                channel.send_message("you online?".to_string(), cx).unwrap()
3938            })
3939            .await
3940            .unwrap();
3941        channel_b
3942            .condition(&cx_b, |channel, _| {
3943                channel_messages(channel)
3944                    == [
3945                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3946                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3947                        ("user_a".to_string(), "sup".to_string(), false),
3948                        ("user_b".to_string(), "can you see this?".to_string(), false),
3949                        ("user_a".to_string(), "you online?".to_string(), false),
3950                    ]
3951            })
3952            .await;
3953
3954        channel_b
3955            .update(&mut cx_b, |channel, cx| {
3956                channel.send_message("yep".to_string(), cx).unwrap()
3957            })
3958            .await
3959            .unwrap();
3960        channel_a
3961            .condition(&cx_a, |channel, _| {
3962                channel_messages(channel)
3963                    == [
3964                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3965                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3966                        ("user_a".to_string(), "sup".to_string(), false),
3967                        ("user_b".to_string(), "can you see this?".to_string(), false),
3968                        ("user_a".to_string(), "you online?".to_string(), false),
3969                        ("user_b".to_string(), "yep".to_string(), false),
3970                    ]
3971            })
3972            .await;
3973    }
3974
3975    #[gpui::test(iterations = 10)]
3976    async fn test_contacts(
3977        mut cx_a: TestAppContext,
3978        mut cx_b: TestAppContext,
3979        mut cx_c: TestAppContext,
3980    ) {
3981        cx_a.foreground().forbid_parking();
3982        let lang_registry = Arc::new(LanguageRegistry::new());
3983        let fs = FakeFs::new(cx_a.background());
3984
3985        // Connect to a server as 3 clients.
3986        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3987        let client_a = server.create_client(&mut cx_a, "user_a").await;
3988        let client_b = server.create_client(&mut cx_b, "user_b").await;
3989        let client_c = server.create_client(&mut cx_c, "user_c").await;
3990
3991        // Share a worktree as client A.
3992        fs.insert_tree(
3993            "/a",
3994            json!({
3995                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3996            }),
3997        )
3998        .await;
3999
4000        let project_a = cx_a.update(|cx| {
4001            Project::local(
4002                client_a.clone(),
4003                client_a.user_store.clone(),
4004                lang_registry.clone(),
4005                fs.clone(),
4006                cx,
4007            )
4008        });
4009        let (worktree_a, _) = project_a
4010            .update(&mut cx_a, |p, cx| {
4011                p.find_or_create_local_worktree("/a", false, cx)
4012            })
4013            .await
4014            .unwrap();
4015        worktree_a
4016            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4017            .await;
4018
4019        client_a
4020            .user_store
4021            .condition(&cx_a, |user_store, _| {
4022                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4023            })
4024            .await;
4025        client_b
4026            .user_store
4027            .condition(&cx_b, |user_store, _| {
4028                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4029            })
4030            .await;
4031        client_c
4032            .user_store
4033            .condition(&cx_c, |user_store, _| {
4034                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4035            })
4036            .await;
4037
4038        let project_id = project_a
4039            .update(&mut cx_a, |project, _| project.next_remote_id())
4040            .await;
4041        project_a
4042            .update(&mut cx_a, |project, cx| project.share(cx))
4043            .await
4044            .unwrap();
4045
4046        let _project_b = Project::remote(
4047            project_id,
4048            client_b.clone(),
4049            client_b.user_store.clone(),
4050            lang_registry.clone(),
4051            fs.clone(),
4052            &mut cx_b.to_async(),
4053        )
4054        .await
4055        .unwrap();
4056
4057        client_a
4058            .user_store
4059            .condition(&cx_a, |user_store, _| {
4060                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4061            })
4062            .await;
4063        client_b
4064            .user_store
4065            .condition(&cx_b, |user_store, _| {
4066                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4067            })
4068            .await;
4069        client_c
4070            .user_store
4071            .condition(&cx_c, |user_store, _| {
4072                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4073            })
4074            .await;
4075
4076        project_a
4077            .condition(&cx_a, |project, _| {
4078                project.collaborators().contains_key(&client_b.peer_id)
4079            })
4080            .await;
4081
4082        cx_a.update(move |_| drop(project_a));
4083        client_a
4084            .user_store
4085            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4086            .await;
4087        client_b
4088            .user_store
4089            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4090            .await;
4091        client_c
4092            .user_store
4093            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4094            .await;
4095
4096        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4097            user_store
4098                .contacts()
4099                .iter()
4100                .map(|contact| {
4101                    let worktrees = contact
4102                        .projects
4103                        .iter()
4104                        .map(|p| {
4105                            (
4106                                p.worktree_root_names[0].as_str(),
4107                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4108                            )
4109                        })
4110                        .collect();
4111                    (contact.user.github_login.as_str(), worktrees)
4112                })
4113                .collect()
4114        }
4115    }
4116
4117    #[gpui::test(iterations = 100)]
4118    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4119        cx.foreground().forbid_parking();
4120        let max_peers = env::var("MAX_PEERS")
4121            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4122            .unwrap_or(5);
4123        let max_operations = env::var("OPERATIONS")
4124            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4125            .unwrap_or(10);
4126
4127        let rng = Arc::new(Mutex::new(rng));
4128
4129        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4130        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4131
4132        let fs = FakeFs::new(cx.background());
4133        fs.insert_tree(
4134            "/_collab",
4135            json!({
4136                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4137            }),
4138        )
4139        .await;
4140
4141        let operations = Rc::new(Cell::new(0));
4142        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4143        let mut clients = Vec::new();
4144
4145        let mut next_entity_id = 100000;
4146        let mut host_cx = TestAppContext::new(
4147            cx.foreground_platform(),
4148            cx.platform(),
4149            cx.foreground(),
4150            cx.background(),
4151            cx.font_cache(),
4152            next_entity_id,
4153        );
4154        let host = server.create_client(&mut host_cx, "host").await;
4155        let host_project = host_cx.update(|cx| {
4156            Project::local(
4157                host.client.clone(),
4158                host.user_store.clone(),
4159                Arc::new(LanguageRegistry::new()),
4160                fs.clone(),
4161                cx,
4162            )
4163        });
4164        let host_project_id = host_project
4165            .update(&mut host_cx, |p, _| p.next_remote_id())
4166            .await;
4167
4168        let (collab_worktree, _) = host_project
4169            .update(&mut host_cx, |project, cx| {
4170                project.find_or_create_local_worktree("/_collab", false, cx)
4171            })
4172            .await
4173            .unwrap();
4174        collab_worktree
4175            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4176            .await;
4177        host_project
4178            .update(&mut host_cx, |project, cx| project.share(cx))
4179            .await
4180            .unwrap();
4181
4182        clients.push(cx.foreground().spawn(host.simulate_host(
4183            host_project.clone(),
4184            language_server_config,
4185            operations.clone(),
4186            max_operations,
4187            rng.clone(),
4188            host_cx.clone(),
4189        )));
4190
4191        while operations.get() < max_operations {
4192            cx.background().simulate_random_delay().await;
4193            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4194                operations.set(operations.get() + 1);
4195
4196                let guest_id = clients.len();
4197                log::info!("Adding guest {}", guest_id);
4198                next_entity_id += 100000;
4199                let mut guest_cx = TestAppContext::new(
4200                    cx.foreground_platform(),
4201                    cx.platform(),
4202                    cx.foreground(),
4203                    cx.background(),
4204                    cx.font_cache(),
4205                    next_entity_id,
4206                );
4207                let guest = server
4208                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4209                    .await;
4210                let guest_project = Project::remote(
4211                    host_project_id,
4212                    guest.client.clone(),
4213                    guest.user_store.clone(),
4214                    guest_lang_registry.clone(),
4215                    fs.clone(),
4216                    &mut guest_cx.to_async(),
4217                )
4218                .await
4219                .unwrap();
4220                clients.push(cx.foreground().spawn(guest.simulate_guest(
4221                    guest_id,
4222                    guest_project,
4223                    operations.clone(),
4224                    max_operations,
4225                    rng.clone(),
4226                    guest_cx,
4227                )));
4228
4229                log::info!("Guest {} added", guest_id);
4230            }
4231        }
4232
4233        let clients = futures::future::join_all(clients).await;
4234        cx.foreground().run_until_parked();
4235
4236        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4237            project
4238                .worktrees(cx)
4239                .map(|worktree| {
4240                    let snapshot = worktree.read(cx).snapshot();
4241                    (snapshot.id(), snapshot)
4242                })
4243                .collect::<BTreeMap<_, _>>()
4244        });
4245
4246        for (guest_client, guest_cx) in clients.iter().skip(1) {
4247            let guest_id = guest_client.client.id();
4248            let worktree_snapshots =
4249                guest_client
4250                    .project
4251                    .as_ref()
4252                    .unwrap()
4253                    .read_with(guest_cx, |project, cx| {
4254                        project
4255                            .worktrees(cx)
4256                            .map(|worktree| {
4257                                let worktree = worktree.read(cx);
4258                                assert!(
4259                                    !worktree.as_remote().unwrap().has_pending_updates(),
4260                                    "Guest {} worktree {:?} contains deferred updates",
4261                                    guest_id,
4262                                    worktree.id()
4263                                );
4264                                (worktree.id(), worktree.snapshot())
4265                            })
4266                            .collect::<BTreeMap<_, _>>()
4267                    });
4268
4269            assert_eq!(
4270                worktree_snapshots.keys().collect::<Vec<_>>(),
4271                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4272                "guest {} has different worktrees than the host",
4273                guest_id
4274            );
4275            for (id, host_snapshot) in &host_worktree_snapshots {
4276                let guest_snapshot = &worktree_snapshots[id];
4277                assert_eq!(
4278                    guest_snapshot.root_name(),
4279                    host_snapshot.root_name(),
4280                    "guest {} has different root name than the host for worktree {}",
4281                    guest_id,
4282                    id
4283                );
4284                assert_eq!(
4285                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4286                    host_snapshot.entries(false).collect::<Vec<_>>(),
4287                    "guest {} has different snapshot than the host for worktree {}",
4288                    guest_id,
4289                    id
4290                );
4291            }
4292
4293            guest_client
4294                .project
4295                .as_ref()
4296                .unwrap()
4297                .read_with(guest_cx, |project, _| {
4298                    assert!(
4299                        !project.has_buffered_operations(),
4300                        "guest {} has buffered operations ",
4301                        guest_id,
4302                    );
4303                });
4304
4305            for guest_buffer in &guest_client.buffers {
4306                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4307                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4308                    project
4309                        .shared_buffer(guest_client.peer_id, buffer_id)
4310                        .expect(&format!(
4311                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4312                            guest_id, guest_client.peer_id, buffer_id
4313                        ))
4314                });
4315                assert_eq!(
4316                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4317                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4318                    "guest {} buffer {} differs from the host's buffer",
4319                    guest_id,
4320                    buffer_id,
4321                );
4322            }
4323        }
4324    }
4325
4326    struct TestServer {
4327        peer: Arc<Peer>,
4328        app_state: Arc<AppState>,
4329        server: Arc<Server>,
4330        foreground: Rc<executor::Foreground>,
4331        notifications: mpsc::UnboundedReceiver<()>,
4332        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4333        forbid_connections: Arc<AtomicBool>,
4334        _test_db: TestDb,
4335    }
4336
4337    impl TestServer {
4338        async fn start(
4339            foreground: Rc<executor::Foreground>,
4340            background: Arc<executor::Background>,
4341        ) -> Self {
4342            let test_db = TestDb::fake(background);
4343            let app_state = Self::build_app_state(&test_db).await;
4344            let peer = Peer::new();
4345            let notifications = mpsc::unbounded();
4346            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4347            Self {
4348                peer,
4349                app_state,
4350                server,
4351                foreground,
4352                notifications: notifications.1,
4353                connection_killers: Default::default(),
4354                forbid_connections: Default::default(),
4355                _test_db: test_db,
4356            }
4357        }
4358
4359        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4360            let http = FakeHttpClient::with_404_response();
4361            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4362            let client_name = name.to_string();
4363            let mut client = Client::new(http.clone());
4364            let server = self.server.clone();
4365            let connection_killers = self.connection_killers.clone();
4366            let forbid_connections = self.forbid_connections.clone();
4367            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4368
4369            Arc::get_mut(&mut client)
4370                .unwrap()
4371                .override_authenticate(move |cx| {
4372                    cx.spawn(|_| async move {
4373                        let access_token = "the-token".to_string();
4374                        Ok(Credentials {
4375                            user_id: user_id.0 as u64,
4376                            access_token,
4377                        })
4378                    })
4379                })
4380                .override_establish_connection(move |credentials, cx| {
4381                    assert_eq!(credentials.user_id, user_id.0 as u64);
4382                    assert_eq!(credentials.access_token, "the-token");
4383
4384                    let server = server.clone();
4385                    let connection_killers = connection_killers.clone();
4386                    let forbid_connections = forbid_connections.clone();
4387                    let client_name = client_name.clone();
4388                    let connection_id_tx = connection_id_tx.clone();
4389                    cx.spawn(move |cx| async move {
4390                        if forbid_connections.load(SeqCst) {
4391                            Err(EstablishConnectionError::other(anyhow!(
4392                                "server is forbidding connections"
4393                            )))
4394                        } else {
4395                            let (client_conn, server_conn, kill_conn) =
4396                                Connection::in_memory(cx.background());
4397                            connection_killers.lock().insert(user_id, kill_conn);
4398                            cx.background()
4399                                .spawn(server.handle_connection(
4400                                    server_conn,
4401                                    client_name,
4402                                    user_id,
4403                                    Some(connection_id_tx),
4404                                    cx.background(),
4405                                ))
4406                                .detach();
4407                            Ok(client_conn)
4408                        }
4409                    })
4410                });
4411
4412            client
4413                .authenticate_and_connect(&cx.to_async())
4414                .await
4415                .unwrap();
4416
4417            Channel::init(&client);
4418            Project::init(&client);
4419
4420            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4421            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4422            let mut authed_user =
4423                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4424            while authed_user.next().await.unwrap().is_none() {}
4425
4426            TestClient {
4427                client,
4428                peer_id,
4429                user_store,
4430                project: Default::default(),
4431                buffers: Default::default(),
4432            }
4433        }
4434
4435        fn disconnect_client(&self, user_id: UserId) {
4436            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4437                let _ = kill_conn.try_send(Some(()));
4438            }
4439        }
4440
4441        fn forbid_connections(&self) {
4442            self.forbid_connections.store(true, SeqCst);
4443        }
4444
4445        fn allow_connections(&self) {
4446            self.forbid_connections.store(false, SeqCst);
4447        }
4448
4449        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4450            let mut config = Config::default();
4451            config.session_secret = "a".repeat(32);
4452            config.database_url = test_db.url.clone();
4453            let github_client = github::AppClient::test();
4454            Arc::new(AppState {
4455                db: test_db.db().clone(),
4456                handlebars: Default::default(),
4457                auth_client: auth::build_client("", ""),
4458                repo_client: github::RepoClient::test(&github_client),
4459                github_client,
4460                config,
4461            })
4462        }
4463
4464        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4465            self.server.store.read()
4466        }
4467
4468        async fn condition<F>(&mut self, mut predicate: F)
4469        where
4470            F: FnMut(&Store) -> bool,
4471        {
4472            async_std::future::timeout(Duration::from_millis(500), async {
4473                while !(predicate)(&*self.server.store.read()) {
4474                    self.foreground.start_waiting();
4475                    self.notifications.next().await;
4476                    self.foreground.finish_waiting();
4477                }
4478            })
4479            .await
4480            .expect("condition timed out");
4481        }
4482    }
4483
4484    impl Drop for TestServer {
4485        fn drop(&mut self) {
4486            self.peer.reset();
4487        }
4488    }
4489
4490    struct TestClient {
4491        client: Arc<Client>,
4492        pub peer_id: PeerId,
4493        pub user_store: ModelHandle<UserStore>,
4494        project: Option<ModelHandle<Project>>,
4495        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4496    }
4497
4498    impl Deref for TestClient {
4499        type Target = Arc<Client>;
4500
4501        fn deref(&self) -> &Self::Target {
4502            &self.client
4503        }
4504    }
4505
4506    impl TestClient {
4507        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4508            UserId::from_proto(
4509                self.user_store
4510                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4511            )
4512        }
4513
4514        fn simulate_host(
4515            mut self,
4516            project: ModelHandle<Project>,
4517            mut language_server_config: LanguageServerConfig,
4518            operations: Rc<Cell<usize>>,
4519            max_operations: usize,
4520            rng: Arc<Mutex<StdRng>>,
4521            mut cx: TestAppContext,
4522        ) -> impl Future<Output = (Self, TestAppContext)> {
4523            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4524
4525            // Set up a fake language server.
4526            language_server_config.set_fake_initializer({
4527                let rng = rng.clone();
4528                let files = files.clone();
4529                move |fake_server| {
4530                    fake_server.handle_request::<lsp::request::Completion, _>(|_| {
4531                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4532                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4533                                range: lsp::Range::new(
4534                                    lsp::Position::new(0, 0),
4535                                    lsp::Position::new(0, 0),
4536                                ),
4537                                new_text: "the-new-text".to_string(),
4538                            })),
4539                            ..Default::default()
4540                        }]))
4541                    });
4542
4543                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_| {
4544                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4545                            lsp::CodeAction {
4546                                title: "the-code-action".to_string(),
4547                                ..Default::default()
4548                            },
4549                        )])
4550                    });
4551
4552                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(|params| {
4553                        Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4554                            params.position,
4555                            params.position,
4556                        )))
4557                    });
4558
4559                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4560                        let files = files.clone();
4561                        let rng = rng.clone();
4562                        move |_| {
4563                            let files = files.lock();
4564                            let mut rng = rng.lock();
4565                            let count = rng.gen_range::<usize, _>(1..3);
4566                            Some(lsp::GotoDefinitionResponse::Array(
4567                                (0..count)
4568                                    .map(|_| {
4569                                        let file = files.choose(&mut *rng).unwrap().as_path();
4570                                        lsp::Location {
4571                                            uri: lsp::Url::from_file_path(file).unwrap(),
4572                                            range: Default::default(),
4573                                        }
4574                                    })
4575                                    .collect(),
4576                            ))
4577                        }
4578                    });
4579                }
4580            });
4581
4582            project.update(&mut cx, |project, _| {
4583                project.languages().add(Arc::new(Language::new(
4584                    LanguageConfig {
4585                        name: "Rust".into(),
4586                        path_suffixes: vec!["rs".to_string()],
4587                        language_server: Some(language_server_config),
4588                        ..Default::default()
4589                    },
4590                    None,
4591                )));
4592            });
4593
4594            async move {
4595                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4596                while operations.get() < max_operations {
4597                    operations.set(operations.get() + 1);
4598
4599                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4600                    match distribution {
4601                        0..=20 if !files.lock().is_empty() => {
4602                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4603                            let mut path = path.as_path();
4604                            while let Some(parent_path) = path.parent() {
4605                                path = parent_path;
4606                                if rng.lock().gen() {
4607                                    break;
4608                                }
4609                            }
4610
4611                            log::info!("Host: find/create local worktree {:?}", path);
4612                            project
4613                                .update(&mut cx, |project, cx| {
4614                                    project.find_or_create_local_worktree(path, false, cx)
4615                                })
4616                                .await
4617                                .unwrap();
4618                        }
4619                        10..=80 if !files.lock().is_empty() => {
4620                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4621                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4622                                let (worktree, path) = project
4623                                    .update(&mut cx, |project, cx| {
4624                                        project.find_or_create_local_worktree(file, false, cx)
4625                                    })
4626                                    .await
4627                                    .unwrap();
4628                                let project_path =
4629                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4630                                log::info!("Host: opening path {:?}", project_path);
4631                                let buffer = project
4632                                    .update(&mut cx, |project, cx| {
4633                                        project.open_buffer(project_path, cx)
4634                                    })
4635                                    .await
4636                                    .unwrap();
4637                                self.buffers.insert(buffer.clone());
4638                                buffer
4639                            } else {
4640                                self.buffers
4641                                    .iter()
4642                                    .choose(&mut *rng.lock())
4643                                    .unwrap()
4644                                    .clone()
4645                            };
4646
4647                            if rng.lock().gen_bool(0.1) {
4648                                cx.update(|cx| {
4649                                    log::info!(
4650                                        "Host: dropping buffer {:?}",
4651                                        buffer.read(cx).file().unwrap().full_path(cx)
4652                                    );
4653                                    self.buffers.remove(&buffer);
4654                                    drop(buffer);
4655                                });
4656                            } else {
4657                                buffer.update(&mut cx, |buffer, cx| {
4658                                    log::info!(
4659                                        "Host: updating buffer {:?}",
4660                                        buffer.file().unwrap().full_path(cx)
4661                                    );
4662                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4663                                });
4664                            }
4665                        }
4666                        _ => loop {
4667                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4668                            let mut path = PathBuf::new();
4669                            path.push("/");
4670                            for _ in 0..path_component_count {
4671                                let letter = rng.lock().gen_range(b'a'..=b'z');
4672                                path.push(std::str::from_utf8(&[letter]).unwrap());
4673                            }
4674                            path.set_extension("rs");
4675                            let parent_path = path.parent().unwrap();
4676
4677                            log::info!("Host: creating file {:?}", path,);
4678
4679                            if fs.create_dir(&parent_path).await.is_ok()
4680                                && fs.create_file(&path, Default::default()).await.is_ok()
4681                            {
4682                                files.lock().push(path);
4683                                break;
4684                            } else {
4685                                log::info!("Host: cannot create file");
4686                            }
4687                        },
4688                    }
4689
4690                    cx.background().simulate_random_delay().await;
4691                }
4692
4693                self.project = Some(project);
4694                (self, cx)
4695            }
4696        }
4697
4698        pub async fn simulate_guest(
4699            mut self,
4700            guest_id: usize,
4701            project: ModelHandle<Project>,
4702            operations: Rc<Cell<usize>>,
4703            max_operations: usize,
4704            rng: Arc<Mutex<StdRng>>,
4705            mut cx: TestAppContext,
4706        ) -> (Self, TestAppContext) {
4707            while operations.get() < max_operations {
4708                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4709                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4710                        project
4711                            .worktrees(&cx)
4712                            .filter(|worktree| {
4713                                worktree.read(cx).entries(false).any(|e| e.is_file())
4714                            })
4715                            .choose(&mut *rng.lock())
4716                    }) {
4717                        worktree
4718                    } else {
4719                        cx.background().simulate_random_delay().await;
4720                        continue;
4721                    };
4722
4723                    operations.set(operations.get() + 1);
4724                    let project_path = worktree.read_with(&cx, |worktree, _| {
4725                        let entry = worktree
4726                            .entries(false)
4727                            .filter(|e| e.is_file())
4728                            .choose(&mut *rng.lock())
4729                            .unwrap();
4730                        (worktree.id(), entry.path.clone())
4731                    });
4732                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4733                    let buffer = project
4734                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4735                        .await
4736                        .unwrap();
4737                    self.buffers.insert(buffer.clone());
4738                    buffer
4739                } else {
4740                    operations.set(operations.get() + 1);
4741
4742                    self.buffers
4743                        .iter()
4744                        .choose(&mut *rng.lock())
4745                        .unwrap()
4746                        .clone()
4747                };
4748
4749                let choice = rng.lock().gen_range(0..100);
4750                match choice {
4751                    0..=9 => {
4752                        cx.update(|cx| {
4753                            log::info!(
4754                                "Guest {}: dropping buffer {:?}",
4755                                guest_id,
4756                                buffer.read(cx).file().unwrap().full_path(cx)
4757                            );
4758                            self.buffers.remove(&buffer);
4759                            drop(buffer);
4760                        });
4761                    }
4762                    10..=19 => {
4763                        let completions = project.update(&mut cx, |project, cx| {
4764                            log::info!(
4765                                "Guest {}: requesting completions for buffer {:?}",
4766                                guest_id,
4767                                buffer.read(cx).file().unwrap().full_path(cx)
4768                            );
4769                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4770                            project.completions(&buffer, offset, cx)
4771                        });
4772                        let completions = cx.background().spawn(async move {
4773                            completions.await.expect("completions request failed");
4774                        });
4775                        if rng.lock().gen_bool(0.3) {
4776                            log::info!("Guest {}: detaching completions request", guest_id);
4777                            completions.detach();
4778                        } else {
4779                            completions.await;
4780                        }
4781                    }
4782                    20..=29 => {
4783                        let code_actions = project.update(&mut cx, |project, cx| {
4784                            log::info!(
4785                                "Guest {}: requesting code actions for buffer {:?}",
4786                                guest_id,
4787                                buffer.read(cx).file().unwrap().full_path(cx)
4788                            );
4789                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4790                            project.code_actions(&buffer, range, cx)
4791                        });
4792                        let code_actions = cx.background().spawn(async move {
4793                            code_actions.await.expect("code actions request failed");
4794                        });
4795                        if rng.lock().gen_bool(0.3) {
4796                            log::info!("Guest {}: detaching code actions request", guest_id);
4797                            code_actions.detach();
4798                        } else {
4799                            code_actions.await;
4800                        }
4801                    }
4802                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4803                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4804                            log::info!(
4805                                "Guest {}: saving buffer {:?}",
4806                                guest_id,
4807                                buffer.file().unwrap().full_path(cx)
4808                            );
4809                            (buffer.version(), buffer.save(cx))
4810                        });
4811                        let save = cx.spawn(|cx| async move {
4812                            let (saved_version, _) = save.await.expect("save request failed");
4813                            buffer.read_with(&cx, |buffer, _| {
4814                                assert!(buffer.version().observed_all(&saved_version));
4815                                assert!(saved_version.observed_all(&requested_version));
4816                            });
4817                        });
4818                        if rng.lock().gen_bool(0.3) {
4819                            log::info!("Guest {}: detaching save request", guest_id);
4820                            save.detach();
4821                        } else {
4822                            save.await;
4823                        }
4824                    }
4825                    40..=45 => {
4826                        let prepare_rename = project.update(&mut cx, |project, cx| {
4827                            log::info!(
4828                                "Guest {}: preparing rename for buffer {:?}",
4829                                guest_id,
4830                                buffer.read(cx).file().unwrap().full_path(cx)
4831                            );
4832                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4833                            project.prepare_rename(buffer, offset, cx)
4834                        });
4835                        let prepare_rename = cx.background().spawn(async move {
4836                            prepare_rename.await.expect("prepare rename request failed");
4837                        });
4838                        if rng.lock().gen_bool(0.3) {
4839                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4840                            prepare_rename.detach();
4841                        } else {
4842                            prepare_rename.await;
4843                        }
4844                    }
4845                    46..=49 => {
4846                        let definitions = project.update(&mut cx, |project, cx| {
4847                            log::info!(
4848                                "Guest {}: requesting defintions for buffer {:?}",
4849                                guest_id,
4850                                buffer.read(cx).file().unwrap().full_path(cx)
4851                            );
4852                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4853                            project.definition(&buffer, offset, cx)
4854                        });
4855                        let definitions = cx.background().spawn(async move {
4856                            definitions.await.expect("definitions request failed");
4857                        });
4858                        if rng.lock().gen_bool(0.3) {
4859                            log::info!("Guest {}: detaching definitions request", guest_id);
4860                            definitions.detach();
4861                        } else {
4862                            definitions.await;
4863                        }
4864                    }
4865                    _ => {
4866                        buffer.update(&mut cx, |buffer, cx| {
4867                            log::info!(
4868                                "Guest {}: updating buffer {:?}",
4869                                guest_id,
4870                                buffer.file().unwrap().full_path(cx)
4871                            );
4872                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4873                        });
4874                    }
4875                }
4876                cx.background().simulate_random_delay().await;
4877            }
4878
4879            self.project = Some(project);
4880            (self, cx)
4881        }
4882    }
4883
4884    impl Executor for Arc<gpui::executor::Background> {
4885        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
4886            self.spawn(future).detach();
4887        }
4888    }
4889
4890    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
4891        channel
4892            .messages()
4893            .cursor::<()>()
4894            .map(|m| {
4895                (
4896                    m.sender.github_login.clone(),
4897                    m.body.clone(),
4898                    m.is_pending(),
4899                )
4900            })
4901            .collect()
4902    }
4903
4904    struct EmptyView;
4905
4906    impl gpui::Entity for EmptyView {
4907        type Event = ();
4908    }
4909
4910    impl gpui::View for EmptyView {
4911        fn ui_name() -> &'static str {
4912            "empty view"
4913        }
4914
4915        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
4916            gpui::Element::boxed(gpui::elements::Empty)
4917        }
4918    }
4919}