rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
  35pub struct Server {
  36    peer: Arc<Peer>,
  37    store: RwLock<Store>,
  38    app_state: Arc<AppState>,
  39    handlers: HashMap<TypeId, MessageHandler>,
  40    notifications: Option<mpsc::UnboundedSender<()>>,
  41}
  42
  43pub trait Executor {
  44    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  45}
  46
  47pub struct RealExecutor;
  48
  49const MESSAGE_COUNT_PER_PAGE: usize = 100;
  50const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::get_definition)
  81            .add_request_handler(Server::get_references)
  82            .add_request_handler(Server::get_document_highlights)
  83            .add_request_handler(Server::get_project_symbols)
  84            .add_request_handler(Server::open_buffer_for_symbol)
  85            .add_request_handler(Server::open_buffer)
  86            .add_message_handler(Server::close_buffer)
  87            .add_request_handler(Server::update_buffer)
  88            .add_message_handler(Server::update_buffer_file)
  89            .add_message_handler(Server::buffer_reloaded)
  90            .add_message_handler(Server::buffer_saved)
  91            .add_request_handler(Server::save_buffer)
  92            .add_request_handler(Server::format_buffers)
  93            .add_request_handler(Server::get_completions)
  94            .add_request_handler(Server::apply_additional_edits_for_completion)
  95            .add_request_handler(Server::get_code_actions)
  96            .add_request_handler(Server::apply_code_action)
  97            .add_request_handler(Server::prepare_rename)
  98            .add_request_handler(Server::perform_rename)
  99            .add_request_handler(Server::get_channels)
 100            .add_request_handler(Server::get_users)
 101            .add_request_handler(Server::join_channel)
 102            .add_message_handler(Server::leave_channel)
 103            .add_request_handler(Server::send_channel_message)
 104            .add_request_handler(Server::get_channel_messages);
 105
 106        Arc::new(server)
 107    }
 108
 109    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 110    where
 111        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 112        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 113        M: EnvelopedMessage,
 114    {
 115        let prev_handler = self.handlers.insert(
 116            TypeId::of::<M>(),
 117            Box::new(move |server, envelope| {
 118                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 119                (handler)(server, *envelope).boxed()
 120            }),
 121        );
 122        if prev_handler.is_some() {
 123            panic!("registered a handler for the same message twice");
 124        }
 125        self
 126    }
 127
 128    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 129    where
 130        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 131        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 132        M: RequestMessage,
 133    {
 134        self.add_message_handler(move |server, envelope| {
 135            let receipt = envelope.receipt();
 136            let response = (handler)(server.clone(), envelope);
 137            async move {
 138                match response.await {
 139                    Ok(response) => {
 140                        server.peer.respond(receipt, response)?;
 141                        Ok(())
 142                    }
 143                    Err(error) => {
 144                        server.peer.respond_with_error(
 145                            receipt,
 146                            proto::Error {
 147                                message: error.to_string(),
 148                            },
 149                        )?;
 150                        Err(error)
 151                    }
 152                }
 153            }
 154        })
 155    }
 156
 157    pub fn handle_connection<E: Executor>(
 158        self: &Arc<Self>,
 159        connection: Connection,
 160        addr: String,
 161        user_id: UserId,
 162        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 163        executor: E,
 164    ) -> impl Future<Output = ()> {
 165        let mut this = self.clone();
 166        async move {
 167            let (connection_id, handle_io, mut incoming_rx) =
 168                this.peer.add_connection(connection).await;
 169
 170            if let Some(send_connection_id) = send_connection_id.as_mut() {
 171                let _ = send_connection_id.send(connection_id).await;
 172            }
 173
 174            this.state_mut().add_connection(connection_id, user_id);
 175            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 176                log::error!("error updating contacts for {:?}: {}", user_id, err);
 177            }
 178
 179            let handle_io = handle_io.fuse();
 180            futures::pin_mut!(handle_io);
 181            loop {
 182                let next_message = incoming_rx.next().fuse();
 183                futures::pin_mut!(next_message);
 184                futures::select_biased! {
 185                    result = handle_io => {
 186                        if let Err(err) = result {
 187                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 188                        }
 189                        break;
 190                    }
 191                    message = next_message => {
 192                        if let Some(message) = message {
 193                            let start_time = Instant::now();
 194                            let type_name = message.payload_type_name();
 195                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 196                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 197                                let notifications = this.notifications.clone();
 198                                let is_background = message.is_background();
 199                                let handle_message = (handler)(this.clone(), message);
 200                                let handle_message = async move {
 201                                    if let Err(err) = handle_message.await {
 202                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 203                                    } else {
 204                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 205                                    }
 206                                    if let Some(mut notifications) = notifications {
 207                                        let _ = notifications.send(()).await;
 208                                    }
 209                                };
 210                                if is_background {
 211                                    executor.spawn_detached(handle_message);
 212                                } else {
 213                                    handle_message.await;
 214                                }
 215                            } else {
 216                                log::warn!("unhandled message: {}", type_name);
 217                            }
 218                        } else {
 219                            log::info!("rpc connection closed {:?}", addr);
 220                            break;
 221                        }
 222                    }
 223                }
 224            }
 225
 226            if let Err(err) = this.sign_out(connection_id).await {
 227                log::error!("error signing out connection {:?} - {:?}", addr, err);
 228            }
 229        }
 230    }
 231
 232    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 233        self.peer.disconnect(connection_id);
 234        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 235
 236        for (project_id, project) in removed_connection.hosted_projects {
 237            if let Some(share) = project.share {
 238                broadcast(
 239                    connection_id,
 240                    share.guests.keys().copied().collect(),
 241                    |conn_id| {
 242                        self.peer
 243                            .send(conn_id, proto::UnshareProject { project_id })
 244                    },
 245                )?;
 246            }
 247        }
 248
 249        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 250            broadcast(connection_id, peer_ids, |conn_id| {
 251                self.peer.send(
 252                    conn_id,
 253                    proto::RemoveProjectCollaborator {
 254                        project_id,
 255                        peer_id: connection_id.0,
 256                    },
 257                )
 258            })?;
 259        }
 260
 261        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 262        Ok(())
 263    }
 264
 265    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 266        Ok(proto::Ack {})
 267    }
 268
 269    async fn register_project(
 270        mut self: Arc<Server>,
 271        request: TypedEnvelope<proto::RegisterProject>,
 272    ) -> tide::Result<proto::RegisterProjectResponse> {
 273        let project_id = {
 274            let mut state = self.state_mut();
 275            let user_id = state.user_id_for_connection(request.sender_id)?;
 276            state.register_project(request.sender_id, user_id)
 277        };
 278        Ok(proto::RegisterProjectResponse { project_id })
 279    }
 280
 281    async fn unregister_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::UnregisterProject>,
 284    ) -> tide::Result<()> {
 285        let project = self
 286            .state_mut()
 287            .unregister_project(request.payload.project_id, request.sender_id)?;
 288        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 289        Ok(())
 290    }
 291
 292    async fn share_project(
 293        mut self: Arc<Server>,
 294        request: TypedEnvelope<proto::ShareProject>,
 295    ) -> tide::Result<proto::Ack> {
 296        self.state_mut()
 297            .share_project(request.payload.project_id, request.sender_id);
 298        Ok(proto::Ack {})
 299    }
 300
 301    async fn unshare_project(
 302        mut self: Arc<Server>,
 303        request: TypedEnvelope<proto::UnshareProject>,
 304    ) -> tide::Result<()> {
 305        let project_id = request.payload.project_id;
 306        let project = self
 307            .state_mut()
 308            .unshare_project(project_id, request.sender_id)?;
 309
 310        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 311            self.peer
 312                .send(conn_id, proto::UnshareProject { project_id })
 313        })?;
 314        self.update_contacts_for_users(&project.authorized_user_ids)?;
 315        Ok(())
 316    }
 317
 318    async fn join_project(
 319        mut self: Arc<Server>,
 320        request: TypedEnvelope<proto::JoinProject>,
 321    ) -> tide::Result<proto::JoinProjectResponse> {
 322        let project_id = request.payload.project_id;
 323
 324        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 325        let (response, connection_ids, contact_user_ids) = self
 326            .state_mut()
 327            .join_project(request.sender_id, user_id, project_id)
 328            .and_then(|joined| {
 329                let share = joined.project.share()?;
 330                let peer_count = share.guests.len();
 331                let mut collaborators = Vec::with_capacity(peer_count);
 332                collaborators.push(proto::Collaborator {
 333                    peer_id: joined.project.host_connection_id.0,
 334                    replica_id: 0,
 335                    user_id: joined.project.host_user_id.to_proto(),
 336                });
 337                let worktrees = joined
 338                    .project
 339                    .worktrees
 340                    .iter()
 341                    .filter_map(|(id, worktree)| {
 342                        worktree.share.as_ref().map(|share| proto::Worktree {
 343                            id: *id,
 344                            root_name: worktree.root_name.clone(),
 345                            entries: share.entries.values().cloned().collect(),
 346                            diagnostic_summaries: share
 347                                .diagnostic_summaries
 348                                .values()
 349                                .cloned()
 350                                .collect(),
 351                            weak: worktree.weak,
 352                        })
 353                    })
 354                    .collect();
 355                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 356                    if *peer_conn_id != request.sender_id {
 357                        collaborators.push(proto::Collaborator {
 358                            peer_id: peer_conn_id.0,
 359                            replica_id: *peer_replica_id as u32,
 360                            user_id: peer_user_id.to_proto(),
 361                        });
 362                    }
 363                }
 364                let response = proto::JoinProjectResponse {
 365                    worktrees,
 366                    replica_id: joined.replica_id as u32,
 367                    collaborators,
 368                };
 369                let connection_ids = joined.project.connection_ids();
 370                let contact_user_ids = joined.project.authorized_user_ids();
 371                Ok((response, connection_ids, contact_user_ids))
 372            })?;
 373
 374        broadcast(request.sender_id, connection_ids, |conn_id| {
 375            self.peer.send(
 376                conn_id,
 377                proto::AddProjectCollaborator {
 378                    project_id,
 379                    collaborator: Some(proto::Collaborator {
 380                        peer_id: request.sender_id.0,
 381                        replica_id: response.replica_id,
 382                        user_id: user_id.to_proto(),
 383                    }),
 384                },
 385            )
 386        })?;
 387        self.update_contacts_for_users(&contact_user_ids)?;
 388        Ok(response)
 389    }
 390
 391    async fn leave_project(
 392        mut self: Arc<Server>,
 393        request: TypedEnvelope<proto::LeaveProject>,
 394    ) -> tide::Result<()> {
 395        let sender_id = request.sender_id;
 396        let project_id = request.payload.project_id;
 397        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 398
 399        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 400            self.peer.send(
 401                conn_id,
 402                proto::RemoveProjectCollaborator {
 403                    project_id,
 404                    peer_id: sender_id.0,
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 409
 410        Ok(())
 411    }
 412
 413    async fn register_worktree(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::RegisterWorktree>,
 416    ) -> tide::Result<proto::Ack> {
 417        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 418
 419        let mut contact_user_ids = HashSet::default();
 420        contact_user_ids.insert(host_user_id);
 421        for github_login in &request.payload.authorized_logins {
 422            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 423            contact_user_ids.insert(contact_user_id);
 424        }
 425
 426        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 427        let guest_connection_ids;
 428        {
 429            let mut state = self.state_mut();
 430            guest_connection_ids = state
 431                .read_project(request.payload.project_id, request.sender_id)?
 432                .guest_connection_ids();
 433            state.register_worktree(
 434                request.payload.project_id,
 435                request.payload.worktree_id,
 436                request.sender_id,
 437                Worktree {
 438                    authorized_user_ids: contact_user_ids.clone(),
 439                    root_name: request.payload.root_name.clone(),
 440                    share: None,
 441                    weak: request.payload.weak,
 442                },
 443            )?;
 444        }
 445        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 446            self.peer
 447                .forward_send(request.sender_id, connection_id, request.payload.clone())
 448        })?;
 449        self.update_contacts_for_users(&contact_user_ids)?;
 450        Ok(proto::Ack {})
 451    }
 452
 453    async fn unregister_worktree(
 454        mut self: Arc<Server>,
 455        request: TypedEnvelope<proto::UnregisterWorktree>,
 456    ) -> tide::Result<()> {
 457        let project_id = request.payload.project_id;
 458        let worktree_id = request.payload.worktree_id;
 459        let (worktree, guest_connection_ids) =
 460            self.state_mut()
 461                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 462        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 463            self.peer.send(
 464                conn_id,
 465                proto::UnregisterWorktree {
 466                    project_id,
 467                    worktree_id,
 468                },
 469            )
 470        })?;
 471        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 472        Ok(())
 473    }
 474
 475    async fn update_worktree(
 476        mut self: Arc<Server>,
 477        request: TypedEnvelope<proto::UpdateWorktree>,
 478    ) -> tide::Result<proto::Ack> {
 479        let connection_ids = self.state_mut().update_worktree(
 480            request.sender_id,
 481            request.payload.project_id,
 482            request.payload.worktree_id,
 483            &request.payload.removed_entries,
 484            &request.payload.updated_entries,
 485        )?;
 486
 487        broadcast(request.sender_id, connection_ids, |connection_id| {
 488            self.peer
 489                .forward_send(request.sender_id, connection_id, request.payload.clone())
 490        })?;
 491
 492        Ok(proto::Ack {})
 493    }
 494
 495    async fn update_diagnostic_summary(
 496        mut self: Arc<Server>,
 497        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 498    ) -> tide::Result<()> {
 499        let summary = request
 500            .payload
 501            .summary
 502            .clone()
 503            .ok_or_else(|| anyhow!("invalid summary"))?;
 504        let receiver_ids = self.state_mut().update_diagnostic_summary(
 505            request.payload.project_id,
 506            request.payload.worktree_id,
 507            request.sender_id,
 508            summary,
 509        )?;
 510
 511        broadcast(request.sender_id, receiver_ids, |connection_id| {
 512            self.peer
 513                .forward_send(request.sender_id, connection_id, request.payload.clone())
 514        })?;
 515        Ok(())
 516    }
 517
 518    async fn disk_based_diagnostics_updating(
 519        self: Arc<Server>,
 520        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 521    ) -> tide::Result<()> {
 522        let receiver_ids = self
 523            .state()
 524            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 525        broadcast(request.sender_id, receiver_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529        Ok(())
 530    }
 531
 532    async fn disk_based_diagnostics_updated(
 533        self: Arc<Server>,
 534        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 535    ) -> tide::Result<()> {
 536        let receiver_ids = self
 537            .state()
 538            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })?;
 543        Ok(())
 544    }
 545
 546    async fn get_definition(
 547        self: Arc<Server>,
 548        request: TypedEnvelope<proto::GetDefinition>,
 549    ) -> tide::Result<proto::GetDefinitionResponse> {
 550        let host_connection_id = self
 551            .state()
 552            .read_project(request.payload.project_id, request.sender_id)?
 553            .host_connection_id;
 554        Ok(self
 555            .peer
 556            .forward_request(request.sender_id, host_connection_id, request.payload)
 557            .await?)
 558    }
 559
 560    async fn get_references(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::GetReferences>,
 563    ) -> tide::Result<proto::GetReferencesResponse> {
 564        let host_connection_id = self
 565            .state()
 566            .read_project(request.payload.project_id, request.sender_id)?
 567            .host_connection_id;
 568        Ok(self
 569            .peer
 570            .forward_request(request.sender_id, host_connection_id, request.payload)
 571            .await?)
 572    }
 573
 574    async fn get_document_highlights(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<proto::GetDocumentHighlights>,
 577    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 578        let host_connection_id = self
 579            .state()
 580            .read_project(request.payload.project_id, request.sender_id)?
 581            .host_connection_id;
 582        Ok(self
 583            .peer
 584            .forward_request(request.sender_id, host_connection_id, request.payload)
 585            .await?)
 586    }
 587
 588    async fn get_project_symbols(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetProjectSymbols>,
 591    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 592        let host_connection_id = self
 593            .state()
 594            .read_project(request.payload.project_id, request.sender_id)?
 595            .host_connection_id;
 596        Ok(self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?)
 600    }
 601
 602    async fn open_buffer_for_symbol(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 605    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 606        let host_connection_id = self
 607            .state()
 608            .read_project(request.payload.project_id, request.sender_id)?
 609            .host_connection_id;
 610        Ok(self
 611            .peer
 612            .forward_request(request.sender_id, host_connection_id, request.payload)
 613            .await?)
 614    }
 615
 616    async fn open_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::OpenBuffer>,
 619    ) -> tide::Result<proto::OpenBufferResponse> {
 620        let host_connection_id = self
 621            .state()
 622            .read_project(request.payload.project_id, request.sender_id)?
 623            .host_connection_id;
 624        Ok(self
 625            .peer
 626            .forward_request(request.sender_id, host_connection_id, request.payload)
 627            .await?)
 628    }
 629
 630    async fn close_buffer(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::CloseBuffer>,
 633    ) -> tide::Result<()> {
 634        let host_connection_id = self
 635            .state()
 636            .read_project(request.payload.project_id, request.sender_id)?
 637            .host_connection_id;
 638        self.peer
 639            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 640        Ok(())
 641    }
 642
 643    async fn save_buffer(
 644        self: Arc<Server>,
 645        request: TypedEnvelope<proto::SaveBuffer>,
 646    ) -> tide::Result<proto::BufferSaved> {
 647        let host;
 648        let mut guests;
 649        {
 650            let state = self.state();
 651            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 652            host = project.host_connection_id;
 653            guests = project.guest_connection_ids()
 654        }
 655
 656        let response = self
 657            .peer
 658            .forward_request(request.sender_id, host, request.payload.clone())
 659            .await?;
 660
 661        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 662        broadcast(host, guests, |conn_id| {
 663            self.peer.forward_send(host, conn_id, response.clone())
 664        })?;
 665
 666        Ok(response)
 667    }
 668
 669    async fn format_buffers(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::FormatBuffers>,
 672    ) -> tide::Result<proto::FormatBuffersResponse> {
 673        let host = self
 674            .state()
 675            .read_project(request.payload.project_id, request.sender_id)?
 676            .host_connection_id;
 677        Ok(self
 678            .peer
 679            .forward_request(request.sender_id, host, request.payload.clone())
 680            .await?)
 681    }
 682
 683    async fn get_completions(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::GetCompletions>,
 686    ) -> tide::Result<proto::GetCompletionsResponse> {
 687        let host = self
 688            .state()
 689            .read_project(request.payload.project_id, request.sender_id)?
 690            .host_connection_id;
 691        Ok(self
 692            .peer
 693            .forward_request(request.sender_id, host, request.payload.clone())
 694            .await?)
 695    }
 696
 697    async fn apply_additional_edits_for_completion(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 700    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 701        let host = self
 702            .state()
 703            .read_project(request.payload.project_id, request.sender_id)?
 704            .host_connection_id;
 705        Ok(self
 706            .peer
 707            .forward_request(request.sender_id, host, request.payload.clone())
 708            .await?)
 709    }
 710
 711    async fn get_code_actions(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::GetCodeActions>,
 714    ) -> tide::Result<proto::GetCodeActionsResponse> {
 715        let host = self
 716            .state()
 717            .read_project(request.payload.project_id, request.sender_id)?
 718            .host_connection_id;
 719        Ok(self
 720            .peer
 721            .forward_request(request.sender_id, host, request.payload.clone())
 722            .await?)
 723    }
 724
 725    async fn apply_code_action(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::ApplyCodeAction>,
 728    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 729        let host = self
 730            .state()
 731            .read_project(request.payload.project_id, request.sender_id)?
 732            .host_connection_id;
 733        Ok(self
 734            .peer
 735            .forward_request(request.sender_id, host, request.payload.clone())
 736            .await?)
 737    }
 738
 739    async fn prepare_rename(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::PrepareRename>,
 742    ) -> tide::Result<proto::PrepareRenameResponse> {
 743        let host = self
 744            .state()
 745            .read_project(request.payload.project_id, request.sender_id)?
 746            .host_connection_id;
 747        Ok(self
 748            .peer
 749            .forward_request(request.sender_id, host, request.payload.clone())
 750            .await?)
 751    }
 752
 753    async fn perform_rename(
 754        self: Arc<Server>,
 755        request: TypedEnvelope<proto::PerformRename>,
 756    ) -> tide::Result<proto::PerformRenameResponse> {
 757        let host = self
 758            .state()
 759            .read_project(request.payload.project_id, request.sender_id)?
 760            .host_connection_id;
 761        Ok(self
 762            .peer
 763            .forward_request(request.sender_id, host, request.payload.clone())
 764            .await?)
 765    }
 766
 767    async fn update_buffer(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::UpdateBuffer>,
 770    ) -> tide::Result<proto::Ack> {
 771        let receiver_ids = self
 772            .state()
 773            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 774        broadcast(request.sender_id, receiver_ids, |connection_id| {
 775            self.peer
 776                .forward_send(request.sender_id, connection_id, request.payload.clone())
 777        })?;
 778        Ok(proto::Ack {})
 779    }
 780
 781    async fn update_buffer_file(
 782        self: Arc<Server>,
 783        request: TypedEnvelope<proto::UpdateBufferFile>,
 784    ) -> tide::Result<()> {
 785        let receiver_ids = self
 786            .state()
 787            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 788        broadcast(request.sender_id, receiver_ids, |connection_id| {
 789            self.peer
 790                .forward_send(request.sender_id, connection_id, request.payload.clone())
 791        })?;
 792        Ok(())
 793    }
 794
 795    async fn buffer_reloaded(
 796        self: Arc<Server>,
 797        request: TypedEnvelope<proto::BufferReloaded>,
 798    ) -> tide::Result<()> {
 799        let receiver_ids = self
 800            .state()
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        broadcast(request.sender_id, receiver_ids, |connection_id| {
 803            self.peer
 804                .forward_send(request.sender_id, connection_id, request.payload.clone())
 805        })?;
 806        Ok(())
 807    }
 808
 809    async fn buffer_saved(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::BufferSaved>,
 812    ) -> tide::Result<()> {
 813        let receiver_ids = self
 814            .state()
 815            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        })?;
 820        Ok(())
 821    }
 822
 823    async fn get_channels(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::GetChannels>,
 826    ) -> tide::Result<proto::GetChannelsResponse> {
 827        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 828        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 829        Ok(proto::GetChannelsResponse {
 830            channels: channels
 831                .into_iter()
 832                .map(|chan| proto::Channel {
 833                    id: chan.id.to_proto(),
 834                    name: chan.name,
 835                })
 836                .collect(),
 837        })
 838    }
 839
 840    async fn get_users(
 841        self: Arc<Server>,
 842        request: TypedEnvelope<proto::GetUsers>,
 843    ) -> tide::Result<proto::GetUsersResponse> {
 844        let user_ids = request
 845            .payload
 846            .user_ids
 847            .into_iter()
 848            .map(UserId::from_proto)
 849            .collect();
 850        let users = self
 851            .app_state
 852            .db
 853            .get_users_by_ids(user_ids)
 854            .await?
 855            .into_iter()
 856            .map(|user| proto::User {
 857                id: user.id.to_proto(),
 858                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 859                github_login: user.github_login,
 860            })
 861            .collect();
 862        Ok(proto::GetUsersResponse { users })
 863    }
 864
 865    fn update_contacts_for_users<'a>(
 866        self: &Arc<Server>,
 867        user_ids: impl IntoIterator<Item = &'a UserId>,
 868    ) -> anyhow::Result<()> {
 869        let mut result = Ok(());
 870        let state = self.state();
 871        for user_id in user_ids {
 872            let contacts = state.contacts_for_user(*user_id);
 873            for connection_id in state.connection_ids_for_user(*user_id) {
 874                if let Err(error) = self.peer.send(
 875                    connection_id,
 876                    proto::UpdateContacts {
 877                        contacts: contacts.clone(),
 878                    },
 879                ) {
 880                    result = Err(error);
 881                }
 882            }
 883        }
 884        result
 885    }
 886
 887    async fn join_channel(
 888        mut self: Arc<Self>,
 889        request: TypedEnvelope<proto::JoinChannel>,
 890    ) -> tide::Result<proto::JoinChannelResponse> {
 891        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 892        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 893        if !self
 894            .app_state
 895            .db
 896            .can_user_access_channel(user_id, channel_id)
 897            .await?
 898        {
 899            Err(anyhow!("access denied"))?;
 900        }
 901
 902        self.state_mut().join_channel(request.sender_id, channel_id);
 903        let messages = self
 904            .app_state
 905            .db
 906            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 907            .await?
 908            .into_iter()
 909            .map(|msg| proto::ChannelMessage {
 910                id: msg.id.to_proto(),
 911                body: msg.body,
 912                timestamp: msg.sent_at.unix_timestamp() as u64,
 913                sender_id: msg.sender_id.to_proto(),
 914                nonce: Some(msg.nonce.as_u128().into()),
 915            })
 916            .collect::<Vec<_>>();
 917        Ok(proto::JoinChannelResponse {
 918            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 919            messages,
 920        })
 921    }
 922
 923    async fn leave_channel(
 924        mut self: Arc<Self>,
 925        request: TypedEnvelope<proto::LeaveChannel>,
 926    ) -> tide::Result<()> {
 927        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 928        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 929        if !self
 930            .app_state
 931            .db
 932            .can_user_access_channel(user_id, channel_id)
 933            .await?
 934        {
 935            Err(anyhow!("access denied"))?;
 936        }
 937
 938        self.state_mut()
 939            .leave_channel(request.sender_id, channel_id);
 940
 941        Ok(())
 942    }
 943
 944    async fn send_channel_message(
 945        self: Arc<Self>,
 946        request: TypedEnvelope<proto::SendChannelMessage>,
 947    ) -> tide::Result<proto::SendChannelMessageResponse> {
 948        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 949        let user_id;
 950        let connection_ids;
 951        {
 952            let state = self.state();
 953            user_id = state.user_id_for_connection(request.sender_id)?;
 954            connection_ids = state.channel_connection_ids(channel_id)?;
 955        }
 956
 957        // Validate the message body.
 958        let body = request.payload.body.trim().to_string();
 959        if body.len() > MAX_MESSAGE_LEN {
 960            return Err(anyhow!("message is too long"))?;
 961        }
 962        if body.is_empty() {
 963            return Err(anyhow!("message can't be blank"))?;
 964        }
 965
 966        let timestamp = OffsetDateTime::now_utc();
 967        let nonce = request
 968            .payload
 969            .nonce
 970            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 971
 972        let message_id = self
 973            .app_state
 974            .db
 975            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 976            .await?
 977            .to_proto();
 978        let message = proto::ChannelMessage {
 979            sender_id: user_id.to_proto(),
 980            id: message_id,
 981            body,
 982            timestamp: timestamp.unix_timestamp() as u64,
 983            nonce: Some(nonce),
 984        };
 985        broadcast(request.sender_id, connection_ids, |conn_id| {
 986            self.peer.send(
 987                conn_id,
 988                proto::ChannelMessageSent {
 989                    channel_id: channel_id.to_proto(),
 990                    message: Some(message.clone()),
 991                },
 992            )
 993        })?;
 994        Ok(proto::SendChannelMessageResponse {
 995            message: Some(message),
 996        })
 997    }
 998
 999    async fn get_channel_messages(
1000        self: Arc<Self>,
1001        request: TypedEnvelope<proto::GetChannelMessages>,
1002    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1003        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1004        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1005        if !self
1006            .app_state
1007            .db
1008            .can_user_access_channel(user_id, channel_id)
1009            .await?
1010        {
1011            Err(anyhow!("access denied"))?;
1012        }
1013
1014        let messages = self
1015            .app_state
1016            .db
1017            .get_channel_messages(
1018                channel_id,
1019                MESSAGE_COUNT_PER_PAGE,
1020                Some(MessageId::from_proto(request.payload.before_message_id)),
1021            )
1022            .await?
1023            .into_iter()
1024            .map(|msg| proto::ChannelMessage {
1025                id: msg.id.to_proto(),
1026                body: msg.body,
1027                timestamp: msg.sent_at.unix_timestamp() as u64,
1028                sender_id: msg.sender_id.to_proto(),
1029                nonce: Some(msg.nonce.as_u128().into()),
1030            })
1031            .collect::<Vec<_>>();
1032
1033        Ok(proto::GetChannelMessagesResponse {
1034            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1035            messages,
1036        })
1037    }
1038
1039    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1040        self.store.read()
1041    }
1042
1043    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1044        self.store.write()
1045    }
1046}
1047
1048impl Executor for RealExecutor {
1049    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1050        task::spawn(future);
1051    }
1052}
1053
1054fn broadcast<F>(
1055    sender_id: ConnectionId,
1056    receiver_ids: Vec<ConnectionId>,
1057    mut f: F,
1058) -> anyhow::Result<()>
1059where
1060    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1061{
1062    let mut result = Ok(());
1063    for receiver_id in receiver_ids {
1064        if receiver_id != sender_id {
1065            if let Err(error) = f(receiver_id) {
1066                if result.is_ok() {
1067                    result = Err(error);
1068                }
1069            }
1070        }
1071    }
1072    result
1073}
1074
1075pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1076    let server = Server::new(app.state().clone(), rpc.clone(), None);
1077    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1078        let server = server.clone();
1079        async move {
1080            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1081
1082            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1083            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1084            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1085            let client_protocol_version: Option<u32> = request
1086                .header("X-Zed-Protocol-Version")
1087                .and_then(|v| v.as_str().parse().ok());
1088
1089            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1090                return Ok(Response::new(StatusCode::UpgradeRequired));
1091            }
1092
1093            let header = match request.header("Sec-Websocket-Key") {
1094                Some(h) => h.as_str(),
1095                None => return Err(anyhow!("expected sec-websocket-key"))?,
1096            };
1097
1098            let user_id = process_auth_header(&request).await?;
1099
1100            let mut response = Response::new(StatusCode::SwitchingProtocols);
1101            response.insert_header(UPGRADE, "websocket");
1102            response.insert_header(CONNECTION, "Upgrade");
1103            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1104            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1105            response.insert_header("Sec-Websocket-Version", "13");
1106
1107            let http_res: &mut tide::http::Response = response.as_mut();
1108            let upgrade_receiver = http_res.recv_upgrade().await;
1109            let addr = request.remote().unwrap_or("unknown").to_string();
1110            task::spawn(async move {
1111                if let Some(stream) = upgrade_receiver.await {
1112                    server
1113                        .handle_connection(
1114                            Connection::new(
1115                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1116                            ),
1117                            addr,
1118                            user_id,
1119                            None,
1120                            RealExecutor,
1121                        )
1122                        .await;
1123                }
1124            });
1125
1126            Ok(response)
1127        }
1128    });
1129}
1130
1131fn header_contains_ignore_case<T>(
1132    request: &tide::Request<T>,
1133    header_name: HeaderName,
1134    value: &str,
1135) -> bool {
1136    request
1137        .header(header_name)
1138        .map(|h| {
1139            h.as_str()
1140                .split(',')
1141                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1142        })
1143        .unwrap_or(false)
1144}
1145
1146#[cfg(test)]
1147mod tests {
1148    use super::*;
1149    use crate::{
1150        auth,
1151        db::{tests::TestDb, UserId},
1152        github, AppState, Config,
1153    };
1154    use ::rpc::Peer;
1155    use collections::BTreeMap;
1156    use gpui::{executor, ModelHandle, TestAppContext};
1157    use parking_lot::Mutex;
1158    use postage::{sink::Sink, watch};
1159    use rand::prelude::*;
1160    use rpc::PeerId;
1161    use serde_json::json;
1162    use sqlx::types::time::OffsetDateTime;
1163    use std::{
1164        cell::Cell,
1165        env,
1166        ops::Deref,
1167        path::Path,
1168        rc::Rc,
1169        sync::{
1170            atomic::{AtomicBool, Ordering::SeqCst},
1171            Arc,
1172        },
1173        time::Duration,
1174    };
1175    use zed::{
1176        client::{
1177            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1178            EstablishConnectionError, UserStore,
1179        },
1180        editor::{
1181            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, EditorSettings,
1182            Input, MultiBuffer, Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1183        },
1184        fs::{FakeFs, Fs as _},
1185        language::{
1186            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1187            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1188        },
1189        lsp,
1190        project::{DiagnosticSummary, Project, ProjectPath},
1191        workspace::{Workspace, WorkspaceParams},
1192    };
1193
1194    #[cfg(test)]
1195    #[ctor::ctor]
1196    fn init_logger() {
1197        if std::env::var("RUST_LOG").is_ok() {
1198            env_logger::init();
1199        }
1200    }
1201
1202    #[gpui::test(iterations = 10)]
1203    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1204        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1205        let lang_registry = Arc::new(LanguageRegistry::new());
1206        let fs = FakeFs::new(cx_a.background());
1207        cx_a.foreground().forbid_parking();
1208
1209        // Connect to a server as 2 clients.
1210        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1211        let client_a = server.create_client(&mut cx_a, "user_a").await;
1212        let client_b = server.create_client(&mut cx_b, "user_b").await;
1213
1214        // Share a project as client A
1215        fs.insert_tree(
1216            "/a",
1217            json!({
1218                ".zed.toml": r#"collaborators = ["user_b"]"#,
1219                "a.txt": "a-contents",
1220                "b.txt": "b-contents",
1221            }),
1222        )
1223        .await;
1224        let project_a = cx_a.update(|cx| {
1225            Project::local(
1226                client_a.clone(),
1227                client_a.user_store.clone(),
1228                lang_registry.clone(),
1229                fs.clone(),
1230                cx,
1231            )
1232        });
1233        let (worktree_a, _) = project_a
1234            .update(&mut cx_a, |p, cx| {
1235                p.find_or_create_local_worktree("/a", false, cx)
1236            })
1237            .await
1238            .unwrap();
1239        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1240        worktree_a
1241            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1242            .await;
1243        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1244        project_a
1245            .update(&mut cx_a, |p, cx| p.share(cx))
1246            .await
1247            .unwrap();
1248
1249        // Join that project as client B
1250        let project_b = Project::remote(
1251            project_id,
1252            client_b.clone(),
1253            client_b.user_store.clone(),
1254            lang_registry.clone(),
1255            fs.clone(),
1256            &mut cx_b.to_async(),
1257        )
1258        .await
1259        .unwrap();
1260
1261        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1262            assert_eq!(
1263                project
1264                    .collaborators()
1265                    .get(&client_a.peer_id)
1266                    .unwrap()
1267                    .user
1268                    .github_login,
1269                "user_a"
1270            );
1271            project.replica_id()
1272        });
1273        project_a
1274            .condition(&cx_a, |tree, _| {
1275                tree.collaborators()
1276                    .get(&client_b.peer_id)
1277                    .map_or(false, |collaborator| {
1278                        collaborator.replica_id == replica_id_b
1279                            && collaborator.user.github_login == "user_b"
1280                    })
1281            })
1282            .await;
1283
1284        // Open the same file as client B and client A.
1285        let buffer_b = project_b
1286            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1287            .await
1288            .unwrap();
1289        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1290        buffer_b.read_with(&cx_b, |buf, cx| {
1291            assert_eq!(buf.read(cx).text(), "b-contents")
1292        });
1293        project_a.read_with(&cx_a, |project, cx| {
1294            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1295        });
1296        let buffer_a = project_a
1297            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1298            .await
1299            .unwrap();
1300
1301        let editor_b = cx_b.add_view(window_b, |cx| {
1302            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1303        });
1304
1305        // TODO
1306        // // Create a selection set as client B and see that selection set as client A.
1307        // buffer_a
1308        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1309        //     .await;
1310
1311        // Edit the buffer as client B and see that edit as client A.
1312        editor_b.update(&mut cx_b, |editor, cx| {
1313            editor.handle_input(&Input("ok, ".into()), cx)
1314        });
1315        buffer_a
1316            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1317            .await;
1318
1319        // TODO
1320        // // Remove the selection set as client B, see those selections disappear as client A.
1321        cx_b.update(move |_| drop(editor_b));
1322        // buffer_a
1323        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1324        //     .await;
1325
1326        // Close the buffer as client A, see that the buffer is closed.
1327        cx_a.update(move |_| drop(buffer_a));
1328        project_a
1329            .condition(&cx_a, |project, cx| {
1330                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1331            })
1332            .await;
1333
1334        // Dropping the client B's project removes client B from client A's collaborators.
1335        cx_b.update(move |_| drop(project_b));
1336        project_a
1337            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1338            .await;
1339    }
1340
1341    #[gpui::test(iterations = 10)]
1342    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1343        let lang_registry = Arc::new(LanguageRegistry::new());
1344        let fs = FakeFs::new(cx_a.background());
1345        cx_a.foreground().forbid_parking();
1346
1347        // Connect to a server as 2 clients.
1348        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1349        let client_a = server.create_client(&mut cx_a, "user_a").await;
1350        let client_b = server.create_client(&mut cx_b, "user_b").await;
1351
1352        // Share a project as client A
1353        fs.insert_tree(
1354            "/a",
1355            json!({
1356                ".zed.toml": r#"collaborators = ["user_b"]"#,
1357                "a.txt": "a-contents",
1358                "b.txt": "b-contents",
1359            }),
1360        )
1361        .await;
1362        let project_a = cx_a.update(|cx| {
1363            Project::local(
1364                client_a.clone(),
1365                client_a.user_store.clone(),
1366                lang_registry.clone(),
1367                fs.clone(),
1368                cx,
1369            )
1370        });
1371        let (worktree_a, _) = project_a
1372            .update(&mut cx_a, |p, cx| {
1373                p.find_or_create_local_worktree("/a", false, cx)
1374            })
1375            .await
1376            .unwrap();
1377        worktree_a
1378            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1379            .await;
1380        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1381        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1382        project_a
1383            .update(&mut cx_a, |p, cx| p.share(cx))
1384            .await
1385            .unwrap();
1386        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1387
1388        // Join that project as client B
1389        let project_b = Project::remote(
1390            project_id,
1391            client_b.clone(),
1392            client_b.user_store.clone(),
1393            lang_registry.clone(),
1394            fs.clone(),
1395            &mut cx_b.to_async(),
1396        )
1397        .await
1398        .unwrap();
1399        project_b
1400            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1401            .await
1402            .unwrap();
1403
1404        // Unshare the project as client A
1405        project_a
1406            .update(&mut cx_a, |project, cx| project.unshare(cx))
1407            .await
1408            .unwrap();
1409        project_b
1410            .condition(&mut cx_b, |project, _| project.is_read_only())
1411            .await;
1412        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1413        drop(project_b);
1414
1415        // Share the project again and ensure guests can still join.
1416        project_a
1417            .update(&mut cx_a, |project, cx| project.share(cx))
1418            .await
1419            .unwrap();
1420        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1421
1422        let project_c = Project::remote(
1423            project_id,
1424            client_b.clone(),
1425            client_b.user_store.clone(),
1426            lang_registry.clone(),
1427            fs.clone(),
1428            &mut cx_b.to_async(),
1429        )
1430        .await
1431        .unwrap();
1432        project_c
1433            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1434            .await
1435            .unwrap();
1436    }
1437
1438    #[gpui::test(iterations = 10)]
1439    async fn test_propagate_saves_and_fs_changes(
1440        mut cx_a: TestAppContext,
1441        mut cx_b: TestAppContext,
1442        mut cx_c: TestAppContext,
1443    ) {
1444        let lang_registry = Arc::new(LanguageRegistry::new());
1445        let fs = FakeFs::new(cx_a.background());
1446        cx_a.foreground().forbid_parking();
1447
1448        // Connect to a server as 3 clients.
1449        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1450        let client_a = server.create_client(&mut cx_a, "user_a").await;
1451        let client_b = server.create_client(&mut cx_b, "user_b").await;
1452        let client_c = server.create_client(&mut cx_c, "user_c").await;
1453
1454        // Share a worktree as client A.
1455        fs.insert_tree(
1456            "/a",
1457            json!({
1458                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1459                "file1": "",
1460                "file2": ""
1461            }),
1462        )
1463        .await;
1464        let project_a = cx_a.update(|cx| {
1465            Project::local(
1466                client_a.clone(),
1467                client_a.user_store.clone(),
1468                lang_registry.clone(),
1469                fs.clone(),
1470                cx,
1471            )
1472        });
1473        let (worktree_a, _) = project_a
1474            .update(&mut cx_a, |p, cx| {
1475                p.find_or_create_local_worktree("/a", false, cx)
1476            })
1477            .await
1478            .unwrap();
1479        worktree_a
1480            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1481            .await;
1482        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1483        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1484        project_a
1485            .update(&mut cx_a, |p, cx| p.share(cx))
1486            .await
1487            .unwrap();
1488
1489        // Join that worktree as clients B and C.
1490        let project_b = Project::remote(
1491            project_id,
1492            client_b.clone(),
1493            client_b.user_store.clone(),
1494            lang_registry.clone(),
1495            fs.clone(),
1496            &mut cx_b.to_async(),
1497        )
1498        .await
1499        .unwrap();
1500        let project_c = Project::remote(
1501            project_id,
1502            client_c.clone(),
1503            client_c.user_store.clone(),
1504            lang_registry.clone(),
1505            fs.clone(),
1506            &mut cx_c.to_async(),
1507        )
1508        .await
1509        .unwrap();
1510        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1511        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1512
1513        // Open and edit a buffer as both guests B and C.
1514        let buffer_b = project_b
1515            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1516            .await
1517            .unwrap();
1518        let buffer_c = project_c
1519            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1520            .await
1521            .unwrap();
1522        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1523        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1524
1525        // Open and edit that buffer as the host.
1526        let buffer_a = project_a
1527            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1528            .await
1529            .unwrap();
1530
1531        buffer_a
1532            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1533            .await;
1534        buffer_a.update(&mut cx_a, |buf, cx| {
1535            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1536        });
1537
1538        // Wait for edits to propagate
1539        buffer_a
1540            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1541            .await;
1542        buffer_b
1543            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1544            .await;
1545        buffer_c
1546            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1547            .await;
1548
1549        // Edit the buffer as the host and concurrently save as guest B.
1550        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1551        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1552        save_b.await.unwrap();
1553        assert_eq!(
1554            fs.load("/a/file1".as_ref()).await.unwrap(),
1555            "hi-a, i-am-c, i-am-b, i-am-a"
1556        );
1557        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1558        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1559        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1560
1561        // Make changes on host's file system, see those changes on guest worktrees.
1562        fs.rename(
1563            "/a/file1".as_ref(),
1564            "/a/file1-renamed".as_ref(),
1565            Default::default(),
1566        )
1567        .await
1568        .unwrap();
1569
1570        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1571            .await
1572            .unwrap();
1573        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1574
1575        worktree_a
1576            .condition(&cx_a, |tree, _| {
1577                tree.paths()
1578                    .map(|p| p.to_string_lossy())
1579                    .collect::<Vec<_>>()
1580                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1581            })
1582            .await;
1583        worktree_b
1584            .condition(&cx_b, |tree, _| {
1585                tree.paths()
1586                    .map(|p| p.to_string_lossy())
1587                    .collect::<Vec<_>>()
1588                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1589            })
1590            .await;
1591        worktree_c
1592            .condition(&cx_c, |tree, _| {
1593                tree.paths()
1594                    .map(|p| p.to_string_lossy())
1595                    .collect::<Vec<_>>()
1596                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1597            })
1598            .await;
1599
1600        // Ensure buffer files are updated as well.
1601        buffer_a
1602            .condition(&cx_a, |buf, _| {
1603                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1604            })
1605            .await;
1606        buffer_b
1607            .condition(&cx_b, |buf, _| {
1608                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1609            })
1610            .await;
1611        buffer_c
1612            .condition(&cx_c, |buf, _| {
1613                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1614            })
1615            .await;
1616    }
1617
1618    #[gpui::test(iterations = 10)]
1619    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1620        cx_a.foreground().forbid_parking();
1621        let lang_registry = Arc::new(LanguageRegistry::new());
1622        let fs = FakeFs::new(cx_a.background());
1623
1624        // Connect to a server as 2 clients.
1625        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1626        let client_a = server.create_client(&mut cx_a, "user_a").await;
1627        let client_b = server.create_client(&mut cx_b, "user_b").await;
1628
1629        // Share a project as client A
1630        fs.insert_tree(
1631            "/dir",
1632            json!({
1633                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1634                "a.txt": "a-contents",
1635            }),
1636        )
1637        .await;
1638
1639        let project_a = cx_a.update(|cx| {
1640            Project::local(
1641                client_a.clone(),
1642                client_a.user_store.clone(),
1643                lang_registry.clone(),
1644                fs.clone(),
1645                cx,
1646            )
1647        });
1648        let (worktree_a, _) = project_a
1649            .update(&mut cx_a, |p, cx| {
1650                p.find_or_create_local_worktree("/dir", false, cx)
1651            })
1652            .await
1653            .unwrap();
1654        worktree_a
1655            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1656            .await;
1657        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1658        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1659        project_a
1660            .update(&mut cx_a, |p, cx| p.share(cx))
1661            .await
1662            .unwrap();
1663
1664        // Join that project as client B
1665        let project_b = Project::remote(
1666            project_id,
1667            client_b.clone(),
1668            client_b.user_store.clone(),
1669            lang_registry.clone(),
1670            fs.clone(),
1671            &mut cx_b.to_async(),
1672        )
1673        .await
1674        .unwrap();
1675
1676        // Open a buffer as client B
1677        let buffer_b = project_b
1678            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1679            .await
1680            .unwrap();
1681
1682        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1683        buffer_b.read_with(&cx_b, |buf, _| {
1684            assert!(buf.is_dirty());
1685            assert!(!buf.has_conflict());
1686        });
1687
1688        buffer_b
1689            .update(&mut cx_b, |buf, cx| buf.save(cx))
1690            .await
1691            .unwrap();
1692        buffer_b
1693            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1694            .await;
1695        buffer_b.read_with(&cx_b, |buf, _| {
1696            assert!(!buf.has_conflict());
1697        });
1698
1699        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1700        buffer_b.read_with(&cx_b, |buf, _| {
1701            assert!(buf.is_dirty());
1702            assert!(!buf.has_conflict());
1703        });
1704    }
1705
1706    #[gpui::test(iterations = 10)]
1707    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1708        cx_a.foreground().forbid_parking();
1709        let lang_registry = Arc::new(LanguageRegistry::new());
1710        let fs = FakeFs::new(cx_a.background());
1711
1712        // Connect to a server as 2 clients.
1713        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1714        let client_a = server.create_client(&mut cx_a, "user_a").await;
1715        let client_b = server.create_client(&mut cx_b, "user_b").await;
1716
1717        // Share a project as client A
1718        fs.insert_tree(
1719            "/dir",
1720            json!({
1721                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1722                "a.txt": "a-contents",
1723            }),
1724        )
1725        .await;
1726
1727        let project_a = cx_a.update(|cx| {
1728            Project::local(
1729                client_a.clone(),
1730                client_a.user_store.clone(),
1731                lang_registry.clone(),
1732                fs.clone(),
1733                cx,
1734            )
1735        });
1736        let (worktree_a, _) = project_a
1737            .update(&mut cx_a, |p, cx| {
1738                p.find_or_create_local_worktree("/dir", false, cx)
1739            })
1740            .await
1741            .unwrap();
1742        worktree_a
1743            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1744            .await;
1745        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1746        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1747        project_a
1748            .update(&mut cx_a, |p, cx| p.share(cx))
1749            .await
1750            .unwrap();
1751
1752        // Join that project as client B
1753        let project_b = Project::remote(
1754            project_id,
1755            client_b.clone(),
1756            client_b.user_store.clone(),
1757            lang_registry.clone(),
1758            fs.clone(),
1759            &mut cx_b.to_async(),
1760        )
1761        .await
1762        .unwrap();
1763        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1764
1765        // Open a buffer as client B
1766        let buffer_b = project_b
1767            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1768            .await
1769            .unwrap();
1770        buffer_b.read_with(&cx_b, |buf, _| {
1771            assert!(!buf.is_dirty());
1772            assert!(!buf.has_conflict());
1773        });
1774
1775        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1776            .await
1777            .unwrap();
1778        buffer_b
1779            .condition(&cx_b, |buf, _| {
1780                buf.text() == "new contents" && !buf.is_dirty()
1781            })
1782            .await;
1783        buffer_b.read_with(&cx_b, |buf, _| {
1784            assert!(!buf.has_conflict());
1785        });
1786    }
1787
1788    #[gpui::test(iterations = 10)]
1789    async fn test_editing_while_guest_opens_buffer(
1790        mut cx_a: TestAppContext,
1791        mut cx_b: TestAppContext,
1792    ) {
1793        cx_a.foreground().forbid_parking();
1794        let lang_registry = Arc::new(LanguageRegistry::new());
1795        let fs = FakeFs::new(cx_a.background());
1796
1797        // Connect to a server as 2 clients.
1798        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1799        let client_a = server.create_client(&mut cx_a, "user_a").await;
1800        let client_b = server.create_client(&mut cx_b, "user_b").await;
1801
1802        // Share a project as client A
1803        fs.insert_tree(
1804            "/dir",
1805            json!({
1806                ".zed.toml": r#"collaborators = ["user_b"]"#,
1807                "a.txt": "a-contents",
1808            }),
1809        )
1810        .await;
1811        let project_a = cx_a.update(|cx| {
1812            Project::local(
1813                client_a.clone(),
1814                client_a.user_store.clone(),
1815                lang_registry.clone(),
1816                fs.clone(),
1817                cx,
1818            )
1819        });
1820        let (worktree_a, _) = project_a
1821            .update(&mut cx_a, |p, cx| {
1822                p.find_or_create_local_worktree("/dir", false, cx)
1823            })
1824            .await
1825            .unwrap();
1826        worktree_a
1827            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1828            .await;
1829        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1830        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1831        project_a
1832            .update(&mut cx_a, |p, cx| p.share(cx))
1833            .await
1834            .unwrap();
1835
1836        // Join that project as client B
1837        let project_b = Project::remote(
1838            project_id,
1839            client_b.clone(),
1840            client_b.user_store.clone(),
1841            lang_registry.clone(),
1842            fs.clone(),
1843            &mut cx_b.to_async(),
1844        )
1845        .await
1846        .unwrap();
1847
1848        // Open a buffer as client A
1849        let buffer_a = project_a
1850            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1851            .await
1852            .unwrap();
1853
1854        // Start opening the same buffer as client B
1855        let buffer_b = cx_b
1856            .background()
1857            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1858
1859        // Edit the buffer as client A while client B is still opening it.
1860        cx_b.background().simulate_random_delay().await;
1861        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1862        cx_b.background().simulate_random_delay().await;
1863        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1864
1865        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1866        let buffer_b = buffer_b.await.unwrap();
1867        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1868    }
1869
1870    #[gpui::test(iterations = 10)]
1871    async fn test_leaving_worktree_while_opening_buffer(
1872        mut cx_a: TestAppContext,
1873        mut cx_b: TestAppContext,
1874    ) {
1875        cx_a.foreground().forbid_parking();
1876        let lang_registry = Arc::new(LanguageRegistry::new());
1877        let fs = FakeFs::new(cx_a.background());
1878
1879        // Connect to a server as 2 clients.
1880        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1881        let client_a = server.create_client(&mut cx_a, "user_a").await;
1882        let client_b = server.create_client(&mut cx_b, "user_b").await;
1883
1884        // Share a project as client A
1885        fs.insert_tree(
1886            "/dir",
1887            json!({
1888                ".zed.toml": r#"collaborators = ["user_b"]"#,
1889                "a.txt": "a-contents",
1890            }),
1891        )
1892        .await;
1893        let project_a = cx_a.update(|cx| {
1894            Project::local(
1895                client_a.clone(),
1896                client_a.user_store.clone(),
1897                lang_registry.clone(),
1898                fs.clone(),
1899                cx,
1900            )
1901        });
1902        let (worktree_a, _) = project_a
1903            .update(&mut cx_a, |p, cx| {
1904                p.find_or_create_local_worktree("/dir", false, cx)
1905            })
1906            .await
1907            .unwrap();
1908        worktree_a
1909            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1910            .await;
1911        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1912        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1913        project_a
1914            .update(&mut cx_a, |p, cx| p.share(cx))
1915            .await
1916            .unwrap();
1917
1918        // Join that project as client B
1919        let project_b = Project::remote(
1920            project_id,
1921            client_b.clone(),
1922            client_b.user_store.clone(),
1923            lang_registry.clone(),
1924            fs.clone(),
1925            &mut cx_b.to_async(),
1926        )
1927        .await
1928        .unwrap();
1929
1930        // See that a guest has joined as client A.
1931        project_a
1932            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1933            .await;
1934
1935        // Begin opening a buffer as client B, but leave the project before the open completes.
1936        let buffer_b = cx_b
1937            .background()
1938            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1939        cx_b.update(|_| drop(project_b));
1940        drop(buffer_b);
1941
1942        // See that the guest has left.
1943        project_a
1944            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1945            .await;
1946    }
1947
1948    #[gpui::test(iterations = 10)]
1949    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1950        cx_a.foreground().forbid_parking();
1951        let lang_registry = Arc::new(LanguageRegistry::new());
1952        let fs = FakeFs::new(cx_a.background());
1953
1954        // Connect to a server as 2 clients.
1955        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1956        let client_a = server.create_client(&mut cx_a, "user_a").await;
1957        let client_b = server.create_client(&mut cx_b, "user_b").await;
1958
1959        // Share a project as client A
1960        fs.insert_tree(
1961            "/a",
1962            json!({
1963                ".zed.toml": r#"collaborators = ["user_b"]"#,
1964                "a.txt": "a-contents",
1965                "b.txt": "b-contents",
1966            }),
1967        )
1968        .await;
1969        let project_a = cx_a.update(|cx| {
1970            Project::local(
1971                client_a.clone(),
1972                client_a.user_store.clone(),
1973                lang_registry.clone(),
1974                fs.clone(),
1975                cx,
1976            )
1977        });
1978        let (worktree_a, _) = project_a
1979            .update(&mut cx_a, |p, cx| {
1980                p.find_or_create_local_worktree("/a", false, cx)
1981            })
1982            .await
1983            .unwrap();
1984        worktree_a
1985            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1986            .await;
1987        let project_id = project_a
1988            .update(&mut cx_a, |project, _| project.next_remote_id())
1989            .await;
1990        project_a
1991            .update(&mut cx_a, |project, cx| project.share(cx))
1992            .await
1993            .unwrap();
1994
1995        // Join that project as client B
1996        let _project_b = Project::remote(
1997            project_id,
1998            client_b.clone(),
1999            client_b.user_store.clone(),
2000            lang_registry.clone(),
2001            fs.clone(),
2002            &mut cx_b.to_async(),
2003        )
2004        .await
2005        .unwrap();
2006
2007        // See that a guest has joined as client A.
2008        project_a
2009            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2010            .await;
2011
2012        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2013        client_b.disconnect(&cx_b.to_async()).unwrap();
2014        project_a
2015            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2016            .await;
2017    }
2018
2019    #[gpui::test(iterations = 10)]
2020    async fn test_collaborating_with_diagnostics(
2021        mut cx_a: TestAppContext,
2022        mut cx_b: TestAppContext,
2023    ) {
2024        cx_a.foreground().forbid_parking();
2025        let mut lang_registry = Arc::new(LanguageRegistry::new());
2026        let fs = FakeFs::new(cx_a.background());
2027
2028        // Set up a fake language server.
2029        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2030        Arc::get_mut(&mut lang_registry)
2031            .unwrap()
2032            .add(Arc::new(Language::new(
2033                LanguageConfig {
2034                    name: "Rust".into(),
2035                    path_suffixes: vec!["rs".to_string()],
2036                    language_server: Some(language_server_config),
2037                    ..Default::default()
2038                },
2039                Some(tree_sitter_rust::language()),
2040            )));
2041
2042        // Connect to a server as 2 clients.
2043        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2044        let client_a = server.create_client(&mut cx_a, "user_a").await;
2045        let client_b = server.create_client(&mut cx_b, "user_b").await;
2046
2047        // Share a project as client A
2048        fs.insert_tree(
2049            "/a",
2050            json!({
2051                ".zed.toml": r#"collaborators = ["user_b"]"#,
2052                "a.rs": "let one = two",
2053                "other.rs": "",
2054            }),
2055        )
2056        .await;
2057        let project_a = cx_a.update(|cx| {
2058            Project::local(
2059                client_a.clone(),
2060                client_a.user_store.clone(),
2061                lang_registry.clone(),
2062                fs.clone(),
2063                cx,
2064            )
2065        });
2066        let (worktree_a, _) = project_a
2067            .update(&mut cx_a, |p, cx| {
2068                p.find_or_create_local_worktree("/a", false, cx)
2069            })
2070            .await
2071            .unwrap();
2072        worktree_a
2073            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2074            .await;
2075        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2076        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2077        project_a
2078            .update(&mut cx_a, |p, cx| p.share(cx))
2079            .await
2080            .unwrap();
2081
2082        // Cause the language server to start.
2083        let _ = cx_a
2084            .background()
2085            .spawn(project_a.update(&mut cx_a, |project, cx| {
2086                project.open_buffer(
2087                    ProjectPath {
2088                        worktree_id,
2089                        path: Path::new("other.rs").into(),
2090                    },
2091                    cx,
2092                )
2093            }))
2094            .await
2095            .unwrap();
2096
2097        // Simulate a language server reporting errors for a file.
2098        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2099        fake_language_server
2100            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2101                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2102                version: None,
2103                diagnostics: vec![lsp::Diagnostic {
2104                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2105                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2106                    message: "message 1".to_string(),
2107                    ..Default::default()
2108                }],
2109            })
2110            .await;
2111
2112        // Wait for server to see the diagnostics update.
2113        server
2114            .condition(|store| {
2115                let worktree = store
2116                    .project(project_id)
2117                    .unwrap()
2118                    .worktrees
2119                    .get(&worktree_id.to_proto())
2120                    .unwrap();
2121
2122                !worktree
2123                    .share
2124                    .as_ref()
2125                    .unwrap()
2126                    .diagnostic_summaries
2127                    .is_empty()
2128            })
2129            .await;
2130
2131        // Join the worktree as client B.
2132        let project_b = Project::remote(
2133            project_id,
2134            client_b.clone(),
2135            client_b.user_store.clone(),
2136            lang_registry.clone(),
2137            fs.clone(),
2138            &mut cx_b.to_async(),
2139        )
2140        .await
2141        .unwrap();
2142
2143        project_b.read_with(&cx_b, |project, cx| {
2144            assert_eq!(
2145                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2146                &[(
2147                    ProjectPath {
2148                        worktree_id,
2149                        path: Arc::from(Path::new("a.rs")),
2150                    },
2151                    DiagnosticSummary {
2152                        error_count: 1,
2153                        warning_count: 0,
2154                        ..Default::default()
2155                    },
2156                )]
2157            )
2158        });
2159
2160        // Simulate a language server reporting more errors for a file.
2161        fake_language_server
2162            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2163                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2164                version: None,
2165                diagnostics: vec![
2166                    lsp::Diagnostic {
2167                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2168                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2169                        message: "message 1".to_string(),
2170                        ..Default::default()
2171                    },
2172                    lsp::Diagnostic {
2173                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2174                        range: lsp::Range::new(
2175                            lsp::Position::new(0, 10),
2176                            lsp::Position::new(0, 13),
2177                        ),
2178                        message: "message 2".to_string(),
2179                        ..Default::default()
2180                    },
2181                ],
2182            })
2183            .await;
2184
2185        // Client b gets the updated summaries
2186        project_b
2187            .condition(&cx_b, |project, cx| {
2188                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2189                    == &[(
2190                        ProjectPath {
2191                            worktree_id,
2192                            path: Arc::from(Path::new("a.rs")),
2193                        },
2194                        DiagnosticSummary {
2195                            error_count: 1,
2196                            warning_count: 1,
2197                            ..Default::default()
2198                        },
2199                    )]
2200            })
2201            .await;
2202
2203        // Open the file with the errors on client B. They should be present.
2204        let buffer_b = cx_b
2205            .background()
2206            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2207            .await
2208            .unwrap();
2209
2210        buffer_b.read_with(&cx_b, |buffer, _| {
2211            assert_eq!(
2212                buffer
2213                    .snapshot()
2214                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2215                    .map(|entry| entry)
2216                    .collect::<Vec<_>>(),
2217                &[
2218                    DiagnosticEntry {
2219                        range: Point::new(0, 4)..Point::new(0, 7),
2220                        diagnostic: Diagnostic {
2221                            group_id: 0,
2222                            message: "message 1".to_string(),
2223                            severity: lsp::DiagnosticSeverity::ERROR,
2224                            is_primary: true,
2225                            ..Default::default()
2226                        }
2227                    },
2228                    DiagnosticEntry {
2229                        range: Point::new(0, 10)..Point::new(0, 13),
2230                        diagnostic: Diagnostic {
2231                            group_id: 1,
2232                            severity: lsp::DiagnosticSeverity::WARNING,
2233                            message: "message 2".to_string(),
2234                            is_primary: true,
2235                            ..Default::default()
2236                        }
2237                    }
2238                ]
2239            );
2240        });
2241    }
2242
2243    #[gpui::test(iterations = 10)]
2244    async fn test_collaborating_with_completion(
2245        mut cx_a: TestAppContext,
2246        mut cx_b: TestAppContext,
2247    ) {
2248        cx_a.foreground().forbid_parking();
2249        let mut lang_registry = Arc::new(LanguageRegistry::new());
2250        let fs = FakeFs::new(cx_a.background());
2251
2252        // Set up a fake language server.
2253        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2254        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2255            completion_provider: Some(lsp::CompletionOptions {
2256                trigger_characters: Some(vec![".".to_string()]),
2257                ..Default::default()
2258            }),
2259            ..Default::default()
2260        });
2261        Arc::get_mut(&mut lang_registry)
2262            .unwrap()
2263            .add(Arc::new(Language::new(
2264                LanguageConfig {
2265                    name: "Rust".into(),
2266                    path_suffixes: vec!["rs".to_string()],
2267                    language_server: Some(language_server_config),
2268                    ..Default::default()
2269                },
2270                Some(tree_sitter_rust::language()),
2271            )));
2272
2273        // Connect to a server as 2 clients.
2274        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2275        let client_a = server.create_client(&mut cx_a, "user_a").await;
2276        let client_b = server.create_client(&mut cx_b, "user_b").await;
2277
2278        // Share a project as client A
2279        fs.insert_tree(
2280            "/a",
2281            json!({
2282                ".zed.toml": r#"collaborators = ["user_b"]"#,
2283                "main.rs": "fn main() { a }",
2284                "other.rs": "",
2285            }),
2286        )
2287        .await;
2288        let project_a = cx_a.update(|cx| {
2289            Project::local(
2290                client_a.clone(),
2291                client_a.user_store.clone(),
2292                lang_registry.clone(),
2293                fs.clone(),
2294                cx,
2295            )
2296        });
2297        let (worktree_a, _) = project_a
2298            .update(&mut cx_a, |p, cx| {
2299                p.find_or_create_local_worktree("/a", false, cx)
2300            })
2301            .await
2302            .unwrap();
2303        worktree_a
2304            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2305            .await;
2306        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2307        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2308        project_a
2309            .update(&mut cx_a, |p, cx| p.share(cx))
2310            .await
2311            .unwrap();
2312
2313        // Join the worktree as client B.
2314        let project_b = Project::remote(
2315            project_id,
2316            client_b.clone(),
2317            client_b.user_store.clone(),
2318            lang_registry.clone(),
2319            fs.clone(),
2320            &mut cx_b.to_async(),
2321        )
2322        .await
2323        .unwrap();
2324
2325        // Open a file in an editor as the guest.
2326        let buffer_b = project_b
2327            .update(&mut cx_b, |p, cx| {
2328                p.open_buffer((worktree_id, "main.rs"), cx)
2329            })
2330            .await
2331            .unwrap();
2332        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2333        let editor_b = cx_b.add_view(window_b, |cx| {
2334            Editor::for_buffer(
2335                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2336                Arc::new(|cx| EditorSettings::test(cx)),
2337                Some(project_b.clone()),
2338                cx,
2339            )
2340        });
2341
2342        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2343        buffer_b
2344            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2345            .await;
2346
2347        // Type a completion trigger character as the guest.
2348        editor_b.update(&mut cx_b, |editor, cx| {
2349            editor.select_ranges([13..13], None, cx);
2350            editor.handle_input(&Input(".".into()), cx);
2351            cx.focus(&editor_b);
2352        });
2353
2354        // Receive a completion request as the host's language server.
2355        // Return some completions from the host's language server.
2356        cx_a.foreground().start_waiting();
2357        fake_language_server
2358            .handle_request::<lsp::request::Completion, _>(|params, _| {
2359                assert_eq!(
2360                    params.text_document_position.text_document.uri,
2361                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2362                );
2363                assert_eq!(
2364                    params.text_document_position.position,
2365                    lsp::Position::new(0, 14),
2366                );
2367
2368                Some(lsp::CompletionResponse::Array(vec![
2369                    lsp::CompletionItem {
2370                        label: "first_method(…)".into(),
2371                        detail: Some("fn(&mut self, B) -> C".into()),
2372                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2373                            new_text: "first_method($1)".to_string(),
2374                            range: lsp::Range::new(
2375                                lsp::Position::new(0, 14),
2376                                lsp::Position::new(0, 14),
2377                            ),
2378                        })),
2379                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2380                        ..Default::default()
2381                    },
2382                    lsp::CompletionItem {
2383                        label: "second_method(…)".into(),
2384                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2385                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2386                            new_text: "second_method()".to_string(),
2387                            range: lsp::Range::new(
2388                                lsp::Position::new(0, 14),
2389                                lsp::Position::new(0, 14),
2390                            ),
2391                        })),
2392                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2393                        ..Default::default()
2394                    },
2395                ]))
2396            })
2397            .next()
2398            .await
2399            .unwrap();
2400        cx_a.foreground().finish_waiting();
2401
2402        // Open the buffer on the host.
2403        let buffer_a = project_a
2404            .update(&mut cx_a, |p, cx| {
2405                p.open_buffer((worktree_id, "main.rs"), cx)
2406            })
2407            .await
2408            .unwrap();
2409        buffer_a
2410            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2411            .await;
2412
2413        // Confirm a completion on the guest.
2414        editor_b
2415            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2416            .await;
2417        editor_b.update(&mut cx_b, |editor, cx| {
2418            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2419            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2420        });
2421
2422        // Return a resolved completion from the host's language server.
2423        // The resolved completion has an additional text edit.
2424        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2425            |params, _| {
2426                assert_eq!(params.label, "first_method(…)");
2427                lsp::CompletionItem {
2428                    label: "first_method(…)".into(),
2429                    detail: Some("fn(&mut self, B) -> C".into()),
2430                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2431                        new_text: "first_method($1)".to_string(),
2432                        range: lsp::Range::new(
2433                            lsp::Position::new(0, 14),
2434                            lsp::Position::new(0, 14),
2435                        ),
2436                    })),
2437                    additional_text_edits: Some(vec![lsp::TextEdit {
2438                        new_text: "use d::SomeTrait;\n".to_string(),
2439                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2440                    }]),
2441                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2442                    ..Default::default()
2443                }
2444            },
2445        );
2446
2447        // The additional edit is applied.
2448        buffer_a
2449            .condition(&cx_a, |buffer, _| {
2450                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2451            })
2452            .await;
2453        buffer_b
2454            .condition(&cx_b, |buffer, _| {
2455                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2456            })
2457            .await;
2458    }
2459
2460    #[gpui::test(iterations = 10)]
2461    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2462        cx_a.foreground().forbid_parking();
2463        let mut lang_registry = Arc::new(LanguageRegistry::new());
2464        let fs = FakeFs::new(cx_a.background());
2465
2466        // Set up a fake language server.
2467        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2468        Arc::get_mut(&mut lang_registry)
2469            .unwrap()
2470            .add(Arc::new(Language::new(
2471                LanguageConfig {
2472                    name: "Rust".into(),
2473                    path_suffixes: vec!["rs".to_string()],
2474                    language_server: Some(language_server_config),
2475                    ..Default::default()
2476                },
2477                Some(tree_sitter_rust::language()),
2478            )));
2479
2480        // Connect to a server as 2 clients.
2481        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2482        let client_a = server.create_client(&mut cx_a, "user_a").await;
2483        let client_b = server.create_client(&mut cx_b, "user_b").await;
2484
2485        // Share a project as client A
2486        fs.insert_tree(
2487            "/a",
2488            json!({
2489                ".zed.toml": r#"collaborators = ["user_b"]"#,
2490                "a.rs": "let one = two",
2491            }),
2492        )
2493        .await;
2494        let project_a = cx_a.update(|cx| {
2495            Project::local(
2496                client_a.clone(),
2497                client_a.user_store.clone(),
2498                lang_registry.clone(),
2499                fs.clone(),
2500                cx,
2501            )
2502        });
2503        let (worktree_a, _) = project_a
2504            .update(&mut cx_a, |p, cx| {
2505                p.find_or_create_local_worktree("/a", false, cx)
2506            })
2507            .await
2508            .unwrap();
2509        worktree_a
2510            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2511            .await;
2512        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2513        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2514        project_a
2515            .update(&mut cx_a, |p, cx| p.share(cx))
2516            .await
2517            .unwrap();
2518
2519        // Join the worktree as client B.
2520        let project_b = Project::remote(
2521            project_id,
2522            client_b.clone(),
2523            client_b.user_store.clone(),
2524            lang_registry.clone(),
2525            fs.clone(),
2526            &mut cx_b.to_async(),
2527        )
2528        .await
2529        .unwrap();
2530
2531        let buffer_b = cx_b
2532            .background()
2533            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2534            .await
2535            .unwrap();
2536
2537        let format = project_b.update(&mut cx_b, |project, cx| {
2538            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2539        });
2540
2541        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2542        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2543            Some(vec![
2544                lsp::TextEdit {
2545                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2546                    new_text: "h".to_string(),
2547                },
2548                lsp::TextEdit {
2549                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2550                    new_text: "y".to_string(),
2551                },
2552            ])
2553        });
2554
2555        format.await.unwrap();
2556        assert_eq!(
2557            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2558            "let honey = two"
2559        );
2560    }
2561
2562    #[gpui::test(iterations = 10)]
2563    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2564        cx_a.foreground().forbid_parking();
2565        let mut lang_registry = Arc::new(LanguageRegistry::new());
2566        let fs = FakeFs::new(cx_a.background());
2567        fs.insert_tree(
2568            "/root-1",
2569            json!({
2570                ".zed.toml": r#"collaborators = ["user_b"]"#,
2571                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2572            }),
2573        )
2574        .await;
2575        fs.insert_tree(
2576            "/root-2",
2577            json!({
2578                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2579            }),
2580        )
2581        .await;
2582
2583        // Set up a fake language server.
2584        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2585        Arc::get_mut(&mut lang_registry)
2586            .unwrap()
2587            .add(Arc::new(Language::new(
2588                LanguageConfig {
2589                    name: "Rust".into(),
2590                    path_suffixes: vec!["rs".to_string()],
2591                    language_server: Some(language_server_config),
2592                    ..Default::default()
2593                },
2594                Some(tree_sitter_rust::language()),
2595            )));
2596
2597        // Connect to a server as 2 clients.
2598        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2599        let client_a = server.create_client(&mut cx_a, "user_a").await;
2600        let client_b = server.create_client(&mut cx_b, "user_b").await;
2601
2602        // Share a project as client A
2603        let project_a = cx_a.update(|cx| {
2604            Project::local(
2605                client_a.clone(),
2606                client_a.user_store.clone(),
2607                lang_registry.clone(),
2608                fs.clone(),
2609                cx,
2610            )
2611        });
2612        let (worktree_a, _) = project_a
2613            .update(&mut cx_a, |p, cx| {
2614                p.find_or_create_local_worktree("/root-1", false, cx)
2615            })
2616            .await
2617            .unwrap();
2618        worktree_a
2619            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2620            .await;
2621        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2622        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2623        project_a
2624            .update(&mut cx_a, |p, cx| p.share(cx))
2625            .await
2626            .unwrap();
2627
2628        // Join the worktree as client B.
2629        let project_b = Project::remote(
2630            project_id,
2631            client_b.clone(),
2632            client_b.user_store.clone(),
2633            lang_registry.clone(),
2634            fs.clone(),
2635            &mut cx_b.to_async(),
2636        )
2637        .await
2638        .unwrap();
2639
2640        // Open the file on client B.
2641        let buffer_b = cx_b
2642            .background()
2643            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2644            .await
2645            .unwrap();
2646
2647        // Request the definition of a symbol as the guest.
2648        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2649
2650        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2651        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2652            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2653                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2654                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2655            )))
2656        });
2657
2658        let definitions_1 = definitions_1.await.unwrap();
2659        cx_b.read(|cx| {
2660            assert_eq!(definitions_1.len(), 1);
2661            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2662            let target_buffer = definitions_1[0].buffer.read(cx);
2663            assert_eq!(
2664                target_buffer.text(),
2665                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2666            );
2667            assert_eq!(
2668                definitions_1[0].range.to_point(target_buffer),
2669                Point::new(0, 6)..Point::new(0, 9)
2670            );
2671        });
2672
2673        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2674        // the previous call to `definition`.
2675        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2676        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2677            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2678                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2679                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2680            )))
2681        });
2682
2683        let definitions_2 = definitions_2.await.unwrap();
2684        cx_b.read(|cx| {
2685            assert_eq!(definitions_2.len(), 1);
2686            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2687            let target_buffer = definitions_2[0].buffer.read(cx);
2688            assert_eq!(
2689                target_buffer.text(),
2690                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2691            );
2692            assert_eq!(
2693                definitions_2[0].range.to_point(target_buffer),
2694                Point::new(1, 6)..Point::new(1, 11)
2695            );
2696        });
2697        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2698
2699        cx_b.update(|_| {
2700            drop(definitions_1);
2701            drop(definitions_2);
2702        });
2703        project_b
2704            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2705            .await;
2706    }
2707
2708    #[gpui::test(iterations = 10)]
2709    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2710        cx_a.foreground().forbid_parking();
2711        let mut lang_registry = Arc::new(LanguageRegistry::new());
2712        let fs = FakeFs::new(cx_a.background());
2713        fs.insert_tree(
2714            "/root-1",
2715            json!({
2716                ".zed.toml": r#"collaborators = ["user_b"]"#,
2717                "one.rs": "const ONE: usize = 1;",
2718                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2719            }),
2720        )
2721        .await;
2722        fs.insert_tree(
2723            "/root-2",
2724            json!({
2725                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2726            }),
2727        )
2728        .await;
2729
2730        // Set up a fake language server.
2731        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2732        Arc::get_mut(&mut lang_registry)
2733            .unwrap()
2734            .add(Arc::new(Language::new(
2735                LanguageConfig {
2736                    name: "Rust".into(),
2737                    path_suffixes: vec!["rs".to_string()],
2738                    language_server: Some(language_server_config),
2739                    ..Default::default()
2740                },
2741                Some(tree_sitter_rust::language()),
2742            )));
2743
2744        // Connect to a server as 2 clients.
2745        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2746        let client_a = server.create_client(&mut cx_a, "user_a").await;
2747        let client_b = server.create_client(&mut cx_b, "user_b").await;
2748
2749        // Share a project as client A
2750        let project_a = cx_a.update(|cx| {
2751            Project::local(
2752                client_a.clone(),
2753                client_a.user_store.clone(),
2754                lang_registry.clone(),
2755                fs.clone(),
2756                cx,
2757            )
2758        });
2759        let (worktree_a, _) = project_a
2760            .update(&mut cx_a, |p, cx| {
2761                p.find_or_create_local_worktree("/root-1", false, cx)
2762            })
2763            .await
2764            .unwrap();
2765        worktree_a
2766            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2767            .await;
2768        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2769        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2770        project_a
2771            .update(&mut cx_a, |p, cx| p.share(cx))
2772            .await
2773            .unwrap();
2774
2775        // Join the worktree as client B.
2776        let project_b = Project::remote(
2777            project_id,
2778            client_b.clone(),
2779            client_b.user_store.clone(),
2780            lang_registry.clone(),
2781            fs.clone(),
2782            &mut cx_b.to_async(),
2783        )
2784        .await
2785        .unwrap();
2786
2787        // Open the file on client B.
2788        let buffer_b = cx_b
2789            .background()
2790            .spawn(project_b.update(&mut cx_b, |p, cx| {
2791                p.open_buffer((worktree_id, "one.rs"), cx)
2792            }))
2793            .await
2794            .unwrap();
2795
2796        // Request references to a symbol as the guest.
2797        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2798
2799        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2800        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2801            assert_eq!(
2802                params.text_document_position.text_document.uri.as_str(),
2803                "file:///root-1/one.rs"
2804            );
2805            Some(vec![
2806                lsp::Location {
2807                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2808                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2809                },
2810                lsp::Location {
2811                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2812                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2813                },
2814                lsp::Location {
2815                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2816                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2817                },
2818            ])
2819        });
2820
2821        let references = references.await.unwrap();
2822        cx_b.read(|cx| {
2823            assert_eq!(references.len(), 3);
2824            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2825
2826            let two_buffer = references[0].buffer.read(cx);
2827            let three_buffer = references[2].buffer.read(cx);
2828            assert_eq!(
2829                two_buffer.file().unwrap().path().as_ref(),
2830                Path::new("two.rs")
2831            );
2832            assert_eq!(references[1].buffer, references[0].buffer);
2833            assert_eq!(
2834                three_buffer.file().unwrap().full_path(cx),
2835                Path::new("three.rs")
2836            );
2837
2838            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2839            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2840            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2841        });
2842    }
2843
2844    #[gpui::test(iterations = 10)]
2845    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2846        cx_a.foreground().forbid_parking();
2847        let lang_registry = Arc::new(LanguageRegistry::new());
2848        let fs = FakeFs::new(cx_a.background());
2849        fs.insert_tree(
2850            "/root-1",
2851            json!({
2852                ".zed.toml": r#"collaborators = ["user_b"]"#,
2853                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2854            }),
2855        )
2856        .await;
2857
2858        // Set up a fake language server.
2859        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2860        lang_registry.add(Arc::new(Language::new(
2861            LanguageConfig {
2862                name: "Rust".into(),
2863                path_suffixes: vec!["rs".to_string()],
2864                language_server: Some(language_server_config),
2865                ..Default::default()
2866            },
2867            Some(tree_sitter_rust::language()),
2868        )));
2869
2870        // Connect to a server as 2 clients.
2871        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2872        let client_a = server.create_client(&mut cx_a, "user_a").await;
2873        let client_b = server.create_client(&mut cx_b, "user_b").await;
2874
2875        // Share a project as client A
2876        let project_a = cx_a.update(|cx| {
2877            Project::local(
2878                client_a.clone(),
2879                client_a.user_store.clone(),
2880                lang_registry.clone(),
2881                fs.clone(),
2882                cx,
2883            )
2884        });
2885        let (worktree_a, _) = project_a
2886            .update(&mut cx_a, |p, cx| {
2887                p.find_or_create_local_worktree("/root-1", false, cx)
2888            })
2889            .await
2890            .unwrap();
2891        worktree_a
2892            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2893            .await;
2894        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2895        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2896        project_a
2897            .update(&mut cx_a, |p, cx| p.share(cx))
2898            .await
2899            .unwrap();
2900
2901        // Join the worktree as client B.
2902        let project_b = Project::remote(
2903            project_id,
2904            client_b.clone(),
2905            client_b.user_store.clone(),
2906            lang_registry.clone(),
2907            fs.clone(),
2908            &mut cx_b.to_async(),
2909        )
2910        .await
2911        .unwrap();
2912
2913        // Open the file on client B.
2914        let buffer_b = cx_b
2915            .background()
2916            .spawn(project_b.update(&mut cx_b, |p, cx| {
2917                p.open_buffer((worktree_id, "main.rs"), cx)
2918            }))
2919            .await
2920            .unwrap();
2921
2922        // Request document highlights as the guest.
2923        let highlights =
2924            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2925
2926        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2927        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2928            |params, _| {
2929                assert_eq!(
2930                    params
2931                        .text_document_position_params
2932                        .text_document
2933                        .uri
2934                        .as_str(),
2935                    "file:///root-1/main.rs"
2936                );
2937                assert_eq!(
2938                    params.text_document_position_params.position,
2939                    lsp::Position::new(0, 34)
2940                );
2941                Some(vec![
2942                    lsp::DocumentHighlight {
2943                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2944                        range: lsp::Range::new(
2945                            lsp::Position::new(0, 10),
2946                            lsp::Position::new(0, 16),
2947                        ),
2948                    },
2949                    lsp::DocumentHighlight {
2950                        kind: Some(lsp::DocumentHighlightKind::READ),
2951                        range: lsp::Range::new(
2952                            lsp::Position::new(0, 32),
2953                            lsp::Position::new(0, 38),
2954                        ),
2955                    },
2956                    lsp::DocumentHighlight {
2957                        kind: Some(lsp::DocumentHighlightKind::READ),
2958                        range: lsp::Range::new(
2959                            lsp::Position::new(0, 41),
2960                            lsp::Position::new(0, 47),
2961                        ),
2962                    },
2963                ])
2964            },
2965        );
2966
2967        let highlights = highlights.await.unwrap();
2968        buffer_b.read_with(&cx_b, |buffer, _| {
2969            let snapshot = buffer.snapshot();
2970
2971            let highlights = highlights
2972                .into_iter()
2973                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2974                .collect::<Vec<_>>();
2975            assert_eq!(
2976                highlights,
2977                &[
2978                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2979                    (lsp::DocumentHighlightKind::READ, 32..38),
2980                    (lsp::DocumentHighlightKind::READ, 41..47)
2981                ]
2982            )
2983        });
2984    }
2985
2986    #[gpui::test(iterations = 10)]
2987    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2988        cx_a.foreground().forbid_parking();
2989        let mut lang_registry = Arc::new(LanguageRegistry::new());
2990        let fs = FakeFs::new(cx_a.background());
2991        fs.insert_tree(
2992            "/code",
2993            json!({
2994                "crate-1": {
2995                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2996                    "one.rs": "const ONE: usize = 1;",
2997                },
2998                "crate-2": {
2999                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3000                },
3001                "private": {
3002                    "passwords.txt": "the-password",
3003                }
3004            }),
3005        )
3006        .await;
3007
3008        // Set up a fake language server.
3009        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3010        Arc::get_mut(&mut lang_registry)
3011            .unwrap()
3012            .add(Arc::new(Language::new(
3013                LanguageConfig {
3014                    name: "Rust".into(),
3015                    path_suffixes: vec!["rs".to_string()],
3016                    language_server: Some(language_server_config),
3017                    ..Default::default()
3018                },
3019                Some(tree_sitter_rust::language()),
3020            )));
3021
3022        // Connect to a server as 2 clients.
3023        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3024        let client_a = server.create_client(&mut cx_a, "user_a").await;
3025        let client_b = server.create_client(&mut cx_b, "user_b").await;
3026
3027        // Share a project as client A
3028        let project_a = cx_a.update(|cx| {
3029            Project::local(
3030                client_a.clone(),
3031                client_a.user_store.clone(),
3032                lang_registry.clone(),
3033                fs.clone(),
3034                cx,
3035            )
3036        });
3037        let (worktree_a, _) = project_a
3038            .update(&mut cx_a, |p, cx| {
3039                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3040            })
3041            .await
3042            .unwrap();
3043        worktree_a
3044            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3045            .await;
3046        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3047        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3048        project_a
3049            .update(&mut cx_a, |p, cx| p.share(cx))
3050            .await
3051            .unwrap();
3052
3053        // Join the worktree as client B.
3054        let project_b = Project::remote(
3055            project_id,
3056            client_b.clone(),
3057            client_b.user_store.clone(),
3058            lang_registry.clone(),
3059            fs.clone(),
3060            &mut cx_b.to_async(),
3061        )
3062        .await
3063        .unwrap();
3064
3065        // Cause the language server to start.
3066        let _buffer = cx_b
3067            .background()
3068            .spawn(project_b.update(&mut cx_b, |p, cx| {
3069                p.open_buffer((worktree_id, "one.rs"), cx)
3070            }))
3071            .await
3072            .unwrap();
3073
3074        // Request the definition of a symbol as the guest.
3075        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3076        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3077        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3078            #[allow(deprecated)]
3079            Some(vec![lsp::SymbolInformation {
3080                name: "TWO".into(),
3081                location: lsp::Location {
3082                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3083                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3084                },
3085                kind: lsp::SymbolKind::CONSTANT,
3086                tags: None,
3087                container_name: None,
3088                deprecated: None,
3089            }])
3090        });
3091
3092        let symbols = symbols.await.unwrap();
3093        assert_eq!(symbols.len(), 1);
3094        assert_eq!(symbols[0].name, "TWO");
3095
3096        // Open one of the returned symbols.
3097        let buffer_b_2 = project_b
3098            .update(&mut cx_b, |project, cx| {
3099                project.open_buffer_for_symbol(&symbols[0], cx)
3100            })
3101            .await
3102            .unwrap();
3103        buffer_b_2.read_with(&cx_b, |buffer, _| {
3104            assert_eq!(
3105                buffer.file().unwrap().path().as_ref(),
3106                Path::new("../crate-2/two.rs")
3107            );
3108        });
3109
3110        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3111        let mut fake_symbol = symbols[0].clone();
3112        fake_symbol.path = Path::new("/code/secrets").into();
3113        let error = project_b
3114            .update(&mut cx_b, |project, cx| {
3115                project.open_buffer_for_symbol(&fake_symbol, cx)
3116            })
3117            .await
3118            .unwrap_err();
3119        assert!(error.to_string().contains("invalid symbol signature"));
3120    }
3121
3122    #[gpui::test(iterations = 10)]
3123    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3124        mut cx_a: TestAppContext,
3125        mut cx_b: TestAppContext,
3126        mut rng: StdRng,
3127    ) {
3128        cx_a.foreground().forbid_parking();
3129        let mut lang_registry = Arc::new(LanguageRegistry::new());
3130        let fs = FakeFs::new(cx_a.background());
3131        fs.insert_tree(
3132            "/root",
3133            json!({
3134                ".zed.toml": r#"collaborators = ["user_b"]"#,
3135                "a.rs": "const ONE: usize = b::TWO;",
3136                "b.rs": "const TWO: usize = 2",
3137            }),
3138        )
3139        .await;
3140
3141        // Set up a fake language server.
3142        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3143
3144        Arc::get_mut(&mut lang_registry)
3145            .unwrap()
3146            .add(Arc::new(Language::new(
3147                LanguageConfig {
3148                    name: "Rust".into(),
3149                    path_suffixes: vec!["rs".to_string()],
3150                    language_server: Some(language_server_config),
3151                    ..Default::default()
3152                },
3153                Some(tree_sitter_rust::language()),
3154            )));
3155
3156        // Connect to a server as 2 clients.
3157        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3158        let client_a = server.create_client(&mut cx_a, "user_a").await;
3159        let client_b = server.create_client(&mut cx_b, "user_b").await;
3160
3161        // Share a project as client A
3162        let project_a = cx_a.update(|cx| {
3163            Project::local(
3164                client_a.clone(),
3165                client_a.user_store.clone(),
3166                lang_registry.clone(),
3167                fs.clone(),
3168                cx,
3169            )
3170        });
3171
3172        let (worktree_a, _) = project_a
3173            .update(&mut cx_a, |p, cx| {
3174                p.find_or_create_local_worktree("/root", false, cx)
3175            })
3176            .await
3177            .unwrap();
3178        worktree_a
3179            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3180            .await;
3181        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3182        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3183        project_a
3184            .update(&mut cx_a, |p, cx| p.share(cx))
3185            .await
3186            .unwrap();
3187
3188        // Join the worktree as client B.
3189        let project_b = Project::remote(
3190            project_id,
3191            client_b.clone(),
3192            client_b.user_store.clone(),
3193            lang_registry.clone(),
3194            fs.clone(),
3195            &mut cx_b.to_async(),
3196        )
3197        .await
3198        .unwrap();
3199
3200        let buffer_b1 = cx_b
3201            .background()
3202            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3203            .await
3204            .unwrap();
3205
3206        let definitions;
3207        let buffer_b2;
3208        if rng.gen() {
3209            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3210            buffer_b2 =
3211                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3212        } else {
3213            buffer_b2 =
3214                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3215            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3216        }
3217
3218        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3219        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3220            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3221                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3222                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3223            )))
3224        });
3225
3226        let buffer_b2 = buffer_b2.await.unwrap();
3227        let definitions = definitions.await.unwrap();
3228        assert_eq!(definitions.len(), 1);
3229        assert_eq!(definitions[0].buffer, buffer_b2);
3230    }
3231
3232    #[gpui::test(iterations = 10)]
3233    async fn test_collaborating_with_code_actions(
3234        mut cx_a: TestAppContext,
3235        mut cx_b: TestAppContext,
3236    ) {
3237        cx_a.foreground().forbid_parking();
3238        let mut lang_registry = Arc::new(LanguageRegistry::new());
3239        let fs = FakeFs::new(cx_a.background());
3240        let mut path_openers_b = Vec::new();
3241        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3242
3243        // Set up a fake language server.
3244        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3245        Arc::get_mut(&mut lang_registry)
3246            .unwrap()
3247            .add(Arc::new(Language::new(
3248                LanguageConfig {
3249                    name: "Rust".into(),
3250                    path_suffixes: vec!["rs".to_string()],
3251                    language_server: Some(language_server_config),
3252                    ..Default::default()
3253                },
3254                Some(tree_sitter_rust::language()),
3255            )));
3256
3257        // Connect to a server as 2 clients.
3258        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3259        let client_a = server.create_client(&mut cx_a, "user_a").await;
3260        let client_b = server.create_client(&mut cx_b, "user_b").await;
3261
3262        // Share a project as client A
3263        fs.insert_tree(
3264            "/a",
3265            json!({
3266                ".zed.toml": r#"collaborators = ["user_b"]"#,
3267                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3268                "other.rs": "pub fn foo() -> usize { 4 }",
3269            }),
3270        )
3271        .await;
3272        let project_a = cx_a.update(|cx| {
3273            Project::local(
3274                client_a.clone(),
3275                client_a.user_store.clone(),
3276                lang_registry.clone(),
3277                fs.clone(),
3278                cx,
3279            )
3280        });
3281        let (worktree_a, _) = project_a
3282            .update(&mut cx_a, |p, cx| {
3283                p.find_or_create_local_worktree("/a", false, cx)
3284            })
3285            .await
3286            .unwrap();
3287        worktree_a
3288            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3289            .await;
3290        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3291        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3292        project_a
3293            .update(&mut cx_a, |p, cx| p.share(cx))
3294            .await
3295            .unwrap();
3296
3297        // Join the worktree as client B.
3298        let project_b = Project::remote(
3299            project_id,
3300            client_b.clone(),
3301            client_b.user_store.clone(),
3302            lang_registry.clone(),
3303            fs.clone(),
3304            &mut cx_b.to_async(),
3305        )
3306        .await
3307        .unwrap();
3308        let mut params = cx_b.update(WorkspaceParams::test);
3309        params.languages = lang_registry.clone();
3310        params.client = client_b.client.clone();
3311        params.user_store = client_b.user_store.clone();
3312        params.project = project_b;
3313        params.path_openers = path_openers_b.into();
3314
3315        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3316        let editor_b = workspace_b
3317            .update(&mut cx_b, |workspace, cx| {
3318                workspace.open_path((worktree_id, "main.rs").into(), cx)
3319            })
3320            .await
3321            .unwrap()
3322            .downcast::<Editor>()
3323            .unwrap();
3324
3325        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3326        fake_language_server
3327            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3328                assert_eq!(
3329                    params.text_document.uri,
3330                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3331                );
3332                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3333                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3334                None
3335            })
3336            .next()
3337            .await;
3338
3339        // Move cursor to a location that contains code actions.
3340        editor_b.update(&mut cx_b, |editor, cx| {
3341            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3342            cx.focus(&editor_b);
3343        });
3344
3345        fake_language_server
3346            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3347                assert_eq!(
3348                    params.text_document.uri,
3349                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3350                );
3351                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3352                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3353
3354                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3355                    lsp::CodeAction {
3356                        title: "Inline into all callers".to_string(),
3357                        edit: Some(lsp::WorkspaceEdit {
3358                            changes: Some(
3359                                [
3360                                    (
3361                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3362                                        vec![lsp::TextEdit::new(
3363                                            lsp::Range::new(
3364                                                lsp::Position::new(1, 22),
3365                                                lsp::Position::new(1, 34),
3366                                            ),
3367                                            "4".to_string(),
3368                                        )],
3369                                    ),
3370                                    (
3371                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3372                                        vec![lsp::TextEdit::new(
3373                                            lsp::Range::new(
3374                                                lsp::Position::new(0, 0),
3375                                                lsp::Position::new(0, 27),
3376                                            ),
3377                                            "".to_string(),
3378                                        )],
3379                                    ),
3380                                ]
3381                                .into_iter()
3382                                .collect(),
3383                            ),
3384                            ..Default::default()
3385                        }),
3386                        data: Some(json!({
3387                            "codeActionParams": {
3388                                "range": {
3389                                    "start": {"line": 1, "column": 31},
3390                                    "end": {"line": 1, "column": 31},
3391                                }
3392                            }
3393                        })),
3394                        ..Default::default()
3395                    },
3396                )])
3397            })
3398            .next()
3399            .await;
3400
3401        // Toggle code actions and wait for them to display.
3402        editor_b.update(&mut cx_b, |editor, cx| {
3403            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3404        });
3405        editor_b
3406            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3407            .await;
3408
3409        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3410
3411        // Confirming the code action will trigger a resolve request.
3412        let confirm_action = workspace_b
3413            .update(&mut cx_b, |workspace, cx| {
3414                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3415            })
3416            .unwrap();
3417        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3418            lsp::CodeAction {
3419                title: "Inline into all callers".to_string(),
3420                edit: Some(lsp::WorkspaceEdit {
3421                    changes: Some(
3422                        [
3423                            (
3424                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3425                                vec![lsp::TextEdit::new(
3426                                    lsp::Range::new(
3427                                        lsp::Position::new(1, 22),
3428                                        lsp::Position::new(1, 34),
3429                                    ),
3430                                    "4".to_string(),
3431                                )],
3432                            ),
3433                            (
3434                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3435                                vec![lsp::TextEdit::new(
3436                                    lsp::Range::new(
3437                                        lsp::Position::new(0, 0),
3438                                        lsp::Position::new(0, 27),
3439                                    ),
3440                                    "".to_string(),
3441                                )],
3442                            ),
3443                        ]
3444                        .into_iter()
3445                        .collect(),
3446                    ),
3447                    ..Default::default()
3448                }),
3449                ..Default::default()
3450            }
3451        });
3452
3453        // After the action is confirmed, an editor containing both modified files is opened.
3454        confirm_action.await.unwrap();
3455        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3456            workspace
3457                .active_item(cx)
3458                .unwrap()
3459                .downcast::<Editor>()
3460                .unwrap()
3461        });
3462        code_action_editor.update(&mut cx_b, |editor, cx| {
3463            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3464            editor.undo(&Undo, cx);
3465            assert_eq!(
3466                editor.text(cx),
3467                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3468            );
3469            editor.redo(&Redo, cx);
3470            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3471        });
3472    }
3473
3474    #[gpui::test(iterations = 10)]
3475    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3476        cx_a.foreground().forbid_parking();
3477        let mut lang_registry = Arc::new(LanguageRegistry::new());
3478        let fs = FakeFs::new(cx_a.background());
3479        let mut path_openers_b = Vec::new();
3480        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3481
3482        // Set up a fake language server.
3483        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3484        Arc::get_mut(&mut lang_registry)
3485            .unwrap()
3486            .add(Arc::new(Language::new(
3487                LanguageConfig {
3488                    name: "Rust".into(),
3489                    path_suffixes: vec!["rs".to_string()],
3490                    language_server: Some(language_server_config),
3491                    ..Default::default()
3492                },
3493                Some(tree_sitter_rust::language()),
3494            )));
3495
3496        // Connect to a server as 2 clients.
3497        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3498        let client_a = server.create_client(&mut cx_a, "user_a").await;
3499        let client_b = server.create_client(&mut cx_b, "user_b").await;
3500
3501        // Share a project as client A
3502        fs.insert_tree(
3503            "/dir",
3504            json!({
3505                ".zed.toml": r#"collaborators = ["user_b"]"#,
3506                "one.rs": "const ONE: usize = 1;",
3507                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3508            }),
3509        )
3510        .await;
3511        let project_a = cx_a.update(|cx| {
3512            Project::local(
3513                client_a.clone(),
3514                client_a.user_store.clone(),
3515                lang_registry.clone(),
3516                fs.clone(),
3517                cx,
3518            )
3519        });
3520        let (worktree_a, _) = project_a
3521            .update(&mut cx_a, |p, cx| {
3522                p.find_or_create_local_worktree("/dir", false, cx)
3523            })
3524            .await
3525            .unwrap();
3526        worktree_a
3527            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3528            .await;
3529        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3530        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3531        project_a
3532            .update(&mut cx_a, |p, cx| p.share(cx))
3533            .await
3534            .unwrap();
3535
3536        // Join the worktree as client B.
3537        let project_b = Project::remote(
3538            project_id,
3539            client_b.clone(),
3540            client_b.user_store.clone(),
3541            lang_registry.clone(),
3542            fs.clone(),
3543            &mut cx_b.to_async(),
3544        )
3545        .await
3546        .unwrap();
3547        let mut params = cx_b.update(WorkspaceParams::test);
3548        params.languages = lang_registry.clone();
3549        params.client = client_b.client.clone();
3550        params.user_store = client_b.user_store.clone();
3551        params.project = project_b;
3552        params.path_openers = path_openers_b.into();
3553
3554        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3555        let editor_b = workspace_b
3556            .update(&mut cx_b, |workspace, cx| {
3557                workspace.open_path((worktree_id, "one.rs").into(), cx)
3558            })
3559            .await
3560            .unwrap()
3561            .downcast::<Editor>()
3562            .unwrap();
3563        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3564
3565        // Move cursor to a location that can be renamed.
3566        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3567            editor.select_ranges([7..7], None, cx);
3568            editor.rename(&Rename, cx).unwrap()
3569        });
3570
3571        fake_language_server
3572            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3573                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3574                assert_eq!(params.position, lsp::Position::new(0, 7));
3575                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3576                    lsp::Position::new(0, 6),
3577                    lsp::Position::new(0, 9),
3578                )))
3579            })
3580            .next()
3581            .await
3582            .unwrap();
3583        prepare_rename.await.unwrap();
3584        editor_b.update(&mut cx_b, |editor, cx| {
3585            let rename = editor.pending_rename().unwrap();
3586            let buffer = editor.buffer().read(cx).snapshot(cx);
3587            assert_eq!(
3588                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3589                6..9
3590            );
3591            rename.editor.update(cx, |rename_editor, cx| {
3592                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3593                    rename_buffer.edit([0..3], "THREE", cx);
3594                });
3595            });
3596        });
3597
3598        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3599            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3600        });
3601        fake_language_server
3602            .handle_request::<lsp::request::Rename, _>(|params, _| {
3603                assert_eq!(
3604                    params.text_document_position.text_document.uri.as_str(),
3605                    "file:///dir/one.rs"
3606                );
3607                assert_eq!(
3608                    params.text_document_position.position,
3609                    lsp::Position::new(0, 6)
3610                );
3611                assert_eq!(params.new_name, "THREE");
3612                Some(lsp::WorkspaceEdit {
3613                    changes: Some(
3614                        [
3615                            (
3616                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3617                                vec![lsp::TextEdit::new(
3618                                    lsp::Range::new(
3619                                        lsp::Position::new(0, 6),
3620                                        lsp::Position::new(0, 9),
3621                                    ),
3622                                    "THREE".to_string(),
3623                                )],
3624                            ),
3625                            (
3626                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3627                                vec![
3628                                    lsp::TextEdit::new(
3629                                        lsp::Range::new(
3630                                            lsp::Position::new(0, 24),
3631                                            lsp::Position::new(0, 27),
3632                                        ),
3633                                        "THREE".to_string(),
3634                                    ),
3635                                    lsp::TextEdit::new(
3636                                        lsp::Range::new(
3637                                            lsp::Position::new(0, 35),
3638                                            lsp::Position::new(0, 38),
3639                                        ),
3640                                        "THREE".to_string(),
3641                                    ),
3642                                ],
3643                            ),
3644                        ]
3645                        .into_iter()
3646                        .collect(),
3647                    ),
3648                    ..Default::default()
3649                })
3650            })
3651            .next()
3652            .await
3653            .unwrap();
3654        confirm_rename.await.unwrap();
3655
3656        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3657            workspace
3658                .active_item(cx)
3659                .unwrap()
3660                .downcast::<Editor>()
3661                .unwrap()
3662        });
3663        rename_editor.update(&mut cx_b, |editor, cx| {
3664            assert_eq!(
3665                editor.text(cx),
3666                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3667            );
3668            editor.undo(&Undo, cx);
3669            assert_eq!(
3670                editor.text(cx),
3671                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3672            );
3673            editor.redo(&Redo, cx);
3674            assert_eq!(
3675                editor.text(cx),
3676                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3677            );
3678        });
3679
3680        // Ensure temporary rename edits cannot be undone/redone.
3681        editor_b.update(&mut cx_b, |editor, cx| {
3682            editor.undo(&Undo, cx);
3683            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3684            editor.undo(&Undo, cx);
3685            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3686            editor.redo(&Redo, cx);
3687            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3688        })
3689    }
3690
3691    #[gpui::test(iterations = 10)]
3692    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3693        cx_a.foreground().forbid_parking();
3694
3695        // Connect to a server as 2 clients.
3696        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3697        let client_a = server.create_client(&mut cx_a, "user_a").await;
3698        let client_b = server.create_client(&mut cx_b, "user_b").await;
3699
3700        // Create an org that includes these 2 users.
3701        let db = &server.app_state.db;
3702        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3703        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3704            .await
3705            .unwrap();
3706        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3707            .await
3708            .unwrap();
3709
3710        // Create a channel that includes all the users.
3711        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3712        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3713            .await
3714            .unwrap();
3715        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3716            .await
3717            .unwrap();
3718        db.create_channel_message(
3719            channel_id,
3720            client_b.current_user_id(&cx_b),
3721            "hello A, it's B.",
3722            OffsetDateTime::now_utc(),
3723            1,
3724        )
3725        .await
3726        .unwrap();
3727
3728        let channels_a = cx_a
3729            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3730        channels_a
3731            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3732            .await;
3733        channels_a.read_with(&cx_a, |list, _| {
3734            assert_eq!(
3735                list.available_channels().unwrap(),
3736                &[ChannelDetails {
3737                    id: channel_id.to_proto(),
3738                    name: "test-channel".to_string()
3739                }]
3740            )
3741        });
3742        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3743            this.get_channel(channel_id.to_proto(), cx).unwrap()
3744        });
3745        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3746        channel_a
3747            .condition(&cx_a, |channel, _| {
3748                channel_messages(channel)
3749                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3750            })
3751            .await;
3752
3753        let channels_b = cx_b
3754            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3755        channels_b
3756            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3757            .await;
3758        channels_b.read_with(&cx_b, |list, _| {
3759            assert_eq!(
3760                list.available_channels().unwrap(),
3761                &[ChannelDetails {
3762                    id: channel_id.to_proto(),
3763                    name: "test-channel".to_string()
3764                }]
3765            )
3766        });
3767
3768        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3769            this.get_channel(channel_id.to_proto(), cx).unwrap()
3770        });
3771        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3772        channel_b
3773            .condition(&cx_b, |channel, _| {
3774                channel_messages(channel)
3775                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3776            })
3777            .await;
3778
3779        channel_a
3780            .update(&mut cx_a, |channel, cx| {
3781                channel
3782                    .send_message("oh, hi B.".to_string(), cx)
3783                    .unwrap()
3784                    .detach();
3785                let task = channel.send_message("sup".to_string(), cx).unwrap();
3786                assert_eq!(
3787                    channel_messages(channel),
3788                    &[
3789                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3790                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3791                        ("user_a".to_string(), "sup".to_string(), true)
3792                    ]
3793                );
3794                task
3795            })
3796            .await
3797            .unwrap();
3798
3799        channel_b
3800            .condition(&cx_b, |channel, _| {
3801                channel_messages(channel)
3802                    == [
3803                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3804                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3805                        ("user_a".to_string(), "sup".to_string(), false),
3806                    ]
3807            })
3808            .await;
3809
3810        assert_eq!(
3811            server
3812                .state()
3813                .await
3814                .channel(channel_id)
3815                .unwrap()
3816                .connection_ids
3817                .len(),
3818            2
3819        );
3820        cx_b.update(|_| drop(channel_b));
3821        server
3822            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3823            .await;
3824
3825        cx_a.update(|_| drop(channel_a));
3826        server
3827            .condition(|state| state.channel(channel_id).is_none())
3828            .await;
3829    }
3830
3831    #[gpui::test(iterations = 10)]
3832    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3833        cx_a.foreground().forbid_parking();
3834
3835        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3836        let client_a = server.create_client(&mut cx_a, "user_a").await;
3837
3838        let db = &server.app_state.db;
3839        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3840        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3841        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3842            .await
3843            .unwrap();
3844        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3845            .await
3846            .unwrap();
3847
3848        let channels_a = cx_a
3849            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3850        channels_a
3851            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3852            .await;
3853        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3854            this.get_channel(channel_id.to_proto(), cx).unwrap()
3855        });
3856
3857        // Messages aren't allowed to be too long.
3858        channel_a
3859            .update(&mut cx_a, |channel, cx| {
3860                let long_body = "this is long.\n".repeat(1024);
3861                channel.send_message(long_body, cx).unwrap()
3862            })
3863            .await
3864            .unwrap_err();
3865
3866        // Messages aren't allowed to be blank.
3867        channel_a.update(&mut cx_a, |channel, cx| {
3868            channel.send_message(String::new(), cx).unwrap_err()
3869        });
3870
3871        // Leading and trailing whitespace are trimmed.
3872        channel_a
3873            .update(&mut cx_a, |channel, cx| {
3874                channel
3875                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3876                    .unwrap()
3877            })
3878            .await
3879            .unwrap();
3880        assert_eq!(
3881            db.get_channel_messages(channel_id, 10, None)
3882                .await
3883                .unwrap()
3884                .iter()
3885                .map(|m| &m.body)
3886                .collect::<Vec<_>>(),
3887            &["surrounded by whitespace"]
3888        );
3889    }
3890
3891    #[gpui::test(iterations = 10)]
3892    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3893        cx_a.foreground().forbid_parking();
3894
3895        // Connect to a server as 2 clients.
3896        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3897        let client_a = server.create_client(&mut cx_a, "user_a").await;
3898        let client_b = server.create_client(&mut cx_b, "user_b").await;
3899        let mut status_b = client_b.status();
3900
3901        // Create an org that includes these 2 users.
3902        let db = &server.app_state.db;
3903        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3904        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3905            .await
3906            .unwrap();
3907        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3908            .await
3909            .unwrap();
3910
3911        // Create a channel that includes all the users.
3912        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3913        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3914            .await
3915            .unwrap();
3916        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3917            .await
3918            .unwrap();
3919        db.create_channel_message(
3920            channel_id,
3921            client_b.current_user_id(&cx_b),
3922            "hello A, it's B.",
3923            OffsetDateTime::now_utc(),
3924            2,
3925        )
3926        .await
3927        .unwrap();
3928
3929        let channels_a = cx_a
3930            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3931        channels_a
3932            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3933            .await;
3934
3935        channels_a.read_with(&cx_a, |list, _| {
3936            assert_eq!(
3937                list.available_channels().unwrap(),
3938                &[ChannelDetails {
3939                    id: channel_id.to_proto(),
3940                    name: "test-channel".to_string()
3941                }]
3942            )
3943        });
3944        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3945            this.get_channel(channel_id.to_proto(), cx).unwrap()
3946        });
3947        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3948        channel_a
3949            .condition(&cx_a, |channel, _| {
3950                channel_messages(channel)
3951                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3952            })
3953            .await;
3954
3955        let channels_b = cx_b
3956            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3957        channels_b
3958            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3959            .await;
3960        channels_b.read_with(&cx_b, |list, _| {
3961            assert_eq!(
3962                list.available_channels().unwrap(),
3963                &[ChannelDetails {
3964                    id: channel_id.to_proto(),
3965                    name: "test-channel".to_string()
3966                }]
3967            )
3968        });
3969
3970        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3971            this.get_channel(channel_id.to_proto(), cx).unwrap()
3972        });
3973        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3974        channel_b
3975            .condition(&cx_b, |channel, _| {
3976                channel_messages(channel)
3977                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3978            })
3979            .await;
3980
3981        // Disconnect client B, ensuring we can still access its cached channel data.
3982        server.forbid_connections();
3983        server.disconnect_client(client_b.current_user_id(&cx_b));
3984        while !matches!(
3985            status_b.next().await,
3986            Some(client::Status::ReconnectionError { .. })
3987        ) {}
3988
3989        channels_b.read_with(&cx_b, |channels, _| {
3990            assert_eq!(
3991                channels.available_channels().unwrap(),
3992                [ChannelDetails {
3993                    id: channel_id.to_proto(),
3994                    name: "test-channel".to_string()
3995                }]
3996            )
3997        });
3998        channel_b.read_with(&cx_b, |channel, _| {
3999            assert_eq!(
4000                channel_messages(channel),
4001                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4002            )
4003        });
4004
4005        // Send a message from client B while it is disconnected.
4006        channel_b
4007            .update(&mut cx_b, |channel, cx| {
4008                let task = channel
4009                    .send_message("can you see this?".to_string(), cx)
4010                    .unwrap();
4011                assert_eq!(
4012                    channel_messages(channel),
4013                    &[
4014                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4015                        ("user_b".to_string(), "can you see this?".to_string(), true)
4016                    ]
4017                );
4018                task
4019            })
4020            .await
4021            .unwrap_err();
4022
4023        // Send a message from client A while B is disconnected.
4024        channel_a
4025            .update(&mut cx_a, |channel, cx| {
4026                channel
4027                    .send_message("oh, hi B.".to_string(), cx)
4028                    .unwrap()
4029                    .detach();
4030                let task = channel.send_message("sup".to_string(), cx).unwrap();
4031                assert_eq!(
4032                    channel_messages(channel),
4033                    &[
4034                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4035                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4036                        ("user_a".to_string(), "sup".to_string(), true)
4037                    ]
4038                );
4039                task
4040            })
4041            .await
4042            .unwrap();
4043
4044        // Give client B a chance to reconnect.
4045        server.allow_connections();
4046        cx_b.foreground().advance_clock(Duration::from_secs(10));
4047
4048        // Verify that B sees the new messages upon reconnection, as well as the message client B
4049        // sent while offline.
4050        channel_b
4051            .condition(&cx_b, |channel, _| {
4052                channel_messages(channel)
4053                    == [
4054                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4055                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4056                        ("user_a".to_string(), "sup".to_string(), false),
4057                        ("user_b".to_string(), "can you see this?".to_string(), false),
4058                    ]
4059            })
4060            .await;
4061
4062        // Ensure client A and B can communicate normally after reconnection.
4063        channel_a
4064            .update(&mut cx_a, |channel, cx| {
4065                channel.send_message("you online?".to_string(), cx).unwrap()
4066            })
4067            .await
4068            .unwrap();
4069        channel_b
4070            .condition(&cx_b, |channel, _| {
4071                channel_messages(channel)
4072                    == [
4073                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4074                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4075                        ("user_a".to_string(), "sup".to_string(), false),
4076                        ("user_b".to_string(), "can you see this?".to_string(), false),
4077                        ("user_a".to_string(), "you online?".to_string(), false),
4078                    ]
4079            })
4080            .await;
4081
4082        channel_b
4083            .update(&mut cx_b, |channel, cx| {
4084                channel.send_message("yep".to_string(), cx).unwrap()
4085            })
4086            .await
4087            .unwrap();
4088        channel_a
4089            .condition(&cx_a, |channel, _| {
4090                channel_messages(channel)
4091                    == [
4092                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4093                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4094                        ("user_a".to_string(), "sup".to_string(), false),
4095                        ("user_b".to_string(), "can you see this?".to_string(), false),
4096                        ("user_a".to_string(), "you online?".to_string(), false),
4097                        ("user_b".to_string(), "yep".to_string(), false),
4098                    ]
4099            })
4100            .await;
4101    }
4102
4103    #[gpui::test(iterations = 10)]
4104    async fn test_contacts(
4105        mut cx_a: TestAppContext,
4106        mut cx_b: TestAppContext,
4107        mut cx_c: TestAppContext,
4108    ) {
4109        cx_a.foreground().forbid_parking();
4110        let lang_registry = Arc::new(LanguageRegistry::new());
4111        let fs = FakeFs::new(cx_a.background());
4112
4113        // Connect to a server as 3 clients.
4114        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4115        let client_a = server.create_client(&mut cx_a, "user_a").await;
4116        let client_b = server.create_client(&mut cx_b, "user_b").await;
4117        let client_c = server.create_client(&mut cx_c, "user_c").await;
4118
4119        // Share a worktree as client A.
4120        fs.insert_tree(
4121            "/a",
4122            json!({
4123                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4124            }),
4125        )
4126        .await;
4127
4128        let project_a = cx_a.update(|cx| {
4129            Project::local(
4130                client_a.clone(),
4131                client_a.user_store.clone(),
4132                lang_registry.clone(),
4133                fs.clone(),
4134                cx,
4135            )
4136        });
4137        let (worktree_a, _) = project_a
4138            .update(&mut cx_a, |p, cx| {
4139                p.find_or_create_local_worktree("/a", false, cx)
4140            })
4141            .await
4142            .unwrap();
4143        worktree_a
4144            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4145            .await;
4146
4147        client_a
4148            .user_store
4149            .condition(&cx_a, |user_store, _| {
4150                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4151            })
4152            .await;
4153        client_b
4154            .user_store
4155            .condition(&cx_b, |user_store, _| {
4156                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4157            })
4158            .await;
4159        client_c
4160            .user_store
4161            .condition(&cx_c, |user_store, _| {
4162                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4163            })
4164            .await;
4165
4166        let project_id = project_a
4167            .update(&mut cx_a, |project, _| project.next_remote_id())
4168            .await;
4169        project_a
4170            .update(&mut cx_a, |project, cx| project.share(cx))
4171            .await
4172            .unwrap();
4173
4174        let _project_b = Project::remote(
4175            project_id,
4176            client_b.clone(),
4177            client_b.user_store.clone(),
4178            lang_registry.clone(),
4179            fs.clone(),
4180            &mut cx_b.to_async(),
4181        )
4182        .await
4183        .unwrap();
4184
4185        client_a
4186            .user_store
4187            .condition(&cx_a, |user_store, _| {
4188                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4189            })
4190            .await;
4191        client_b
4192            .user_store
4193            .condition(&cx_b, |user_store, _| {
4194                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4195            })
4196            .await;
4197        client_c
4198            .user_store
4199            .condition(&cx_c, |user_store, _| {
4200                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4201            })
4202            .await;
4203
4204        project_a
4205            .condition(&cx_a, |project, _| {
4206                project.collaborators().contains_key(&client_b.peer_id)
4207            })
4208            .await;
4209
4210        cx_a.update(move |_| drop(project_a));
4211        client_a
4212            .user_store
4213            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4214            .await;
4215        client_b
4216            .user_store
4217            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4218            .await;
4219        client_c
4220            .user_store
4221            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4222            .await;
4223
4224        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4225            user_store
4226                .contacts()
4227                .iter()
4228                .map(|contact| {
4229                    let worktrees = contact
4230                        .projects
4231                        .iter()
4232                        .map(|p| {
4233                            (
4234                                p.worktree_root_names[0].as_str(),
4235                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4236                            )
4237                        })
4238                        .collect();
4239                    (contact.user.github_login.as_str(), worktrees)
4240                })
4241                .collect()
4242        }
4243    }
4244
4245    #[gpui::test(iterations = 100)]
4246    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4247        cx.foreground().forbid_parking();
4248        let max_peers = env::var("MAX_PEERS")
4249            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4250            .unwrap_or(5);
4251        let max_operations = env::var("OPERATIONS")
4252            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4253            .unwrap_or(10);
4254
4255        let rng = Arc::new(Mutex::new(rng));
4256
4257        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4258        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4259
4260        let fs = FakeFs::new(cx.background());
4261        fs.insert_tree(
4262            "/_collab",
4263            json!({
4264                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4265            }),
4266        )
4267        .await;
4268
4269        let operations = Rc::new(Cell::new(0));
4270        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4271        let mut clients = Vec::new();
4272
4273        let mut next_entity_id = 100000;
4274        let mut host_cx = TestAppContext::new(
4275            cx.foreground_platform(),
4276            cx.platform(),
4277            cx.foreground(),
4278            cx.background(),
4279            cx.font_cache(),
4280            next_entity_id,
4281        );
4282        let host = server.create_client(&mut host_cx, "host").await;
4283        let host_project = host_cx.update(|cx| {
4284            Project::local(
4285                host.client.clone(),
4286                host.user_store.clone(),
4287                Arc::new(LanguageRegistry::new()),
4288                fs.clone(),
4289                cx,
4290            )
4291        });
4292        let host_project_id = host_project
4293            .update(&mut host_cx, |p, _| p.next_remote_id())
4294            .await;
4295
4296        let (collab_worktree, _) = host_project
4297            .update(&mut host_cx, |project, cx| {
4298                project.find_or_create_local_worktree("/_collab", false, cx)
4299            })
4300            .await
4301            .unwrap();
4302        collab_worktree
4303            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4304            .await;
4305        host_project
4306            .update(&mut host_cx, |project, cx| project.share(cx))
4307            .await
4308            .unwrap();
4309
4310        clients.push(cx.foreground().spawn(host.simulate_host(
4311            host_project.clone(),
4312            language_server_config,
4313            operations.clone(),
4314            max_operations,
4315            rng.clone(),
4316            host_cx.clone(),
4317        )));
4318
4319        while operations.get() < max_operations {
4320            cx.background().simulate_random_delay().await;
4321            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4322                operations.set(operations.get() + 1);
4323
4324                let guest_id = clients.len();
4325                log::info!("Adding guest {}", guest_id);
4326                next_entity_id += 100000;
4327                let mut guest_cx = TestAppContext::new(
4328                    cx.foreground_platform(),
4329                    cx.platform(),
4330                    cx.foreground(),
4331                    cx.background(),
4332                    cx.font_cache(),
4333                    next_entity_id,
4334                );
4335                let guest = server
4336                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4337                    .await;
4338                let guest_project = Project::remote(
4339                    host_project_id,
4340                    guest.client.clone(),
4341                    guest.user_store.clone(),
4342                    guest_lang_registry.clone(),
4343                    fs.clone(),
4344                    &mut guest_cx.to_async(),
4345                )
4346                .await
4347                .unwrap();
4348                clients.push(cx.foreground().spawn(guest.simulate_guest(
4349                    guest_id,
4350                    guest_project,
4351                    operations.clone(),
4352                    max_operations,
4353                    rng.clone(),
4354                    guest_cx,
4355                )));
4356
4357                log::info!("Guest {} added", guest_id);
4358            }
4359        }
4360
4361        let clients = futures::future::join_all(clients).await;
4362        cx.foreground().run_until_parked();
4363
4364        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4365            project
4366                .worktrees(cx)
4367                .map(|worktree| {
4368                    let snapshot = worktree.read(cx).snapshot();
4369                    (snapshot.id(), snapshot)
4370                })
4371                .collect::<BTreeMap<_, _>>()
4372        });
4373
4374        for (guest_client, guest_cx) in clients.iter().skip(1) {
4375            let guest_id = guest_client.client.id();
4376            let worktree_snapshots =
4377                guest_client
4378                    .project
4379                    .as_ref()
4380                    .unwrap()
4381                    .read_with(guest_cx, |project, cx| {
4382                        project
4383                            .worktrees(cx)
4384                            .map(|worktree| {
4385                                let worktree = worktree.read(cx);
4386                                assert!(
4387                                    !worktree.as_remote().unwrap().has_pending_updates(),
4388                                    "Guest {} worktree {:?} contains deferred updates",
4389                                    guest_id,
4390                                    worktree.id()
4391                                );
4392                                (worktree.id(), worktree.snapshot())
4393                            })
4394                            .collect::<BTreeMap<_, _>>()
4395                    });
4396
4397            assert_eq!(
4398                worktree_snapshots.keys().collect::<Vec<_>>(),
4399                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4400                "guest {} has different worktrees than the host",
4401                guest_id
4402            );
4403            for (id, host_snapshot) in &host_worktree_snapshots {
4404                let guest_snapshot = &worktree_snapshots[id];
4405                assert_eq!(
4406                    guest_snapshot.root_name(),
4407                    host_snapshot.root_name(),
4408                    "guest {} has different root name than the host for worktree {}",
4409                    guest_id,
4410                    id
4411                );
4412                assert_eq!(
4413                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4414                    host_snapshot.entries(false).collect::<Vec<_>>(),
4415                    "guest {} has different snapshot than the host for worktree {}",
4416                    guest_id,
4417                    id
4418                );
4419            }
4420
4421            guest_client
4422                .project
4423                .as_ref()
4424                .unwrap()
4425                .read_with(guest_cx, |project, _| {
4426                    assert!(
4427                        !project.has_buffered_operations(),
4428                        "guest {} has buffered operations ",
4429                        guest_id,
4430                    );
4431                });
4432
4433            for guest_buffer in &guest_client.buffers {
4434                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4435                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4436                    project
4437                        .shared_buffer(guest_client.peer_id, buffer_id)
4438                        .expect(&format!(
4439                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4440                            guest_id, guest_client.peer_id, buffer_id
4441                        ))
4442                });
4443                assert_eq!(
4444                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4445                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4446                    "guest {} buffer {} differs from the host's buffer",
4447                    guest_id,
4448                    buffer_id,
4449                );
4450            }
4451        }
4452    }
4453
4454    struct TestServer {
4455        peer: Arc<Peer>,
4456        app_state: Arc<AppState>,
4457        server: Arc<Server>,
4458        foreground: Rc<executor::Foreground>,
4459        notifications: mpsc::UnboundedReceiver<()>,
4460        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
4461        forbid_connections: Arc<AtomicBool>,
4462        _test_db: TestDb,
4463    }
4464
4465    impl TestServer {
4466        async fn start(
4467            foreground: Rc<executor::Foreground>,
4468            background: Arc<executor::Background>,
4469        ) -> Self {
4470            let test_db = TestDb::fake(background);
4471            let app_state = Self::build_app_state(&test_db).await;
4472            let peer = Peer::new();
4473            let notifications = mpsc::unbounded();
4474            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4475            Self {
4476                peer,
4477                app_state,
4478                server,
4479                foreground,
4480                notifications: notifications.1,
4481                connection_killers: Default::default(),
4482                forbid_connections: Default::default(),
4483                _test_db: test_db,
4484            }
4485        }
4486
4487        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4488            let http = FakeHttpClient::with_404_response();
4489            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4490            let client_name = name.to_string();
4491            let mut client = Client::new(http.clone());
4492            let server = self.server.clone();
4493            let connection_killers = self.connection_killers.clone();
4494            let forbid_connections = self.forbid_connections.clone();
4495            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4496
4497            Arc::get_mut(&mut client)
4498                .unwrap()
4499                .override_authenticate(move |cx| {
4500                    cx.spawn(|_| async move {
4501                        let access_token = "the-token".to_string();
4502                        Ok(Credentials {
4503                            user_id: user_id.0 as u64,
4504                            access_token,
4505                        })
4506                    })
4507                })
4508                .override_establish_connection(move |credentials, cx| {
4509                    assert_eq!(credentials.user_id, user_id.0 as u64);
4510                    assert_eq!(credentials.access_token, "the-token");
4511
4512                    let server = server.clone();
4513                    let connection_killers = connection_killers.clone();
4514                    let forbid_connections = forbid_connections.clone();
4515                    let client_name = client_name.clone();
4516                    let connection_id_tx = connection_id_tx.clone();
4517                    cx.spawn(move |cx| async move {
4518                        if forbid_connections.load(SeqCst) {
4519                            Err(EstablishConnectionError::other(anyhow!(
4520                                "server is forbidding connections"
4521                            )))
4522                        } else {
4523                            let (client_conn, server_conn, kill_conn) =
4524                                Connection::in_memory(cx.background());
4525                            connection_killers.lock().insert(user_id, kill_conn);
4526                            cx.background()
4527                                .spawn(server.handle_connection(
4528                                    server_conn,
4529                                    client_name,
4530                                    user_id,
4531                                    Some(connection_id_tx),
4532                                    cx.background(),
4533                                ))
4534                                .detach();
4535                            Ok(client_conn)
4536                        }
4537                    })
4538                });
4539
4540            client
4541                .authenticate_and_connect(&cx.to_async())
4542                .await
4543                .unwrap();
4544
4545            Channel::init(&client);
4546            Project::init(&client);
4547
4548            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4549            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4550            let mut authed_user =
4551                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4552            while authed_user.next().await.unwrap().is_none() {}
4553
4554            TestClient {
4555                client,
4556                peer_id,
4557                user_store,
4558                project: Default::default(),
4559                buffers: Default::default(),
4560            }
4561        }
4562
4563        fn disconnect_client(&self, user_id: UserId) {
4564            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4565                let _ = kill_conn.try_send(Some(()));
4566            }
4567        }
4568
4569        fn forbid_connections(&self) {
4570            self.forbid_connections.store(true, SeqCst);
4571        }
4572
4573        fn allow_connections(&self) {
4574            self.forbid_connections.store(false, SeqCst);
4575        }
4576
4577        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4578            let mut config = Config::default();
4579            config.session_secret = "a".repeat(32);
4580            config.database_url = test_db.url.clone();
4581            let github_client = github::AppClient::test();
4582            Arc::new(AppState {
4583                db: test_db.db().clone(),
4584                handlebars: Default::default(),
4585                auth_client: auth::build_client("", ""),
4586                repo_client: github::RepoClient::test(&github_client),
4587                github_client,
4588                config,
4589            })
4590        }
4591
4592        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4593            self.server.store.read()
4594        }
4595
4596        async fn condition<F>(&mut self, mut predicate: F)
4597        where
4598            F: FnMut(&Store) -> bool,
4599        {
4600            async_std::future::timeout(Duration::from_millis(500), async {
4601                while !(predicate)(&*self.server.store.read()) {
4602                    self.foreground.start_waiting();
4603                    self.notifications.next().await;
4604                    self.foreground.finish_waiting();
4605                }
4606            })
4607            .await
4608            .expect("condition timed out");
4609        }
4610    }
4611
4612    impl Drop for TestServer {
4613        fn drop(&mut self) {
4614            self.peer.reset();
4615        }
4616    }
4617
4618    struct TestClient {
4619        client: Arc<Client>,
4620        pub peer_id: PeerId,
4621        pub user_store: ModelHandle<UserStore>,
4622        project: Option<ModelHandle<Project>>,
4623        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
4624    }
4625
4626    impl Deref for TestClient {
4627        type Target = Arc<Client>;
4628
4629        fn deref(&self) -> &Self::Target {
4630            &self.client
4631        }
4632    }
4633
4634    impl TestClient {
4635        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4636            UserId::from_proto(
4637                self.user_store
4638                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4639            )
4640        }
4641
4642        fn simulate_host(
4643            mut self,
4644            project: ModelHandle<Project>,
4645            mut language_server_config: LanguageServerConfig,
4646            operations: Rc<Cell<usize>>,
4647            max_operations: usize,
4648            rng: Arc<Mutex<StdRng>>,
4649            mut cx: TestAppContext,
4650        ) -> impl Future<Output = (Self, TestAppContext)> {
4651            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4652
4653            // Set up a fake language server.
4654            language_server_config.set_fake_initializer({
4655                let rng = rng.clone();
4656                let files = files.clone();
4657                let project = project.clone();
4658                move |fake_server| {
4659                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4660                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4661                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4662                                range: lsp::Range::new(
4663                                    lsp::Position::new(0, 0),
4664                                    lsp::Position::new(0, 0),
4665                                ),
4666                                new_text: "the-new-text".to_string(),
4667                            })),
4668                            ..Default::default()
4669                        }]))
4670                    });
4671
4672                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4673                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4674                            lsp::CodeAction {
4675                                title: "the-code-action".to_string(),
4676                                ..Default::default()
4677                            },
4678                        )])
4679                    });
4680
4681                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4682                        |params, _| {
4683                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4684                                params.position,
4685                                params.position,
4686                            )))
4687                        },
4688                    );
4689
4690                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4691                        let files = files.clone();
4692                        let rng = rng.clone();
4693                        move |_, _| {
4694                            let files = files.lock();
4695                            let mut rng = rng.lock();
4696                            let count = rng.gen_range::<usize, _>(1..3);
4697                            Some(lsp::GotoDefinitionResponse::Array(
4698                                (0..count)
4699                                    .map(|_| {
4700                                        let file = files.choose(&mut *rng).unwrap().as_path();
4701                                        lsp::Location {
4702                                            uri: lsp::Url::from_file_path(file).unwrap(),
4703                                            range: Default::default(),
4704                                        }
4705                                    })
4706                                    .collect(),
4707                            ))
4708                        }
4709                    });
4710
4711                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4712                        let rng = rng.clone();
4713                        let project = project.clone();
4714                        move |params, mut cx| {
4715                            project.update(&mut cx, |project, cx| {
4716                                let path = params
4717                                    .text_document_position_params
4718                                    .text_document
4719                                    .uri
4720                                    .to_file_path()
4721                                    .unwrap();
4722                                let (worktree, relative_path) =
4723                                    project.find_local_worktree(&path, cx)?;
4724                                let project_path =
4725                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4726                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4727
4728                                let mut highlights = Vec::new();
4729                                let highlight_count = rng.lock().gen_range(1..=5);
4730                                let mut prev_end = 0;
4731                                for _ in 0..highlight_count {
4732                                    let range =
4733                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4734                                    let start =
4735                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4736                                    let end =
4737                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4738                                    highlights.push(lsp::DocumentHighlight {
4739                                        range: lsp::Range::new(start, end),
4740                                        kind: Some(lsp::DocumentHighlightKind::READ),
4741                                    });
4742                                    prev_end = range.end;
4743                                }
4744                                Some(highlights)
4745                            })
4746                        }
4747                    });
4748                }
4749            });
4750
4751            project.update(&mut cx, |project, _| {
4752                project.languages().add(Arc::new(Language::new(
4753                    LanguageConfig {
4754                        name: "Rust".into(),
4755                        path_suffixes: vec!["rs".to_string()],
4756                        language_server: Some(language_server_config),
4757                        ..Default::default()
4758                    },
4759                    None,
4760                )));
4761            });
4762
4763            async move {
4764                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4765                while operations.get() < max_operations {
4766                    operations.set(operations.get() + 1);
4767
4768                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4769                    match distribution {
4770                        0..=20 if !files.lock().is_empty() => {
4771                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4772                            let mut path = path.as_path();
4773                            while let Some(parent_path) = path.parent() {
4774                                path = parent_path;
4775                                if rng.lock().gen() {
4776                                    break;
4777                                }
4778                            }
4779
4780                            log::info!("Host: find/create local worktree {:?}", path);
4781                            project
4782                                .update(&mut cx, |project, cx| {
4783                                    project.find_or_create_local_worktree(path, false, cx)
4784                                })
4785                                .await
4786                                .unwrap();
4787                        }
4788                        10..=80 if !files.lock().is_empty() => {
4789                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4790                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4791                                let (worktree, path) = project
4792                                    .update(&mut cx, |project, cx| {
4793                                        project.find_or_create_local_worktree(file, false, cx)
4794                                    })
4795                                    .await
4796                                    .unwrap();
4797                                let project_path =
4798                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4799                                log::info!("Host: opening path {:?}", project_path);
4800                                let buffer = project
4801                                    .update(&mut cx, |project, cx| {
4802                                        project.open_buffer(project_path, cx)
4803                                    })
4804                                    .await
4805                                    .unwrap();
4806                                self.buffers.insert(buffer.clone());
4807                                buffer
4808                            } else {
4809                                self.buffers
4810                                    .iter()
4811                                    .choose(&mut *rng.lock())
4812                                    .unwrap()
4813                                    .clone()
4814                            };
4815
4816                            if rng.lock().gen_bool(0.1) {
4817                                cx.update(|cx| {
4818                                    log::info!(
4819                                        "Host: dropping buffer {:?}",
4820                                        buffer.read(cx).file().unwrap().full_path(cx)
4821                                    );
4822                                    self.buffers.remove(&buffer);
4823                                    drop(buffer);
4824                                });
4825                            } else {
4826                                buffer.update(&mut cx, |buffer, cx| {
4827                                    log::info!(
4828                                        "Host: updating buffer {:?}",
4829                                        buffer.file().unwrap().full_path(cx)
4830                                    );
4831                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4832                                });
4833                            }
4834                        }
4835                        _ => loop {
4836                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4837                            let mut path = PathBuf::new();
4838                            path.push("/");
4839                            for _ in 0..path_component_count {
4840                                let letter = rng.lock().gen_range(b'a'..=b'z');
4841                                path.push(std::str::from_utf8(&[letter]).unwrap());
4842                            }
4843                            path.set_extension("rs");
4844                            let parent_path = path.parent().unwrap();
4845
4846                            log::info!("Host: creating file {:?}", path,);
4847
4848                            if fs.create_dir(&parent_path).await.is_ok()
4849                                && fs.create_file(&path, Default::default()).await.is_ok()
4850                            {
4851                                files.lock().push(path);
4852                                break;
4853                            } else {
4854                                log::info!("Host: cannot create file");
4855                            }
4856                        },
4857                    }
4858
4859                    cx.background().simulate_random_delay().await;
4860                }
4861
4862                self.project = Some(project);
4863                (self, cx)
4864            }
4865        }
4866
4867        pub async fn simulate_guest(
4868            mut self,
4869            guest_id: usize,
4870            project: ModelHandle<Project>,
4871            operations: Rc<Cell<usize>>,
4872            max_operations: usize,
4873            rng: Arc<Mutex<StdRng>>,
4874            mut cx: TestAppContext,
4875        ) -> (Self, TestAppContext) {
4876            while operations.get() < max_operations {
4877                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4878                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4879                        project
4880                            .worktrees(&cx)
4881                            .filter(|worktree| {
4882                                worktree.read(cx).entries(false).any(|e| e.is_file())
4883                            })
4884                            .choose(&mut *rng.lock())
4885                    }) {
4886                        worktree
4887                    } else {
4888                        cx.background().simulate_random_delay().await;
4889                        continue;
4890                    };
4891
4892                    operations.set(operations.get() + 1);
4893                    let project_path = worktree.read_with(&cx, |worktree, _| {
4894                        let entry = worktree
4895                            .entries(false)
4896                            .filter(|e| e.is_file())
4897                            .choose(&mut *rng.lock())
4898                            .unwrap();
4899                        (worktree.id(), entry.path.clone())
4900                    });
4901                    log::info!("Guest {}: opening path {:?}", guest_id, project_path);
4902                    let buffer = project
4903                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
4904                        .await
4905                        .unwrap();
4906                    self.buffers.insert(buffer.clone());
4907                    buffer
4908                } else {
4909                    operations.set(operations.get() + 1);
4910
4911                    self.buffers
4912                        .iter()
4913                        .choose(&mut *rng.lock())
4914                        .unwrap()
4915                        .clone()
4916                };
4917
4918                let choice = rng.lock().gen_range(0..100);
4919                match choice {
4920                    0..=9 => {
4921                        cx.update(|cx| {
4922                            log::info!(
4923                                "Guest {}: dropping buffer {:?}",
4924                                guest_id,
4925                                buffer.read(cx).file().unwrap().full_path(cx)
4926                            );
4927                            self.buffers.remove(&buffer);
4928                            drop(buffer);
4929                        });
4930                    }
4931                    10..=19 => {
4932                        let completions = project.update(&mut cx, |project, cx| {
4933                            log::info!(
4934                                "Guest {}: requesting completions for buffer {:?}",
4935                                guest_id,
4936                                buffer.read(cx).file().unwrap().full_path(cx)
4937                            );
4938                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4939                            project.completions(&buffer, offset, cx)
4940                        });
4941                        let completions = cx.background().spawn(async move {
4942                            completions.await.expect("completions request failed");
4943                        });
4944                        if rng.lock().gen_bool(0.3) {
4945                            log::info!("Guest {}: detaching completions request", guest_id);
4946                            completions.detach();
4947                        } else {
4948                            completions.await;
4949                        }
4950                    }
4951                    20..=29 => {
4952                        let code_actions = project.update(&mut cx, |project, cx| {
4953                            log::info!(
4954                                "Guest {}: requesting code actions for buffer {:?}",
4955                                guest_id,
4956                                buffer.read(cx).file().unwrap().full_path(cx)
4957                            );
4958                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4959                            project.code_actions(&buffer, range, cx)
4960                        });
4961                        let code_actions = cx.background().spawn(async move {
4962                            code_actions.await.expect("code actions request failed");
4963                        });
4964                        if rng.lock().gen_bool(0.3) {
4965                            log::info!("Guest {}: detaching code actions request", guest_id);
4966                            code_actions.detach();
4967                        } else {
4968                            code_actions.await;
4969                        }
4970                    }
4971                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4972                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4973                            log::info!(
4974                                "Guest {}: saving buffer {:?}",
4975                                guest_id,
4976                                buffer.file().unwrap().full_path(cx)
4977                            );
4978                            (buffer.version(), buffer.save(cx))
4979                        });
4980                        let save = cx.spawn(|cx| async move {
4981                            let (saved_version, _) = save.await.expect("save request failed");
4982                            buffer.read_with(&cx, |buffer, _| {
4983                                assert!(buffer.version().observed_all(&saved_version));
4984                                assert!(saved_version.observed_all(&requested_version));
4985                            });
4986                        });
4987                        if rng.lock().gen_bool(0.3) {
4988                            log::info!("Guest {}: detaching save request", guest_id);
4989                            save.detach();
4990                        } else {
4991                            save.await;
4992                        }
4993                    }
4994                    40..=45 => {
4995                        let prepare_rename = project.update(&mut cx, |project, cx| {
4996                            log::info!(
4997                                "Guest {}: preparing rename for buffer {:?}",
4998                                guest_id,
4999                                buffer.read(cx).file().unwrap().full_path(cx)
5000                            );
5001                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5002                            project.prepare_rename(buffer, offset, cx)
5003                        });
5004                        let prepare_rename = cx.background().spawn(async move {
5005                            prepare_rename.await.expect("prepare rename request failed");
5006                        });
5007                        if rng.lock().gen_bool(0.3) {
5008                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5009                            prepare_rename.detach();
5010                        } else {
5011                            prepare_rename.await;
5012                        }
5013                    }
5014                    46..=49 => {
5015                        let definitions = project.update(&mut cx, |project, cx| {
5016                            log::info!(
5017                                "Guest {}: requesting defintions for buffer {:?}",
5018                                guest_id,
5019                                buffer.read(cx).file().unwrap().full_path(cx)
5020                            );
5021                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5022                            project.definition(&buffer, offset, cx)
5023                        });
5024                        let definitions = cx.background().spawn(async move {
5025                            definitions.await.expect("definitions request failed");
5026                        });
5027                        if rng.lock().gen_bool(0.3) {
5028                            log::info!("Guest {}: detaching definitions request", guest_id);
5029                            definitions.detach();
5030                        } else {
5031                            definitions.await;
5032                        }
5033                    }
5034                    50..=55 => {
5035                        let highlights = project.update(&mut cx, |project, cx| {
5036                            log::info!(
5037                                "Guest {}: requesting highlights for buffer {:?}",
5038                                guest_id,
5039                                buffer.read(cx).file().unwrap().full_path(cx)
5040                            );
5041                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5042                            project.document_highlights(&buffer, offset, cx)
5043                        });
5044                        let highlights = cx.background().spawn(async move {
5045                            highlights.await.expect("highlights request failed");
5046                        });
5047                        if rng.lock().gen_bool(0.3) {
5048                            log::info!("Guest {}: detaching highlights request", guest_id);
5049                            highlights.detach();
5050                        } else {
5051                            highlights.await;
5052                        }
5053                    }
5054                    _ => {
5055                        buffer.update(&mut cx, |buffer, cx| {
5056                            log::info!(
5057                                "Guest {}: updating buffer {:?}",
5058                                guest_id,
5059                                buffer.file().unwrap().full_path(cx)
5060                            );
5061                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5062                        });
5063                    }
5064                }
5065                cx.background().simulate_random_delay().await;
5066            }
5067
5068            self.project = Some(project);
5069            (self, cx)
5070        }
5071    }
5072
5073    impl Executor for Arc<gpui::executor::Background> {
5074        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5075            self.spawn(future).detach();
5076        }
5077    }
5078
5079    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5080        channel
5081            .messages()
5082            .cursor::<()>()
5083            .map(|m| {
5084                (
5085                    m.sender.github_login.clone(),
5086                    m.body.clone(),
5087                    m.is_pending(),
5088                )
5089            })
5090            .collect()
5091    }
5092
5093    struct EmptyView;
5094
5095    impl gpui::Entity for EmptyView {
5096        type Event = ();
5097    }
5098
5099    impl gpui::View for EmptyView {
5100        fn ui_name() -> &'static str {
5101            "empty view"
5102        }
5103
5104        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5105            gpui::Element::boxed(gpui::elements::Empty)
5106        }
5107    }
5108}