rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use rpc::{
  15    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  16    Connection, ConnectionId, Peer, TypedEnvelope,
  17};
  18use sha1::{Digest as _, Sha1};
  19use std::{any::TypeId, future::Future, sync::Arc, time::Instant};
  20use store::{Store, Worktree};
  21use surf::StatusCode;
  22use tide::log;
  23use tide::{
  24    http::headers::{HeaderName, CONNECTION, UPGRADE},
  25    Request, Response,
  26};
  27use time::OffsetDateTime;
  28
  29type MessageHandler = Box<
  30    dyn Send
  31        + Sync
  32        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  33>;
  34
/// RPC server state shared by every connection handled in this file.
pub struct Server {
    /// Peer used to add and disconnect connections and to send, respond to,
    /// and forward messages.
    peer: Arc<Peer>,
    /// Lock-protected store; presumably what `state()` / `state_mut()` hand
    /// out guards for (those accessors are not visible here — confirm).
    store: RwLock<Store>,
    /// Shared application state; `register_worktree` reaches the database
    /// through `app_state.db`.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent a `()` after each message handler
    /// finishes (see `handle_connection`).
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  42
/// Abstraction over task spawning. `Server::handle_connection` uses it to run
/// handlers for background messages without awaiting them inline.
pub trait Executor {
    /// Spawns `future` to run on its own; no handle is returned to the caller.
    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
}
  46
/// Unit struct; presumably the production `Executor` implementation (its
/// `impl` is not visible in this part of the file — confirm).
pub struct RealExecutor;
  48
// NOTE(review): neither constant is referenced in the visible part of this
// file. Presumably they bound channel-message paging and message body length;
// confirm at the use sites.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
const MAX_MESSAGE_LEN: usize = 1024;
  51
  52impl Server {
  53    pub fn new(
  54        app_state: Arc<AppState>,
  55        peer: Arc<Peer>,
  56        notifications: Option<mpsc::UnboundedSender<()>>,
  57    ) -> Arc<Self> {
  58        let mut server = Self {
  59            peer,
  60            app_state,
  61            store: Default::default(),
  62            handlers: Default::default(),
  63            notifications,
  64        };
  65
  66        server
  67            .add_request_handler(Server::ping)
  68            .add_request_handler(Server::register_project)
  69            .add_message_handler(Server::unregister_project)
  70            .add_request_handler(Server::share_project)
  71            .add_message_handler(Server::unshare_project)
  72            .add_request_handler(Server::join_project)
  73            .add_message_handler(Server::leave_project)
  74            .add_request_handler(Server::register_worktree)
  75            .add_message_handler(Server::unregister_worktree)
  76            .add_request_handler(Server::update_worktree)
  77            .add_message_handler(Server::update_diagnostic_summary)
  78            .add_message_handler(Server::disk_based_diagnostics_updating)
  79            .add_message_handler(Server::disk_based_diagnostics_updated)
  80            .add_request_handler(Server::get_definition)
  81            .add_request_handler(Server::get_references)
  82            .add_request_handler(Server::search_project)
  83            .add_request_handler(Server::get_document_highlights)
  84            .add_request_handler(Server::get_project_symbols)
  85            .add_request_handler(Server::open_buffer_for_symbol)
  86            .add_request_handler(Server::open_buffer)
  87            .add_message_handler(Server::close_buffer)
  88            .add_request_handler(Server::update_buffer)
  89            .add_message_handler(Server::update_buffer_file)
  90            .add_message_handler(Server::buffer_reloaded)
  91            .add_message_handler(Server::buffer_saved)
  92            .add_request_handler(Server::save_buffer)
  93            .add_request_handler(Server::format_buffers)
  94            .add_request_handler(Server::get_completions)
  95            .add_request_handler(Server::apply_additional_edits_for_completion)
  96            .add_request_handler(Server::get_code_actions)
  97            .add_request_handler(Server::apply_code_action)
  98            .add_request_handler(Server::prepare_rename)
  99            .add_request_handler(Server::perform_rename)
 100            .add_request_handler(Server::get_channels)
 101            .add_request_handler(Server::get_users)
 102            .add_request_handler(Server::join_channel)
 103            .add_message_handler(Server::leave_channel)
 104            .add_request_handler(Server::send_channel_message)
 105            .add_request_handler(Server::get_channel_messages);
 106
 107        Arc::new(server)
 108    }
 109
 110    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 111    where
 112        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 113        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 114        M: EnvelopedMessage,
 115    {
 116        let prev_handler = self.handlers.insert(
 117            TypeId::of::<M>(),
 118            Box::new(move |server, envelope| {
 119                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 120                (handler)(server, *envelope).boxed()
 121            }),
 122        );
 123        if prev_handler.is_some() {
 124            panic!("registered a handler for the same message twice");
 125        }
 126        self
 127    }
 128
 129    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 130    where
 131        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 132        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 133        M: RequestMessage,
 134    {
 135        self.add_message_handler(move |server, envelope| {
 136            let receipt = envelope.receipt();
 137            let response = (handler)(server.clone(), envelope);
 138            async move {
 139                match response.await {
 140                    Ok(response) => {
 141                        server.peer.respond(receipt, response)?;
 142                        Ok(())
 143                    }
 144                    Err(error) => {
 145                        server.peer.respond_with_error(
 146                            receipt,
 147                            proto::Error {
 148                                message: error.to_string(),
 149                            },
 150                        )?;
 151                        Err(error)
 152                    }
 153                }
 154            }
 155        })
 156    }
 157
 158    pub fn handle_connection<E: Executor>(
 159        self: &Arc<Self>,
 160        connection: Connection,
 161        addr: String,
 162        user_id: UserId,
 163        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 164        executor: E,
 165    ) -> impl Future<Output = ()> {
 166        let mut this = self.clone();
 167        async move {
 168            let (connection_id, handle_io, mut incoming_rx) =
 169                this.peer.add_connection(connection).await;
 170
 171            if let Some(send_connection_id) = send_connection_id.as_mut() {
 172                let _ = send_connection_id.send(connection_id).await;
 173            }
 174
 175            this.state_mut().add_connection(connection_id, user_id);
 176            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 177                log::error!("error updating contacts for {:?}: {}", user_id, err);
 178            }
 179
 180            let handle_io = handle_io.fuse();
 181            futures::pin_mut!(handle_io);
 182            loop {
 183                let next_message = incoming_rx.next().fuse();
 184                futures::pin_mut!(next_message);
 185                futures::select_biased! {
 186                    result = handle_io => {
 187                        if let Err(err) = result {
 188                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 189                        }
 190                        break;
 191                    }
 192                    message = next_message => {
 193                        if let Some(message) = message {
 194                            let start_time = Instant::now();
 195                            let type_name = message.payload_type_name();
 196                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 197                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 198                                let notifications = this.notifications.clone();
 199                                let is_background = message.is_background();
 200                                let handle_message = (handler)(this.clone(), message);
 201                                let handle_message = async move {
 202                                    if let Err(err) = handle_message.await {
 203                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 204                                    } else {
 205                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 206                                    }
 207                                    if let Some(mut notifications) = notifications {
 208                                        let _ = notifications.send(()).await;
 209                                    }
 210                                };
 211                                if is_background {
 212                                    executor.spawn_detached(handle_message);
 213                                } else {
 214                                    handle_message.await;
 215                                }
 216                            } else {
 217                                log::warn!("unhandled message: {}", type_name);
 218                            }
 219                        } else {
 220                            log::info!("rpc connection closed {:?}", addr);
 221                            break;
 222                        }
 223                    }
 224                }
 225            }
 226
 227            if let Err(err) = this.sign_out(connection_id).await {
 228                log::error!("error signing out connection {:?} - {:?}", addr, err);
 229            }
 230        }
 231    }
 232
 233    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 234        self.peer.disconnect(connection_id);
 235        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 236
 237        for (project_id, project) in removed_connection.hosted_projects {
 238            if let Some(share) = project.share {
 239                broadcast(
 240                    connection_id,
 241                    share.guests.keys().copied().collect(),
 242                    |conn_id| {
 243                        self.peer
 244                            .send(conn_id, proto::UnshareProject { project_id })
 245                    },
 246                )?;
 247            }
 248        }
 249
 250        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 251            broadcast(connection_id, peer_ids, |conn_id| {
 252                self.peer.send(
 253                    conn_id,
 254                    proto::RemoveProjectCollaborator {
 255                        project_id,
 256                        peer_id: connection_id.0,
 257                    },
 258                )
 259            })?;
 260        }
 261
 262        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 263        Ok(())
 264    }
 265
 266    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 267        Ok(proto::Ack {})
 268    }
 269
 270    async fn register_project(
 271        mut self: Arc<Server>,
 272        request: TypedEnvelope<proto::RegisterProject>,
 273    ) -> tide::Result<proto::RegisterProjectResponse> {
 274        let project_id = {
 275            let mut state = self.state_mut();
 276            let user_id = state.user_id_for_connection(request.sender_id)?;
 277            state.register_project(request.sender_id, user_id)
 278        };
 279        Ok(proto::RegisterProjectResponse { project_id })
 280    }
 281
 282    async fn unregister_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::UnregisterProject>,
 285    ) -> tide::Result<()> {
 286        let project = self
 287            .state_mut()
 288            .unregister_project(request.payload.project_id, request.sender_id)?;
 289        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 290        Ok(())
 291    }
 292
 293    async fn share_project(
 294        mut self: Arc<Server>,
 295        request: TypedEnvelope<proto::ShareProject>,
 296    ) -> tide::Result<proto::Ack> {
 297        self.state_mut()
 298            .share_project(request.payload.project_id, request.sender_id);
 299        Ok(proto::Ack {})
 300    }
 301
 302    async fn unshare_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnshareProject>,
 305    ) -> tide::Result<()> {
 306        let project_id = request.payload.project_id;
 307        let project = self
 308            .state_mut()
 309            .unshare_project(project_id, request.sender_id)?;
 310
 311        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 312            self.peer
 313                .send(conn_id, proto::UnshareProject { project_id })
 314        })?;
 315        self.update_contacts_for_users(&project.authorized_user_ids)?;
 316        Ok(())
 317    }
 318
 319    async fn join_project(
 320        mut self: Arc<Server>,
 321        request: TypedEnvelope<proto::JoinProject>,
 322    ) -> tide::Result<proto::JoinProjectResponse> {
 323        let project_id = request.payload.project_id;
 324
 325        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 326        let (response, connection_ids, contact_user_ids) = self
 327            .state_mut()
 328            .join_project(request.sender_id, user_id, project_id)
 329            .and_then(|joined| {
 330                let share = joined.project.share()?;
 331                let peer_count = share.guests.len();
 332                let mut collaborators = Vec::with_capacity(peer_count);
 333                collaborators.push(proto::Collaborator {
 334                    peer_id: joined.project.host_connection_id.0,
 335                    replica_id: 0,
 336                    user_id: joined.project.host_user_id.to_proto(),
 337                });
 338                let worktrees = share
 339                    .worktrees
 340                    .iter()
 341                    .filter_map(|(id, shared_worktree)| {
 342                        let worktree = joined.project.worktrees.get(&id)?;
 343                        Some(proto::Worktree {
 344                            id: *id,
 345                            root_name: worktree.root_name.clone(),
 346                            entries: shared_worktree.entries.values().cloned().collect(),
 347                            diagnostic_summaries: shared_worktree
 348                                .diagnostic_summaries
 349                                .values()
 350                                .cloned()
 351                                .collect(),
 352                            weak: worktree.weak,
 353                        })
 354                    })
 355                    .collect();
 356                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 357                    if *peer_conn_id != request.sender_id {
 358                        collaborators.push(proto::Collaborator {
 359                            peer_id: peer_conn_id.0,
 360                            replica_id: *peer_replica_id as u32,
 361                            user_id: peer_user_id.to_proto(),
 362                        });
 363                    }
 364                }
 365                let response = proto::JoinProjectResponse {
 366                    worktrees,
 367                    replica_id: joined.replica_id as u32,
 368                    collaborators,
 369                };
 370                let connection_ids = joined.project.connection_ids();
 371                let contact_user_ids = joined.project.authorized_user_ids();
 372                Ok((response, connection_ids, contact_user_ids))
 373            })?;
 374
 375        broadcast(request.sender_id, connection_ids, |conn_id| {
 376            self.peer.send(
 377                conn_id,
 378                proto::AddProjectCollaborator {
 379                    project_id,
 380                    collaborator: Some(proto::Collaborator {
 381                        peer_id: request.sender_id.0,
 382                        replica_id: response.replica_id,
 383                        user_id: user_id.to_proto(),
 384                    }),
 385                },
 386            )
 387        })?;
 388        self.update_contacts_for_users(&contact_user_ids)?;
 389        Ok(response)
 390    }
 391
 392    async fn leave_project(
 393        mut self: Arc<Server>,
 394        request: TypedEnvelope<proto::LeaveProject>,
 395    ) -> tide::Result<()> {
 396        let sender_id = request.sender_id;
 397        let project_id = request.payload.project_id;
 398        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 399
 400        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 401            self.peer.send(
 402                conn_id,
 403                proto::RemoveProjectCollaborator {
 404                    project_id,
 405                    peer_id: sender_id.0,
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 410
 411        Ok(())
 412    }
 413
 414    async fn register_worktree(
 415        mut self: Arc<Server>,
 416        request: TypedEnvelope<proto::RegisterWorktree>,
 417    ) -> tide::Result<proto::Ack> {
 418        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 419
 420        let mut contact_user_ids = HashSet::default();
 421        contact_user_ids.insert(host_user_id);
 422        for github_login in &request.payload.authorized_logins {
 423            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 424            contact_user_ids.insert(contact_user_id);
 425        }
 426
 427        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 428        let guest_connection_ids;
 429        {
 430            let mut state = self.state_mut();
 431            guest_connection_ids = state
 432                .read_project(request.payload.project_id, request.sender_id)?
 433                .guest_connection_ids();
 434            state.register_worktree(
 435                request.payload.project_id,
 436                request.payload.worktree_id,
 437                request.sender_id,
 438                Worktree {
 439                    authorized_user_ids: contact_user_ids.clone(),
 440                    root_name: request.payload.root_name.clone(),
 441                    weak: request.payload.weak,
 442                },
 443            )?;
 444        }
 445        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 446            self.peer
 447                .forward_send(request.sender_id, connection_id, request.payload.clone())
 448        })?;
 449        self.update_contacts_for_users(&contact_user_ids)?;
 450        Ok(proto::Ack {})
 451    }
 452
 453    async fn unregister_worktree(
 454        mut self: Arc<Server>,
 455        request: TypedEnvelope<proto::UnregisterWorktree>,
 456    ) -> tide::Result<()> {
 457        let project_id = request.payload.project_id;
 458        let worktree_id = request.payload.worktree_id;
 459        let (worktree, guest_connection_ids) =
 460            self.state_mut()
 461                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 462        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 463            self.peer.send(
 464                conn_id,
 465                proto::UnregisterWorktree {
 466                    project_id,
 467                    worktree_id,
 468                },
 469            )
 470        })?;
 471        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 472        Ok(())
 473    }
 474
 475    async fn update_worktree(
 476        mut self: Arc<Server>,
 477        request: TypedEnvelope<proto::UpdateWorktree>,
 478    ) -> tide::Result<proto::Ack> {
 479        let connection_ids = self.state_mut().update_worktree(
 480            request.sender_id,
 481            request.payload.project_id,
 482            request.payload.worktree_id,
 483            &request.payload.removed_entries,
 484            &request.payload.updated_entries,
 485        )?;
 486
 487        broadcast(request.sender_id, connection_ids, |connection_id| {
 488            self.peer
 489                .forward_send(request.sender_id, connection_id, request.payload.clone())
 490        })?;
 491
 492        Ok(proto::Ack {})
 493    }
 494
 495    async fn update_diagnostic_summary(
 496        mut self: Arc<Server>,
 497        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 498    ) -> tide::Result<()> {
 499        let summary = request
 500            .payload
 501            .summary
 502            .clone()
 503            .ok_or_else(|| anyhow!("invalid summary"))?;
 504        let receiver_ids = self.state_mut().update_diagnostic_summary(
 505            request.payload.project_id,
 506            request.payload.worktree_id,
 507            request.sender_id,
 508            summary,
 509        )?;
 510
 511        broadcast(request.sender_id, receiver_ids, |connection_id| {
 512            self.peer
 513                .forward_send(request.sender_id, connection_id, request.payload.clone())
 514        })?;
 515        Ok(())
 516    }
 517
 518    async fn disk_based_diagnostics_updating(
 519        self: Arc<Server>,
 520        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 521    ) -> tide::Result<()> {
 522        let receiver_ids = self
 523            .state()
 524            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 525        broadcast(request.sender_id, receiver_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529        Ok(())
 530    }
 531
 532    async fn disk_based_diagnostics_updated(
 533        self: Arc<Server>,
 534        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 535    ) -> tide::Result<()> {
 536        let receiver_ids = self
 537            .state()
 538            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 539        broadcast(request.sender_id, receiver_ids, |connection_id| {
 540            self.peer
 541                .forward_send(request.sender_id, connection_id, request.payload.clone())
 542        })?;
 543        Ok(())
 544    }
 545
 546    async fn get_definition(
 547        self: Arc<Server>,
 548        request: TypedEnvelope<proto::GetDefinition>,
 549    ) -> tide::Result<proto::GetDefinitionResponse> {
 550        let host_connection_id = self
 551            .state()
 552            .read_project(request.payload.project_id, request.sender_id)?
 553            .host_connection_id;
 554        Ok(self
 555            .peer
 556            .forward_request(request.sender_id, host_connection_id, request.payload)
 557            .await?)
 558    }
 559
 560    async fn get_references(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::GetReferences>,
 563    ) -> tide::Result<proto::GetReferencesResponse> {
 564        let host_connection_id = self
 565            .state()
 566            .read_project(request.payload.project_id, request.sender_id)?
 567            .host_connection_id;
 568        Ok(self
 569            .peer
 570            .forward_request(request.sender_id, host_connection_id, request.payload)
 571            .await?)
 572    }
 573
 574    async fn search_project(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<proto::SearchProject>,
 577    ) -> tide::Result<proto::SearchProjectResponse> {
 578        let host_connection_id = self
 579            .state()
 580            .read_project(request.payload.project_id, request.sender_id)?
 581            .host_connection_id;
 582        Ok(self
 583            .peer
 584            .forward_request(request.sender_id, host_connection_id, request.payload)
 585            .await?)
 586    }
 587
 588    async fn get_document_highlights(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDocumentHighlights>,
 591    ) -> tide::Result<proto::GetDocumentHighlightsResponse> {
 592        let host_connection_id = self
 593            .state()
 594            .read_project(request.payload.project_id, request.sender_id)?
 595            .host_connection_id;
 596        Ok(self
 597            .peer
 598            .forward_request(request.sender_id, host_connection_id, request.payload)
 599            .await?)
 600    }
 601
 602    async fn get_project_symbols(
 603        self: Arc<Server>,
 604        request: TypedEnvelope<proto::GetProjectSymbols>,
 605    ) -> tide::Result<proto::GetProjectSymbolsResponse> {
 606        let host_connection_id = self
 607            .state()
 608            .read_project(request.payload.project_id, request.sender_id)?
 609            .host_connection_id;
 610        Ok(self
 611            .peer
 612            .forward_request(request.sender_id, host_connection_id, request.payload)
 613            .await?)
 614    }
 615
 616    async fn open_buffer_for_symbol(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::OpenBufferForSymbol>,
 619    ) -> tide::Result<proto::OpenBufferForSymbolResponse> {
 620        let host_connection_id = self
 621            .state()
 622            .read_project(request.payload.project_id, request.sender_id)?
 623            .host_connection_id;
 624        Ok(self
 625            .peer
 626            .forward_request(request.sender_id, host_connection_id, request.payload)
 627            .await?)
 628    }
 629
 630    async fn open_buffer(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::OpenBuffer>,
 633    ) -> tide::Result<proto::OpenBufferResponse> {
 634        let host_connection_id = self
 635            .state()
 636            .read_project(request.payload.project_id, request.sender_id)?
 637            .host_connection_id;
 638        Ok(self
 639            .peer
 640            .forward_request(request.sender_id, host_connection_id, request.payload)
 641            .await?)
 642    }
 643
 644    async fn close_buffer(
 645        self: Arc<Server>,
 646        request: TypedEnvelope<proto::CloseBuffer>,
 647    ) -> tide::Result<()> {
 648        let host_connection_id = self
 649            .state()
 650            .read_project(request.payload.project_id, request.sender_id)?
 651            .host_connection_id;
 652        self.peer
 653            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 654        Ok(())
 655    }
 656
 657    async fn save_buffer(
 658        self: Arc<Server>,
 659        request: TypedEnvelope<proto::SaveBuffer>,
 660    ) -> tide::Result<proto::BufferSaved> {
 661        let host;
 662        let mut guests;
 663        {
 664            let state = self.state();
 665            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 666            host = project.host_connection_id;
 667            guests = project.guest_connection_ids()
 668        }
 669
 670        let response = self
 671            .peer
 672            .forward_request(request.sender_id, host, request.payload.clone())
 673            .await?;
 674
 675        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 676        broadcast(host, guests, |conn_id| {
 677            self.peer.forward_send(host, conn_id, response.clone())
 678        })?;
 679
 680        Ok(response)
 681    }
 682
 683    async fn format_buffers(
 684        self: Arc<Server>,
 685        request: TypedEnvelope<proto::FormatBuffers>,
 686    ) -> tide::Result<proto::FormatBuffersResponse> {
 687        let host = self
 688            .state()
 689            .read_project(request.payload.project_id, request.sender_id)?
 690            .host_connection_id;
 691        Ok(self
 692            .peer
 693            .forward_request(request.sender_id, host, request.payload.clone())
 694            .await?)
 695    }
 696
 697    async fn get_completions(
 698        self: Arc<Server>,
 699        request: TypedEnvelope<proto::GetCompletions>,
 700    ) -> tide::Result<proto::GetCompletionsResponse> {
 701        let host = self
 702            .state()
 703            .read_project(request.payload.project_id, request.sender_id)?
 704            .host_connection_id;
 705        Ok(self
 706            .peer
 707            .forward_request(request.sender_id, host, request.payload.clone())
 708            .await?)
 709    }
 710
 711    async fn apply_additional_edits_for_completion(
 712        self: Arc<Server>,
 713        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 714    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 715        let host = self
 716            .state()
 717            .read_project(request.payload.project_id, request.sender_id)?
 718            .host_connection_id;
 719        Ok(self
 720            .peer
 721            .forward_request(request.sender_id, host, request.payload.clone())
 722            .await?)
 723    }
 724
 725    async fn get_code_actions(
 726        self: Arc<Server>,
 727        request: TypedEnvelope<proto::GetCodeActions>,
 728    ) -> tide::Result<proto::GetCodeActionsResponse> {
 729        let host = self
 730            .state()
 731            .read_project(request.payload.project_id, request.sender_id)?
 732            .host_connection_id;
 733        Ok(self
 734            .peer
 735            .forward_request(request.sender_id, host, request.payload.clone())
 736            .await?)
 737    }
 738
 739    async fn apply_code_action(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::ApplyCodeAction>,
 742    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 743        let host = self
 744            .state()
 745            .read_project(request.payload.project_id, request.sender_id)?
 746            .host_connection_id;
 747        Ok(self
 748            .peer
 749            .forward_request(request.sender_id, host, request.payload.clone())
 750            .await?)
 751    }
 752
 753    async fn prepare_rename(
 754        self: Arc<Server>,
 755        request: TypedEnvelope<proto::PrepareRename>,
 756    ) -> tide::Result<proto::PrepareRenameResponse> {
 757        let host = self
 758            .state()
 759            .read_project(request.payload.project_id, request.sender_id)?
 760            .host_connection_id;
 761        Ok(self
 762            .peer
 763            .forward_request(request.sender_id, host, request.payload.clone())
 764            .await?)
 765    }
 766
 767    async fn perform_rename(
 768        self: Arc<Server>,
 769        request: TypedEnvelope<proto::PerformRename>,
 770    ) -> tide::Result<proto::PerformRenameResponse> {
 771        let host = self
 772            .state()
 773            .read_project(request.payload.project_id, request.sender_id)?
 774            .host_connection_id;
 775        Ok(self
 776            .peer
 777            .forward_request(request.sender_id, host, request.payload.clone())
 778            .await?)
 779    }
 780
 781    async fn update_buffer(
 782        self: Arc<Server>,
 783        request: TypedEnvelope<proto::UpdateBuffer>,
 784    ) -> tide::Result<proto::Ack> {
 785        let receiver_ids = self
 786            .state()
 787            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 788        broadcast(request.sender_id, receiver_ids, |connection_id| {
 789            self.peer
 790                .forward_send(request.sender_id, connection_id, request.payload.clone())
 791        })?;
 792        Ok(proto::Ack {})
 793    }
 794
 795    async fn update_buffer_file(
 796        self: Arc<Server>,
 797        request: TypedEnvelope<proto::UpdateBufferFile>,
 798    ) -> tide::Result<()> {
 799        let receiver_ids = self
 800            .state()
 801            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 802        broadcast(request.sender_id, receiver_ids, |connection_id| {
 803            self.peer
 804                .forward_send(request.sender_id, connection_id, request.payload.clone())
 805        })?;
 806        Ok(())
 807    }
 808
 809    async fn buffer_reloaded(
 810        self: Arc<Server>,
 811        request: TypedEnvelope<proto::BufferReloaded>,
 812    ) -> tide::Result<()> {
 813        let receiver_ids = self
 814            .state()
 815            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 816        broadcast(request.sender_id, receiver_ids, |connection_id| {
 817            self.peer
 818                .forward_send(request.sender_id, connection_id, request.payload.clone())
 819        })?;
 820        Ok(())
 821    }
 822
 823    async fn buffer_saved(
 824        self: Arc<Server>,
 825        request: TypedEnvelope<proto::BufferSaved>,
 826    ) -> tide::Result<()> {
 827        let receiver_ids = self
 828            .state()
 829            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 830        broadcast(request.sender_id, receiver_ids, |connection_id| {
 831            self.peer
 832                .forward_send(request.sender_id, connection_id, request.payload.clone())
 833        })?;
 834        Ok(())
 835    }
 836
 837    async fn get_channels(
 838        self: Arc<Server>,
 839        request: TypedEnvelope<proto::GetChannels>,
 840    ) -> tide::Result<proto::GetChannelsResponse> {
 841        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 842        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 843        Ok(proto::GetChannelsResponse {
 844            channels: channels
 845                .into_iter()
 846                .map(|chan| proto::Channel {
 847                    id: chan.id.to_proto(),
 848                    name: chan.name,
 849                })
 850                .collect(),
 851        })
 852    }
 853
 854    async fn get_users(
 855        self: Arc<Server>,
 856        request: TypedEnvelope<proto::GetUsers>,
 857    ) -> tide::Result<proto::GetUsersResponse> {
 858        let user_ids = request
 859            .payload
 860            .user_ids
 861            .into_iter()
 862            .map(UserId::from_proto)
 863            .collect();
 864        let users = self
 865            .app_state
 866            .db
 867            .get_users_by_ids(user_ids)
 868            .await?
 869            .into_iter()
 870            .map(|user| proto::User {
 871                id: user.id.to_proto(),
 872                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 873                github_login: user.github_login,
 874            })
 875            .collect();
 876        Ok(proto::GetUsersResponse { users })
 877    }
 878
 879    fn update_contacts_for_users<'a>(
 880        self: &Arc<Server>,
 881        user_ids: impl IntoIterator<Item = &'a UserId>,
 882    ) -> anyhow::Result<()> {
 883        let mut result = Ok(());
 884        let state = self.state();
 885        for user_id in user_ids {
 886            let contacts = state.contacts_for_user(*user_id);
 887            for connection_id in state.connection_ids_for_user(*user_id) {
 888                if let Err(error) = self.peer.send(
 889                    connection_id,
 890                    proto::UpdateContacts {
 891                        contacts: contacts.clone(),
 892                    },
 893                ) {
 894                    result = Err(error);
 895                }
 896            }
 897        }
 898        result
 899    }
 900
 901    async fn join_channel(
 902        mut self: Arc<Self>,
 903        request: TypedEnvelope<proto::JoinChannel>,
 904    ) -> tide::Result<proto::JoinChannelResponse> {
 905        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 906        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 907        if !self
 908            .app_state
 909            .db
 910            .can_user_access_channel(user_id, channel_id)
 911            .await?
 912        {
 913            Err(anyhow!("access denied"))?;
 914        }
 915
 916        self.state_mut().join_channel(request.sender_id, channel_id);
 917        let messages = self
 918            .app_state
 919            .db
 920            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 921            .await?
 922            .into_iter()
 923            .map(|msg| proto::ChannelMessage {
 924                id: msg.id.to_proto(),
 925                body: msg.body,
 926                timestamp: msg.sent_at.unix_timestamp() as u64,
 927                sender_id: msg.sender_id.to_proto(),
 928                nonce: Some(msg.nonce.as_u128().into()),
 929            })
 930            .collect::<Vec<_>>();
 931        Ok(proto::JoinChannelResponse {
 932            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 933            messages,
 934        })
 935    }
 936
 937    async fn leave_channel(
 938        mut self: Arc<Self>,
 939        request: TypedEnvelope<proto::LeaveChannel>,
 940    ) -> tide::Result<()> {
 941        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 942        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 943        if !self
 944            .app_state
 945            .db
 946            .can_user_access_channel(user_id, channel_id)
 947            .await?
 948        {
 949            Err(anyhow!("access denied"))?;
 950        }
 951
 952        self.state_mut()
 953            .leave_channel(request.sender_id, channel_id);
 954
 955        Ok(())
 956    }
 957
 958    async fn send_channel_message(
 959        self: Arc<Self>,
 960        request: TypedEnvelope<proto::SendChannelMessage>,
 961    ) -> tide::Result<proto::SendChannelMessageResponse> {
 962        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 963        let user_id;
 964        let connection_ids;
 965        {
 966            let state = self.state();
 967            user_id = state.user_id_for_connection(request.sender_id)?;
 968            connection_ids = state.channel_connection_ids(channel_id)?;
 969        }
 970
 971        // Validate the message body.
 972        let body = request.payload.body.trim().to_string();
 973        if body.len() > MAX_MESSAGE_LEN {
 974            return Err(anyhow!("message is too long"))?;
 975        }
 976        if body.is_empty() {
 977            return Err(anyhow!("message can't be blank"))?;
 978        }
 979
 980        let timestamp = OffsetDateTime::now_utc();
 981        let nonce = request
 982            .payload
 983            .nonce
 984            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 985
 986        let message_id = self
 987            .app_state
 988            .db
 989            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 990            .await?
 991            .to_proto();
 992        let message = proto::ChannelMessage {
 993            sender_id: user_id.to_proto(),
 994            id: message_id,
 995            body,
 996            timestamp: timestamp.unix_timestamp() as u64,
 997            nonce: Some(nonce),
 998        };
 999        broadcast(request.sender_id, connection_ids, |conn_id| {
1000            self.peer.send(
1001                conn_id,
1002                proto::ChannelMessageSent {
1003                    channel_id: channel_id.to_proto(),
1004                    message: Some(message.clone()),
1005                },
1006            )
1007        })?;
1008        Ok(proto::SendChannelMessageResponse {
1009            message: Some(message),
1010        })
1011    }
1012
1013    async fn get_channel_messages(
1014        self: Arc<Self>,
1015        request: TypedEnvelope<proto::GetChannelMessages>,
1016    ) -> tide::Result<proto::GetChannelMessagesResponse> {
1017        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1018        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1019        if !self
1020            .app_state
1021            .db
1022            .can_user_access_channel(user_id, channel_id)
1023            .await?
1024        {
1025            Err(anyhow!("access denied"))?;
1026        }
1027
1028        let messages = self
1029            .app_state
1030            .db
1031            .get_channel_messages(
1032                channel_id,
1033                MESSAGE_COUNT_PER_PAGE,
1034                Some(MessageId::from_proto(request.payload.before_message_id)),
1035            )
1036            .await?
1037            .into_iter()
1038            .map(|msg| proto::ChannelMessage {
1039                id: msg.id.to_proto(),
1040                body: msg.body,
1041                timestamp: msg.sent_at.unix_timestamp() as u64,
1042                sender_id: msg.sender_id.to_proto(),
1043                nonce: Some(msg.nonce.as_u128().into()),
1044            })
1045            .collect::<Vec<_>>();
1046
1047        Ok(proto::GetChannelMessagesResponse {
1048            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1049            messages,
1050        })
1051    }
1052
1053    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1054        self.store.read()
1055    }
1056
1057    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1058        self.store.write()
1059    }
1060}
1061
1062impl Executor for RealExecutor {
1063    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
1064        task::spawn(future);
1065    }
1066}
1067
1068fn broadcast<F>(
1069    sender_id: ConnectionId,
1070    receiver_ids: Vec<ConnectionId>,
1071    mut f: F,
1072) -> anyhow::Result<()>
1073where
1074    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1075{
1076    let mut result = Ok(());
1077    for receiver_id in receiver_ids {
1078        if receiver_id != sender_id {
1079            if let Err(error) = f(receiver_id) {
1080                if result.is_ok() {
1081                    result = Err(error);
1082                }
1083            }
1084        }
1085    }
1086    result
1087}
1088
1089pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1090    let server = Server::new(app.state().clone(), rpc.clone(), None);
1091    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1092        let server = server.clone();
1093        async move {
1094            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1095
1096            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1097            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1098            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1099            let client_protocol_version: Option<u32> = request
1100                .header("X-Zed-Protocol-Version")
1101                .and_then(|v| v.as_str().parse().ok());
1102
1103            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1104                return Ok(Response::new(StatusCode::UpgradeRequired));
1105            }
1106
1107            let header = match request.header("Sec-Websocket-Key") {
1108                Some(h) => h.as_str(),
1109                None => return Err(anyhow!("expected sec-websocket-key"))?,
1110            };
1111
1112            let user_id = process_auth_header(&request).await?;
1113
1114            let mut response = Response::new(StatusCode::SwitchingProtocols);
1115            response.insert_header(UPGRADE, "websocket");
1116            response.insert_header(CONNECTION, "Upgrade");
1117            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1118            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1119            response.insert_header("Sec-Websocket-Version", "13");
1120
1121            let http_res: &mut tide::http::Response = response.as_mut();
1122            let upgrade_receiver = http_res.recv_upgrade().await;
1123            let addr = request.remote().unwrap_or("unknown").to_string();
1124            task::spawn(async move {
1125                if let Some(stream) = upgrade_receiver.await {
1126                    server
1127                        .handle_connection(
1128                            Connection::new(
1129                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1130                            ),
1131                            addr,
1132                            user_id,
1133                            None,
1134                            RealExecutor,
1135                        )
1136                        .await;
1137                }
1138            });
1139
1140            Ok(response)
1141        }
1142    });
1143}
1144
1145fn header_contains_ignore_case<T>(
1146    request: &tide::Request<T>,
1147    header_name: HeaderName,
1148    value: &str,
1149) -> bool {
1150    request
1151        .header(header_name)
1152        .map(|h| {
1153            h.as_str()
1154                .split(',')
1155                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1156        })
1157        .unwrap_or(false)
1158}
1159
1160#[cfg(test)]
1161mod tests {
1162    use super::*;
1163    use crate::{
1164        auth,
1165        db::{tests::TestDb, UserId},
1166        github, AppState, Config,
1167    };
1168    use ::rpc::Peer;
1169    use collections::BTreeMap;
1170    use gpui::{executor, ModelHandle, TestAppContext};
1171    use parking_lot::Mutex;
1172    use postage::{sink::Sink, watch};
1173    use rand::prelude::*;
1174    use rpc::PeerId;
1175    use serde_json::json;
1176    use sqlx::types::time::OffsetDateTime;
1177    use std::{
1178        cell::Cell,
1179        env,
1180        ops::Deref,
1181        path::{Path, PathBuf},
1182        rc::Rc,
1183        sync::{
1184            atomic::{AtomicBool, Ordering::SeqCst},
1185            Arc,
1186        },
1187        time::Duration,
1188    };
1189    use zed::{
1190        client::{
1191            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1192            EstablishConnectionError, UserStore,
1193        },
1194        editor::{
1195            self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1196            Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1197        },
1198        fs::{FakeFs, Fs as _},
1199        language::{
1200            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1201            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1202        },
1203        lsp,
1204        project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath},
1205        workspace::{Settings, Workspace, WorkspaceParams},
1206    };
1207
1208    #[cfg(test)]
1209    #[ctor::ctor]
1210    fn init_logger() {
1211        if std::env::var("RUST_LOG").is_ok() {
1212            env_logger::init();
1213        }
1214    }
1215
1216    #[gpui::test(iterations = 10)]
1217    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1218        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1219        let lang_registry = Arc::new(LanguageRegistry::new());
1220        let fs = FakeFs::new(cx_a.background());
1221        cx_a.foreground().forbid_parking();
1222
1223        // Connect to a server as 2 clients.
1224        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1225        let client_a = server.create_client(&mut cx_a, "user_a").await;
1226        let client_b = server.create_client(&mut cx_b, "user_b").await;
1227
1228        // Share a project as client A
1229        fs.insert_tree(
1230            "/a",
1231            json!({
1232                ".zed.toml": r#"collaborators = ["user_b"]"#,
1233                "a.txt": "a-contents",
1234                "b.txt": "b-contents",
1235            }),
1236        )
1237        .await;
1238        let project_a = cx_a.update(|cx| {
1239            Project::local(
1240                client_a.clone(),
1241                client_a.user_store.clone(),
1242                lang_registry.clone(),
1243                fs.clone(),
1244                cx,
1245            )
1246        });
1247        let (worktree_a, _) = project_a
1248            .update(&mut cx_a, |p, cx| {
1249                p.find_or_create_local_worktree("/a", false, cx)
1250            })
1251            .await
1252            .unwrap();
1253        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1254        worktree_a
1255            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1256            .await;
1257        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1258        project_a
1259            .update(&mut cx_a, |p, cx| p.share(cx))
1260            .await
1261            .unwrap();
1262
1263        // Join that project as client B
1264        let project_b = Project::remote(
1265            project_id,
1266            client_b.clone(),
1267            client_b.user_store.clone(),
1268            lang_registry.clone(),
1269            fs.clone(),
1270            &mut cx_b.to_async(),
1271        )
1272        .await
1273        .unwrap();
1274
1275        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1276            assert_eq!(
1277                project
1278                    .collaborators()
1279                    .get(&client_a.peer_id)
1280                    .unwrap()
1281                    .user
1282                    .github_login,
1283                "user_a"
1284            );
1285            project.replica_id()
1286        });
1287        project_a
1288            .condition(&cx_a, |tree, _| {
1289                tree.collaborators()
1290                    .get(&client_b.peer_id)
1291                    .map_or(false, |collaborator| {
1292                        collaborator.replica_id == replica_id_b
1293                            && collaborator.user.github_login == "user_b"
1294                    })
1295            })
1296            .await;
1297
1298        // Open the same file as client B and client A.
1299        let buffer_b = project_b
1300            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1301            .await
1302            .unwrap();
1303        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1304        buffer_b.read_with(&cx_b, |buf, cx| {
1305            assert_eq!(buf.read(cx).text(), "b-contents")
1306        });
1307        project_a.read_with(&cx_a, |project, cx| {
1308            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1309        });
1310        let buffer_a = project_a
1311            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1312            .await
1313            .unwrap();
1314
1315        let editor_b = cx_b.add_view(window_b, |cx| {
1316            Editor::for_buffer(
1317                buffer_b,
1318                None,
1319                watch::channel_with(Settings::test(cx)).1,
1320                cx,
1321            )
1322        });
1323
1324        // TODO
1325        // // Create a selection set as client B and see that selection set as client A.
1326        // buffer_a
1327        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1328        //     .await;
1329
1330        // Edit the buffer as client B and see that edit as client A.
1331        editor_b.update(&mut cx_b, |editor, cx| {
1332            editor.handle_input(&Input("ok, ".into()), cx)
1333        });
1334        buffer_a
1335            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1336            .await;
1337
1338        // TODO
1339        // // Remove the selection set as client B, see those selections disappear as client A.
1340        cx_b.update(move |_| drop(editor_b));
1341        // buffer_a
1342        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1343        //     .await;
1344
1345        // Close the buffer as client A, see that the buffer is closed.
1346        cx_a.update(move |_| drop(buffer_a));
1347        project_a
1348            .condition(&cx_a, |project, cx| {
1349                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1350            })
1351            .await;
1352
1353        // Dropping the client B's project removes client B from client A's collaborators.
1354        cx_b.update(move |_| drop(project_b));
1355        project_a
1356            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1357            .await;
1358    }
1359
1360    #[gpui::test(iterations = 10)]
1361    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1362        let lang_registry = Arc::new(LanguageRegistry::new());
1363        let fs = FakeFs::new(cx_a.background());
1364        cx_a.foreground().forbid_parking();
1365
1366        // Connect to a server as 2 clients.
1367        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1368        let client_a = server.create_client(&mut cx_a, "user_a").await;
1369        let client_b = server.create_client(&mut cx_b, "user_b").await;
1370
1371        // Share a project as client A
1372        fs.insert_tree(
1373            "/a",
1374            json!({
1375                ".zed.toml": r#"collaborators = ["user_b"]"#,
1376                "a.txt": "a-contents",
1377                "b.txt": "b-contents",
1378            }),
1379        )
1380        .await;
1381        let project_a = cx_a.update(|cx| {
1382            Project::local(
1383                client_a.clone(),
1384                client_a.user_store.clone(),
1385                lang_registry.clone(),
1386                fs.clone(),
1387                cx,
1388            )
1389        });
1390        let (worktree_a, _) = project_a
1391            .update(&mut cx_a, |p, cx| {
1392                p.find_or_create_local_worktree("/a", false, cx)
1393            })
1394            .await
1395            .unwrap();
1396        worktree_a
1397            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1398            .await;
1399        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1400        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1401        project_a
1402            .update(&mut cx_a, |p, cx| p.share(cx))
1403            .await
1404            .unwrap();
1405        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1406
1407        // Join that project as client B
1408        let project_b = Project::remote(
1409            project_id,
1410            client_b.clone(),
1411            client_b.user_store.clone(),
1412            lang_registry.clone(),
1413            fs.clone(),
1414            &mut cx_b.to_async(),
1415        )
1416        .await
1417        .unwrap();
1418        project_b
1419            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1420            .await
1421            .unwrap();
1422
1423        // Unshare the project as client A
1424        project_a
1425            .update(&mut cx_a, |project, cx| project.unshare(cx))
1426            .await
1427            .unwrap();
1428        project_b
1429            .condition(&mut cx_b, |project, _| project.is_read_only())
1430            .await;
1431        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1432        drop(project_b);
1433
1434        // Share the project again and ensure guests can still join.
1435        project_a
1436            .update(&mut cx_a, |project, cx| project.share(cx))
1437            .await
1438            .unwrap();
1439        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1440
1441        let project_c = Project::remote(
1442            project_id,
1443            client_b.clone(),
1444            client_b.user_store.clone(),
1445            lang_registry.clone(),
1446            fs.clone(),
1447            &mut cx_b.to_async(),
1448        )
1449        .await
1450        .unwrap();
1451        project_c
1452            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1453            .await
1454            .unwrap();
1455    }
1456
1457    #[gpui::test(iterations = 10)]
1458    async fn test_propagate_saves_and_fs_changes(
1459        mut cx_a: TestAppContext,
1460        mut cx_b: TestAppContext,
1461        mut cx_c: TestAppContext,
1462    ) {
1463        let lang_registry = Arc::new(LanguageRegistry::new());
1464        let fs = FakeFs::new(cx_a.background());
1465        cx_a.foreground().forbid_parking();
1466
1467        // Connect to a server as 3 clients.
1468        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1469        let client_a = server.create_client(&mut cx_a, "user_a").await;
1470        let client_b = server.create_client(&mut cx_b, "user_b").await;
1471        let client_c = server.create_client(&mut cx_c, "user_c").await;
1472
1473        // Share a worktree as client A.
1474        fs.insert_tree(
1475            "/a",
1476            json!({
1477                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1478                "file1": "",
1479                "file2": ""
1480            }),
1481        )
1482        .await;
1483        let project_a = cx_a.update(|cx| {
1484            Project::local(
1485                client_a.clone(),
1486                client_a.user_store.clone(),
1487                lang_registry.clone(),
1488                fs.clone(),
1489                cx,
1490            )
1491        });
1492        let (worktree_a, _) = project_a
1493            .update(&mut cx_a, |p, cx| {
1494                p.find_or_create_local_worktree("/a", false, cx)
1495            })
1496            .await
1497            .unwrap();
1498        worktree_a
1499            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1500            .await;
1501        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1502        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1503        project_a
1504            .update(&mut cx_a, |p, cx| p.share(cx))
1505            .await
1506            .unwrap();
1507
1508        // Join that worktree as clients B and C.
1509        let project_b = Project::remote(
1510            project_id,
1511            client_b.clone(),
1512            client_b.user_store.clone(),
1513            lang_registry.clone(),
1514            fs.clone(),
1515            &mut cx_b.to_async(),
1516        )
1517        .await
1518        .unwrap();
1519        let project_c = Project::remote(
1520            project_id,
1521            client_c.clone(),
1522            client_c.user_store.clone(),
1523            lang_registry.clone(),
1524            fs.clone(),
1525            &mut cx_c.to_async(),
1526        )
1527        .await
1528        .unwrap();
1529        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1530        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1531
1532        // Open and edit a buffer as both guests B and C.
1533        let buffer_b = project_b
1534            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1535            .await
1536            .unwrap();
1537        let buffer_c = project_c
1538            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1539            .await
1540            .unwrap();
1541        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1542        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1543
1544        // Open and edit that buffer as the host.
1545        let buffer_a = project_a
1546            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1547            .await
1548            .unwrap();
1549
1550        buffer_a
1551            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1552            .await;
1553        buffer_a.update(&mut cx_a, |buf, cx| {
1554            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1555        });
1556
1557        // Wait for edits to propagate
1558        buffer_a
1559            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1560            .await;
1561        buffer_b
1562            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1563            .await;
1564        buffer_c
1565            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1566            .await;
1567
1568        // Edit the buffer as the host and concurrently save as guest B.
1569        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1570        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1571        save_b.await.unwrap();
1572        assert_eq!(
1573            fs.load("/a/file1".as_ref()).await.unwrap(),
1574            "hi-a, i-am-c, i-am-b, i-am-a"
1575        );
1576        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1577        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1578        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1579
1580        // Make changes on host's file system, see those changes on guest worktrees.
1581        fs.rename(
1582            "/a/file1".as_ref(),
1583            "/a/file1-renamed".as_ref(),
1584            Default::default(),
1585        )
1586        .await
1587        .unwrap();
1588
1589        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1590            .await
1591            .unwrap();
1592        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1593
1594        worktree_a
1595            .condition(&cx_a, |tree, _| {
1596                tree.paths()
1597                    .map(|p| p.to_string_lossy())
1598                    .collect::<Vec<_>>()
1599                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1600            })
1601            .await;
1602        worktree_b
1603            .condition(&cx_b, |tree, _| {
1604                tree.paths()
1605                    .map(|p| p.to_string_lossy())
1606                    .collect::<Vec<_>>()
1607                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1608            })
1609            .await;
1610        worktree_c
1611            .condition(&cx_c, |tree, _| {
1612                tree.paths()
1613                    .map(|p| p.to_string_lossy())
1614                    .collect::<Vec<_>>()
1615                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1616            })
1617            .await;
1618
1619        // Ensure buffer files are updated as well.
1620        buffer_a
1621            .condition(&cx_a, |buf, _| {
1622                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1623            })
1624            .await;
1625        buffer_b
1626            .condition(&cx_b, |buf, _| {
1627                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1628            })
1629            .await;
1630        buffer_c
1631            .condition(&cx_c, |buf, _| {
1632                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1633            })
1634            .await;
1635    }
1636
1637    #[gpui::test(iterations = 10)]
1638    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1639        cx_a.foreground().forbid_parking();
1640        let lang_registry = Arc::new(LanguageRegistry::new());
1641        let fs = FakeFs::new(cx_a.background());
1642
1643        // Connect to a server as 2 clients.
1644        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1645        let client_a = server.create_client(&mut cx_a, "user_a").await;
1646        let client_b = server.create_client(&mut cx_b, "user_b").await;
1647
1648        // Share a project as client A
1649        fs.insert_tree(
1650            "/dir",
1651            json!({
1652                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1653                "a.txt": "a-contents",
1654            }),
1655        )
1656        .await;
1657
1658        let project_a = cx_a.update(|cx| {
1659            Project::local(
1660                client_a.clone(),
1661                client_a.user_store.clone(),
1662                lang_registry.clone(),
1663                fs.clone(),
1664                cx,
1665            )
1666        });
1667        let (worktree_a, _) = project_a
1668            .update(&mut cx_a, |p, cx| {
1669                p.find_or_create_local_worktree("/dir", false, cx)
1670            })
1671            .await
1672            .unwrap();
1673        worktree_a
1674            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1675            .await;
1676        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1677        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1678        project_a
1679            .update(&mut cx_a, |p, cx| p.share(cx))
1680            .await
1681            .unwrap();
1682
1683        // Join that project as client B
1684        let project_b = Project::remote(
1685            project_id,
1686            client_b.clone(),
1687            client_b.user_store.clone(),
1688            lang_registry.clone(),
1689            fs.clone(),
1690            &mut cx_b.to_async(),
1691        )
1692        .await
1693        .unwrap();
1694
1695        // Open a buffer as client B
1696        let buffer_b = project_b
1697            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1698            .await
1699            .unwrap();
1700
1701        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1702        buffer_b.read_with(&cx_b, |buf, _| {
1703            assert!(buf.is_dirty());
1704            assert!(!buf.has_conflict());
1705        });
1706
1707        buffer_b
1708            .update(&mut cx_b, |buf, cx| buf.save(cx))
1709            .await
1710            .unwrap();
1711        buffer_b
1712            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1713            .await;
1714        buffer_b.read_with(&cx_b, |buf, _| {
1715            assert!(!buf.has_conflict());
1716        });
1717
1718        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1719        buffer_b.read_with(&cx_b, |buf, _| {
1720            assert!(buf.is_dirty());
1721            assert!(!buf.has_conflict());
1722        });
1723    }
1724
1725    #[gpui::test(iterations = 10)]
1726    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1727        cx_a.foreground().forbid_parking();
1728        let lang_registry = Arc::new(LanguageRegistry::new());
1729        let fs = FakeFs::new(cx_a.background());
1730
1731        // Connect to a server as 2 clients.
1732        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1733        let client_a = server.create_client(&mut cx_a, "user_a").await;
1734        let client_b = server.create_client(&mut cx_b, "user_b").await;
1735
1736        // Share a project as client A
1737        fs.insert_tree(
1738            "/dir",
1739            json!({
1740                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1741                "a.txt": "a-contents",
1742            }),
1743        )
1744        .await;
1745
1746        let project_a = cx_a.update(|cx| {
1747            Project::local(
1748                client_a.clone(),
1749                client_a.user_store.clone(),
1750                lang_registry.clone(),
1751                fs.clone(),
1752                cx,
1753            )
1754        });
1755        let (worktree_a, _) = project_a
1756            .update(&mut cx_a, |p, cx| {
1757                p.find_or_create_local_worktree("/dir", false, cx)
1758            })
1759            .await
1760            .unwrap();
1761        worktree_a
1762            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1763            .await;
1764        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1765        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1766        project_a
1767            .update(&mut cx_a, |p, cx| p.share(cx))
1768            .await
1769            .unwrap();
1770
1771        // Join that project as client B
1772        let project_b = Project::remote(
1773            project_id,
1774            client_b.clone(),
1775            client_b.user_store.clone(),
1776            lang_registry.clone(),
1777            fs.clone(),
1778            &mut cx_b.to_async(),
1779        )
1780        .await
1781        .unwrap();
1782        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1783
1784        // Open a buffer as client B
1785        let buffer_b = project_b
1786            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1787            .await
1788            .unwrap();
1789        buffer_b.read_with(&cx_b, |buf, _| {
1790            assert!(!buf.is_dirty());
1791            assert!(!buf.has_conflict());
1792        });
1793
1794        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1795            .await
1796            .unwrap();
1797        buffer_b
1798            .condition(&cx_b, |buf, _| {
1799                buf.text() == "new contents" && !buf.is_dirty()
1800            })
1801            .await;
1802        buffer_b.read_with(&cx_b, |buf, _| {
1803            assert!(!buf.has_conflict());
1804        });
1805    }
1806
1807    #[gpui::test(iterations = 10)]
1808    async fn test_editing_while_guest_opens_buffer(
1809        mut cx_a: TestAppContext,
1810        mut cx_b: TestAppContext,
1811    ) {
1812        cx_a.foreground().forbid_parking();
1813        let lang_registry = Arc::new(LanguageRegistry::new());
1814        let fs = FakeFs::new(cx_a.background());
1815
1816        // Connect to a server as 2 clients.
1817        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1818        let client_a = server.create_client(&mut cx_a, "user_a").await;
1819        let client_b = server.create_client(&mut cx_b, "user_b").await;
1820
1821        // Share a project as client A
1822        fs.insert_tree(
1823            "/dir",
1824            json!({
1825                ".zed.toml": r#"collaborators = ["user_b"]"#,
1826                "a.txt": "a-contents",
1827            }),
1828        )
1829        .await;
1830        let project_a = cx_a.update(|cx| {
1831            Project::local(
1832                client_a.clone(),
1833                client_a.user_store.clone(),
1834                lang_registry.clone(),
1835                fs.clone(),
1836                cx,
1837            )
1838        });
1839        let (worktree_a, _) = project_a
1840            .update(&mut cx_a, |p, cx| {
1841                p.find_or_create_local_worktree("/dir", false, cx)
1842            })
1843            .await
1844            .unwrap();
1845        worktree_a
1846            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1847            .await;
1848        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1849        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1850        project_a
1851            .update(&mut cx_a, |p, cx| p.share(cx))
1852            .await
1853            .unwrap();
1854
1855        // Join that project as client B
1856        let project_b = Project::remote(
1857            project_id,
1858            client_b.clone(),
1859            client_b.user_store.clone(),
1860            lang_registry.clone(),
1861            fs.clone(),
1862            &mut cx_b.to_async(),
1863        )
1864        .await
1865        .unwrap();
1866
1867        // Open a buffer as client A
1868        let buffer_a = project_a
1869            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1870            .await
1871            .unwrap();
1872
1873        // Start opening the same buffer as client B
1874        let buffer_b = cx_b
1875            .background()
1876            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1877
1878        // Edit the buffer as client A while client B is still opening it.
1879        cx_b.background().simulate_random_delay().await;
1880        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1881        cx_b.background().simulate_random_delay().await;
1882        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1883
1884        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1885        let buffer_b = buffer_b.await.unwrap();
1886        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1887    }
1888
1889    #[gpui::test(iterations = 10)]
1890    async fn test_leaving_worktree_while_opening_buffer(
1891        mut cx_a: TestAppContext,
1892        mut cx_b: TestAppContext,
1893    ) {
1894        cx_a.foreground().forbid_parking();
1895        let lang_registry = Arc::new(LanguageRegistry::new());
1896        let fs = FakeFs::new(cx_a.background());
1897
1898        // Connect to a server as 2 clients.
1899        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1900        let client_a = server.create_client(&mut cx_a, "user_a").await;
1901        let client_b = server.create_client(&mut cx_b, "user_b").await;
1902
1903        // Share a project as client A
1904        fs.insert_tree(
1905            "/dir",
1906            json!({
1907                ".zed.toml": r#"collaborators = ["user_b"]"#,
1908                "a.txt": "a-contents",
1909            }),
1910        )
1911        .await;
1912        let project_a = cx_a.update(|cx| {
1913            Project::local(
1914                client_a.clone(),
1915                client_a.user_store.clone(),
1916                lang_registry.clone(),
1917                fs.clone(),
1918                cx,
1919            )
1920        });
1921        let (worktree_a, _) = project_a
1922            .update(&mut cx_a, |p, cx| {
1923                p.find_or_create_local_worktree("/dir", false, cx)
1924            })
1925            .await
1926            .unwrap();
1927        worktree_a
1928            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1929            .await;
1930        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1931        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1932        project_a
1933            .update(&mut cx_a, |p, cx| p.share(cx))
1934            .await
1935            .unwrap();
1936
1937        // Join that project as client B
1938        let project_b = Project::remote(
1939            project_id,
1940            client_b.clone(),
1941            client_b.user_store.clone(),
1942            lang_registry.clone(),
1943            fs.clone(),
1944            &mut cx_b.to_async(),
1945        )
1946        .await
1947        .unwrap();
1948
1949        // See that a guest has joined as client A.
1950        project_a
1951            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1952            .await;
1953
1954        // Begin opening a buffer as client B, but leave the project before the open completes.
1955        let buffer_b = cx_b
1956            .background()
1957            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1958        cx_b.update(|_| drop(project_b));
1959        drop(buffer_b);
1960
1961        // See that the guest has left.
1962        project_a
1963            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1964            .await;
1965    }
1966
1967    #[gpui::test(iterations = 10)]
1968    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1969        cx_a.foreground().forbid_parking();
1970        let lang_registry = Arc::new(LanguageRegistry::new());
1971        let fs = FakeFs::new(cx_a.background());
1972
1973        // Connect to a server as 2 clients.
1974        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1975        let client_a = server.create_client(&mut cx_a, "user_a").await;
1976        let client_b = server.create_client(&mut cx_b, "user_b").await;
1977
1978        // Share a project as client A
1979        fs.insert_tree(
1980            "/a",
1981            json!({
1982                ".zed.toml": r#"collaborators = ["user_b"]"#,
1983                "a.txt": "a-contents",
1984                "b.txt": "b-contents",
1985            }),
1986        )
1987        .await;
1988        let project_a = cx_a.update(|cx| {
1989            Project::local(
1990                client_a.clone(),
1991                client_a.user_store.clone(),
1992                lang_registry.clone(),
1993                fs.clone(),
1994                cx,
1995            )
1996        });
1997        let (worktree_a, _) = project_a
1998            .update(&mut cx_a, |p, cx| {
1999                p.find_or_create_local_worktree("/a", false, cx)
2000            })
2001            .await
2002            .unwrap();
2003        worktree_a
2004            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2005            .await;
2006        let project_id = project_a
2007            .update(&mut cx_a, |project, _| project.next_remote_id())
2008            .await;
2009        project_a
2010            .update(&mut cx_a, |project, cx| project.share(cx))
2011            .await
2012            .unwrap();
2013
2014        // Join that project as client B
2015        let _project_b = Project::remote(
2016            project_id,
2017            client_b.clone(),
2018            client_b.user_store.clone(),
2019            lang_registry.clone(),
2020            fs.clone(),
2021            &mut cx_b.to_async(),
2022        )
2023        .await
2024        .unwrap();
2025
2026        // See that a guest has joined as client A.
2027        project_a
2028            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2029            .await;
2030
2031        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2032        client_b.disconnect(&cx_b.to_async()).unwrap();
2033        project_a
2034            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2035            .await;
2036    }
2037
2038    #[gpui::test(iterations = 10)]
2039    async fn test_collaborating_with_diagnostics(
2040        mut cx_a: TestAppContext,
2041        mut cx_b: TestAppContext,
2042    ) {
2043        cx_a.foreground().forbid_parking();
2044        let mut lang_registry = Arc::new(LanguageRegistry::new());
2045        let fs = FakeFs::new(cx_a.background());
2046
2047        // Set up a fake language server.
2048        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2049        Arc::get_mut(&mut lang_registry)
2050            .unwrap()
2051            .add(Arc::new(Language::new(
2052                LanguageConfig {
2053                    name: "Rust".into(),
2054                    path_suffixes: vec!["rs".to_string()],
2055                    language_server: Some(language_server_config),
2056                    ..Default::default()
2057                },
2058                Some(tree_sitter_rust::language()),
2059            )));
2060
2061        // Connect to a server as 2 clients.
2062        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2063        let client_a = server.create_client(&mut cx_a, "user_a").await;
2064        let client_b = server.create_client(&mut cx_b, "user_b").await;
2065
2066        // Share a project as client A
2067        fs.insert_tree(
2068            "/a",
2069            json!({
2070                ".zed.toml": r#"collaborators = ["user_b"]"#,
2071                "a.rs": "let one = two",
2072                "other.rs": "",
2073            }),
2074        )
2075        .await;
2076        let project_a = cx_a.update(|cx| {
2077            Project::local(
2078                client_a.clone(),
2079                client_a.user_store.clone(),
2080                lang_registry.clone(),
2081                fs.clone(),
2082                cx,
2083            )
2084        });
2085        let (worktree_a, _) = project_a
2086            .update(&mut cx_a, |p, cx| {
2087                p.find_or_create_local_worktree("/a", false, cx)
2088            })
2089            .await
2090            .unwrap();
2091        worktree_a
2092            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2093            .await;
2094        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2095        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2096        project_a
2097            .update(&mut cx_a, |p, cx| p.share(cx))
2098            .await
2099            .unwrap();
2100
2101        // Cause the language server to start.
2102        let _ = cx_a
2103            .background()
2104            .spawn(project_a.update(&mut cx_a, |project, cx| {
2105                project.open_buffer(
2106                    ProjectPath {
2107                        worktree_id,
2108                        path: Path::new("other.rs").into(),
2109                    },
2110                    cx,
2111                )
2112            }))
2113            .await
2114            .unwrap();
2115
2116        // Simulate a language server reporting errors for a file.
2117        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2118        fake_language_server
2119            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2120                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2121                version: None,
2122                diagnostics: vec![lsp::Diagnostic {
2123                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2124                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2125                    message: "message 1".to_string(),
2126                    ..Default::default()
2127                }],
2128            })
2129            .await;
2130
2131        // Wait for server to see the diagnostics update.
2132        server
2133            .condition(|store| {
2134                let worktree = store
2135                    .project(project_id)
2136                    .unwrap()
2137                    .share
2138                    .as_ref()
2139                    .unwrap()
2140                    .worktrees
2141                    .get(&worktree_id.to_proto())
2142                    .unwrap();
2143
2144                !worktree.diagnostic_summaries.is_empty()
2145            })
2146            .await;
2147
2148        // Join the worktree as client B.
2149        let project_b = Project::remote(
2150            project_id,
2151            client_b.clone(),
2152            client_b.user_store.clone(),
2153            lang_registry.clone(),
2154            fs.clone(),
2155            &mut cx_b.to_async(),
2156        )
2157        .await
2158        .unwrap();
2159
2160        project_b.read_with(&cx_b, |project, cx| {
2161            assert_eq!(
2162                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2163                &[(
2164                    ProjectPath {
2165                        worktree_id,
2166                        path: Arc::from(Path::new("a.rs")),
2167                    },
2168                    DiagnosticSummary {
2169                        error_count: 1,
2170                        warning_count: 0,
2171                        ..Default::default()
2172                    },
2173                )]
2174            )
2175        });
2176
2177        // Simulate a language server reporting more errors for a file.
2178        fake_language_server
2179            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2180                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2181                version: None,
2182                diagnostics: vec![
2183                    lsp::Diagnostic {
2184                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2185                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2186                        message: "message 1".to_string(),
2187                        ..Default::default()
2188                    },
2189                    lsp::Diagnostic {
2190                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2191                        range: lsp::Range::new(
2192                            lsp::Position::new(0, 10),
2193                            lsp::Position::new(0, 13),
2194                        ),
2195                        message: "message 2".to_string(),
2196                        ..Default::default()
2197                    },
2198                ],
2199            })
2200            .await;
2201
2202        // Client b gets the updated summaries
2203        project_b
2204            .condition(&cx_b, |project, cx| {
2205                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2206                    == &[(
2207                        ProjectPath {
2208                            worktree_id,
2209                            path: Arc::from(Path::new("a.rs")),
2210                        },
2211                        DiagnosticSummary {
2212                            error_count: 1,
2213                            warning_count: 1,
2214                            ..Default::default()
2215                        },
2216                    )]
2217            })
2218            .await;
2219
2220        // Open the file with the errors on client B. They should be present.
2221        let buffer_b = cx_b
2222            .background()
2223            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2224            .await
2225            .unwrap();
2226
2227        buffer_b.read_with(&cx_b, |buffer, _| {
2228            assert_eq!(
2229                buffer
2230                    .snapshot()
2231                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2232                    .map(|entry| entry)
2233                    .collect::<Vec<_>>(),
2234                &[
2235                    DiagnosticEntry {
2236                        range: Point::new(0, 4)..Point::new(0, 7),
2237                        diagnostic: Diagnostic {
2238                            group_id: 0,
2239                            message: "message 1".to_string(),
2240                            severity: lsp::DiagnosticSeverity::ERROR,
2241                            is_primary: true,
2242                            ..Default::default()
2243                        }
2244                    },
2245                    DiagnosticEntry {
2246                        range: Point::new(0, 10)..Point::new(0, 13),
2247                        diagnostic: Diagnostic {
2248                            group_id: 1,
2249                            severity: lsp::DiagnosticSeverity::WARNING,
2250                            message: "message 2".to_string(),
2251                            is_primary: true,
2252                            ..Default::default()
2253                        }
2254                    }
2255                ]
2256            );
2257        });
2258    }
2259
2260    #[gpui::test(iterations = 10)]
2261    async fn test_collaborating_with_completion(
2262        mut cx_a: TestAppContext,
2263        mut cx_b: TestAppContext,
2264    ) {
2265        cx_a.foreground().forbid_parking();
2266        let mut lang_registry = Arc::new(LanguageRegistry::new());
2267        let fs = FakeFs::new(cx_a.background());
2268
2269        // Set up a fake language server.
2270        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2271        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2272            completion_provider: Some(lsp::CompletionOptions {
2273                trigger_characters: Some(vec![".".to_string()]),
2274                ..Default::default()
2275            }),
2276            ..Default::default()
2277        });
2278        Arc::get_mut(&mut lang_registry)
2279            .unwrap()
2280            .add(Arc::new(Language::new(
2281                LanguageConfig {
2282                    name: "Rust".into(),
2283                    path_suffixes: vec!["rs".to_string()],
2284                    language_server: Some(language_server_config),
2285                    ..Default::default()
2286                },
2287                Some(tree_sitter_rust::language()),
2288            )));
2289
2290        // Connect to a server as 2 clients.
2291        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2292        let client_a = server.create_client(&mut cx_a, "user_a").await;
2293        let client_b = server.create_client(&mut cx_b, "user_b").await;
2294
2295        // Share a project as client A
2296        fs.insert_tree(
2297            "/a",
2298            json!({
2299                ".zed.toml": r#"collaborators = ["user_b"]"#,
2300                "main.rs": "fn main() { a }",
2301                "other.rs": "",
2302            }),
2303        )
2304        .await;
2305        let project_a = cx_a.update(|cx| {
2306            Project::local(
2307                client_a.clone(),
2308                client_a.user_store.clone(),
2309                lang_registry.clone(),
2310                fs.clone(),
2311                cx,
2312            )
2313        });
2314        let (worktree_a, _) = project_a
2315            .update(&mut cx_a, |p, cx| {
2316                p.find_or_create_local_worktree("/a", false, cx)
2317            })
2318            .await
2319            .unwrap();
2320        worktree_a
2321            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2322            .await;
2323        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2324        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2325        project_a
2326            .update(&mut cx_a, |p, cx| p.share(cx))
2327            .await
2328            .unwrap();
2329
2330        // Join the worktree as client B.
2331        let project_b = Project::remote(
2332            project_id,
2333            client_b.clone(),
2334            client_b.user_store.clone(),
2335            lang_registry.clone(),
2336            fs.clone(),
2337            &mut cx_b.to_async(),
2338        )
2339        .await
2340        .unwrap();
2341
2342        // Open a file in an editor as the guest.
2343        let buffer_b = project_b
2344            .update(&mut cx_b, |p, cx| {
2345                p.open_buffer((worktree_id, "main.rs"), cx)
2346            })
2347            .await
2348            .unwrap();
2349        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2350        let editor_b = cx_b.add_view(window_b, |cx| {
2351            Editor::for_buffer(
2352                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2353                Some(project_b.clone()),
2354                watch::channel_with(Settings::test(cx)).1,
2355                cx,
2356            )
2357        });
2358
2359        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2360        buffer_b
2361            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2362            .await;
2363
2364        // Type a completion trigger character as the guest.
2365        editor_b.update(&mut cx_b, |editor, cx| {
2366            editor.select_ranges([13..13], None, cx);
2367            editor.handle_input(&Input(".".into()), cx);
2368            cx.focus(&editor_b);
2369        });
2370
2371        // Receive a completion request as the host's language server.
2372        // Return some completions from the host's language server.
2373        cx_a.foreground().start_waiting();
2374        fake_language_server
2375            .handle_request::<lsp::request::Completion, _>(|params, _| {
2376                assert_eq!(
2377                    params.text_document_position.text_document.uri,
2378                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2379                );
2380                assert_eq!(
2381                    params.text_document_position.position,
2382                    lsp::Position::new(0, 14),
2383                );
2384
2385                Some(lsp::CompletionResponse::Array(vec![
2386                    lsp::CompletionItem {
2387                        label: "first_method(…)".into(),
2388                        detail: Some("fn(&mut self, B) -> C".into()),
2389                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2390                            new_text: "first_method($1)".to_string(),
2391                            range: lsp::Range::new(
2392                                lsp::Position::new(0, 14),
2393                                lsp::Position::new(0, 14),
2394                            ),
2395                        })),
2396                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2397                        ..Default::default()
2398                    },
2399                    lsp::CompletionItem {
2400                        label: "second_method(…)".into(),
2401                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2402                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2403                            new_text: "second_method()".to_string(),
2404                            range: lsp::Range::new(
2405                                lsp::Position::new(0, 14),
2406                                lsp::Position::new(0, 14),
2407                            ),
2408                        })),
2409                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2410                        ..Default::default()
2411                    },
2412                ]))
2413            })
2414            .next()
2415            .await
2416            .unwrap();
2417        cx_a.foreground().finish_waiting();
2418
2419        // Open the buffer on the host.
2420        let buffer_a = project_a
2421            .update(&mut cx_a, |p, cx| {
2422                p.open_buffer((worktree_id, "main.rs"), cx)
2423            })
2424            .await
2425            .unwrap();
2426        buffer_a
2427            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2428            .await;
2429
2430        // Confirm a completion on the guest.
2431        editor_b
2432            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2433            .await;
2434        editor_b.update(&mut cx_b, |editor, cx| {
2435            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2436            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2437        });
2438
2439        // Return a resolved completion from the host's language server.
2440        // The resolved completion has an additional text edit.
2441        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2442            |params, _| {
2443                assert_eq!(params.label, "first_method(…)");
2444                lsp::CompletionItem {
2445                    label: "first_method(…)".into(),
2446                    detail: Some("fn(&mut self, B) -> C".into()),
2447                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2448                        new_text: "first_method($1)".to_string(),
2449                        range: lsp::Range::new(
2450                            lsp::Position::new(0, 14),
2451                            lsp::Position::new(0, 14),
2452                        ),
2453                    })),
2454                    additional_text_edits: Some(vec![lsp::TextEdit {
2455                        new_text: "use d::SomeTrait;\n".to_string(),
2456                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2457                    }]),
2458                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2459                    ..Default::default()
2460                }
2461            },
2462        );
2463
2464        // The additional edit is applied.
2465        buffer_a
2466            .condition(&cx_a, |buffer, _| {
2467                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2468            })
2469            .await;
2470        buffer_b
2471            .condition(&cx_b, |buffer, _| {
2472                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2473            })
2474            .await;
2475    }
2476
2477    #[gpui::test(iterations = 10)]
2478    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2479        cx_a.foreground().forbid_parking();
2480        let mut lang_registry = Arc::new(LanguageRegistry::new());
2481        let fs = FakeFs::new(cx_a.background());
2482
2483        // Set up a fake language server.
2484        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2485        Arc::get_mut(&mut lang_registry)
2486            .unwrap()
2487            .add(Arc::new(Language::new(
2488                LanguageConfig {
2489                    name: "Rust".into(),
2490                    path_suffixes: vec!["rs".to_string()],
2491                    language_server: Some(language_server_config),
2492                    ..Default::default()
2493                },
2494                Some(tree_sitter_rust::language()),
2495            )));
2496
2497        // Connect to a server as 2 clients.
2498        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2499        let client_a = server.create_client(&mut cx_a, "user_a").await;
2500        let client_b = server.create_client(&mut cx_b, "user_b").await;
2501
2502        // Share a project as client A
2503        fs.insert_tree(
2504            "/a",
2505            json!({
2506                ".zed.toml": r#"collaborators = ["user_b"]"#,
2507                "a.rs": "let one = two",
2508            }),
2509        )
2510        .await;
2511        let project_a = cx_a.update(|cx| {
2512            Project::local(
2513                client_a.clone(),
2514                client_a.user_store.clone(),
2515                lang_registry.clone(),
2516                fs.clone(),
2517                cx,
2518            )
2519        });
2520        let (worktree_a, _) = project_a
2521            .update(&mut cx_a, |p, cx| {
2522                p.find_or_create_local_worktree("/a", false, cx)
2523            })
2524            .await
2525            .unwrap();
2526        worktree_a
2527            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2528            .await;
2529        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2530        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2531        project_a
2532            .update(&mut cx_a, |p, cx| p.share(cx))
2533            .await
2534            .unwrap();
2535
2536        // Join the worktree as client B.
2537        let project_b = Project::remote(
2538            project_id,
2539            client_b.clone(),
2540            client_b.user_store.clone(),
2541            lang_registry.clone(),
2542            fs.clone(),
2543            &mut cx_b.to_async(),
2544        )
2545        .await
2546        .unwrap();
2547
2548        let buffer_b = cx_b
2549            .background()
2550            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2551            .await
2552            .unwrap();
2553
2554        let format = project_b.update(&mut cx_b, |project, cx| {
2555            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2556        });
2557
2558        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2559        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2560            Some(vec![
2561                lsp::TextEdit {
2562                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2563                    new_text: "h".to_string(),
2564                },
2565                lsp::TextEdit {
2566                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2567                    new_text: "y".to_string(),
2568                },
2569            ])
2570        });
2571
2572        format.await.unwrap();
2573        assert_eq!(
2574            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2575            "let honey = two"
2576        );
2577    }
2578
2579    #[gpui::test(iterations = 10)]
2580    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2581        cx_a.foreground().forbid_parking();
2582        let mut lang_registry = Arc::new(LanguageRegistry::new());
2583        let fs = FakeFs::new(cx_a.background());
2584        fs.insert_tree(
2585            "/root-1",
2586            json!({
2587                ".zed.toml": r#"collaborators = ["user_b"]"#,
2588                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2589            }),
2590        )
2591        .await;
2592        fs.insert_tree(
2593            "/root-2",
2594            json!({
2595                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2596            }),
2597        )
2598        .await;
2599
2600        // Set up a fake language server.
2601        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2602        Arc::get_mut(&mut lang_registry)
2603            .unwrap()
2604            .add(Arc::new(Language::new(
2605                LanguageConfig {
2606                    name: "Rust".into(),
2607                    path_suffixes: vec!["rs".to_string()],
2608                    language_server: Some(language_server_config),
2609                    ..Default::default()
2610                },
2611                Some(tree_sitter_rust::language()),
2612            )));
2613
2614        // Connect to a server as 2 clients.
2615        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2616        let client_a = server.create_client(&mut cx_a, "user_a").await;
2617        let client_b = server.create_client(&mut cx_b, "user_b").await;
2618
2619        // Share a project as client A
2620        let project_a = cx_a.update(|cx| {
2621            Project::local(
2622                client_a.clone(),
2623                client_a.user_store.clone(),
2624                lang_registry.clone(),
2625                fs.clone(),
2626                cx,
2627            )
2628        });
2629        let (worktree_a, _) = project_a
2630            .update(&mut cx_a, |p, cx| {
2631                p.find_or_create_local_worktree("/root-1", false, cx)
2632            })
2633            .await
2634            .unwrap();
2635        worktree_a
2636            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2637            .await;
2638        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2639        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2640        project_a
2641            .update(&mut cx_a, |p, cx| p.share(cx))
2642            .await
2643            .unwrap();
2644
2645        // Join the worktree as client B.
2646        let project_b = Project::remote(
2647            project_id,
2648            client_b.clone(),
2649            client_b.user_store.clone(),
2650            lang_registry.clone(),
2651            fs.clone(),
2652            &mut cx_b.to_async(),
2653        )
2654        .await
2655        .unwrap();
2656
2657        // Open the file on client B.
2658        let buffer_b = cx_b
2659            .background()
2660            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2661            .await
2662            .unwrap();
2663
2664        // Request the definition of a symbol as the guest.
2665        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2666
2667        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2668        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2669            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2670                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2671                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2672            )))
2673        });
2674
2675        let definitions_1 = definitions_1.await.unwrap();
2676        cx_b.read(|cx| {
2677            assert_eq!(definitions_1.len(), 1);
2678            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2679            let target_buffer = definitions_1[0].buffer.read(cx);
2680            assert_eq!(
2681                target_buffer.text(),
2682                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2683            );
2684            assert_eq!(
2685                definitions_1[0].range.to_point(target_buffer),
2686                Point::new(0, 6)..Point::new(0, 9)
2687            );
2688        });
2689
2690        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2691        // the previous call to `definition`.
2692        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2693        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2694            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2695                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2696                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2697            )))
2698        });
2699
2700        let definitions_2 = definitions_2.await.unwrap();
2701        cx_b.read(|cx| {
2702            assert_eq!(definitions_2.len(), 1);
2703            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2704            let target_buffer = definitions_2[0].buffer.read(cx);
2705            assert_eq!(
2706                target_buffer.text(),
2707                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2708            );
2709            assert_eq!(
2710                definitions_2[0].range.to_point(target_buffer),
2711                Point::new(1, 6)..Point::new(1, 11)
2712            );
2713        });
2714        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2715
2716        cx_b.update(|_| {
2717            drop(definitions_1);
2718            drop(definitions_2);
2719        });
2720        project_b
2721            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2722            .await;
2723    }
2724
2725    #[gpui::test(iterations = 10)]
2726    async fn test_references(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2727        cx_a.foreground().forbid_parking();
2728        let mut lang_registry = Arc::new(LanguageRegistry::new());
2729        let fs = FakeFs::new(cx_a.background());
2730        fs.insert_tree(
2731            "/root-1",
2732            json!({
2733                ".zed.toml": r#"collaborators = ["user_b"]"#,
2734                "one.rs": "const ONE: usize = 1;",
2735                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2736            }),
2737        )
2738        .await;
2739        fs.insert_tree(
2740            "/root-2",
2741            json!({
2742                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2743            }),
2744        )
2745        .await;
2746
2747        // Set up a fake language server.
2748        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2749        Arc::get_mut(&mut lang_registry)
2750            .unwrap()
2751            .add(Arc::new(Language::new(
2752                LanguageConfig {
2753                    name: "Rust".into(),
2754                    path_suffixes: vec!["rs".to_string()],
2755                    language_server: Some(language_server_config),
2756                    ..Default::default()
2757                },
2758                Some(tree_sitter_rust::language()),
2759            )));
2760
2761        // Connect to a server as 2 clients.
2762        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2763        let client_a = server.create_client(&mut cx_a, "user_a").await;
2764        let client_b = server.create_client(&mut cx_b, "user_b").await;
2765
2766        // Share a project as client A
2767        let project_a = cx_a.update(|cx| {
2768            Project::local(
2769                client_a.clone(),
2770                client_a.user_store.clone(),
2771                lang_registry.clone(),
2772                fs.clone(),
2773                cx,
2774            )
2775        });
2776        let (worktree_a, _) = project_a
2777            .update(&mut cx_a, |p, cx| {
2778                p.find_or_create_local_worktree("/root-1", false, cx)
2779            })
2780            .await
2781            .unwrap();
2782        worktree_a
2783            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2784            .await;
2785        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2786        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2787        project_a
2788            .update(&mut cx_a, |p, cx| p.share(cx))
2789            .await
2790            .unwrap();
2791
2792        // Join the worktree as client B.
2793        let project_b = Project::remote(
2794            project_id,
2795            client_b.clone(),
2796            client_b.user_store.clone(),
2797            lang_registry.clone(),
2798            fs.clone(),
2799            &mut cx_b.to_async(),
2800        )
2801        .await
2802        .unwrap();
2803
2804        // Open the file on client B.
2805        let buffer_b = cx_b
2806            .background()
2807            .spawn(project_b.update(&mut cx_b, |p, cx| {
2808                p.open_buffer((worktree_id, "one.rs"), cx)
2809            }))
2810            .await
2811            .unwrap();
2812
2813        // Request references to a symbol as the guest.
2814        let references = project_b.update(&mut cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2815
2816        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2817        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2818            assert_eq!(
2819                params.text_document_position.text_document.uri.as_str(),
2820                "file:///root-1/one.rs"
2821            );
2822            Some(vec![
2823                lsp::Location {
2824                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2825                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2826                },
2827                lsp::Location {
2828                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2829                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2830                },
2831                lsp::Location {
2832                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2833                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2834                },
2835            ])
2836        });
2837
2838        let references = references.await.unwrap();
2839        cx_b.read(|cx| {
2840            assert_eq!(references.len(), 3);
2841            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2842
2843            let two_buffer = references[0].buffer.read(cx);
2844            let three_buffer = references[2].buffer.read(cx);
2845            assert_eq!(
2846                two_buffer.file().unwrap().path().as_ref(),
2847                Path::new("two.rs")
2848            );
2849            assert_eq!(references[1].buffer, references[0].buffer);
2850            assert_eq!(
2851                three_buffer.file().unwrap().full_path(cx),
2852                Path::new("three.rs")
2853            );
2854
2855            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2856            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2857            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2858        });
2859    }
2860
2861    #[gpui::test(iterations = 10)]
2862    async fn test_project_search(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2863        cx_a.foreground().forbid_parking();
2864        let lang_registry = Arc::new(LanguageRegistry::new());
2865        let fs = FakeFs::new(cx_a.background());
2866        fs.insert_tree(
2867            "/root-1",
2868            json!({
2869                ".zed.toml": r#"collaborators = ["user_b"]"#,
2870                "a": "hello world",
2871                "b": "goodnight moon",
2872                "c": "a world of goo",
2873                "d": "world champion of clown world",
2874            }),
2875        )
2876        .await;
2877        fs.insert_tree(
2878            "/root-2",
2879            json!({
2880                "e": "disney world is fun",
2881            }),
2882        )
2883        .await;
2884
2885        // Connect to a server as 2 clients.
2886        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2887        let client_a = server.create_client(&mut cx_a, "user_a").await;
2888        let client_b = server.create_client(&mut cx_b, "user_b").await;
2889
2890        // Share a project as client A
2891        let project_a = cx_a.update(|cx| {
2892            Project::local(
2893                client_a.clone(),
2894                client_a.user_store.clone(),
2895                lang_registry.clone(),
2896                fs.clone(),
2897                cx,
2898            )
2899        });
2900        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2901
2902        let (worktree_1, _) = project_a
2903            .update(&mut cx_a, |p, cx| {
2904                p.find_or_create_local_worktree("/root-1", false, cx)
2905            })
2906            .await
2907            .unwrap();
2908        worktree_1
2909            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2910            .await;
2911        let (worktree_2, _) = project_a
2912            .update(&mut cx_a, |p, cx| {
2913                p.find_or_create_local_worktree("/root-2", false, cx)
2914            })
2915            .await
2916            .unwrap();
2917        worktree_2
2918            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2919            .await;
2920
2921        eprintln!("sharing");
2922
2923        project_a
2924            .update(&mut cx_a, |p, cx| p.share(cx))
2925            .await
2926            .unwrap();
2927
2928        // Join the worktree as client B.
2929        let project_b = Project::remote(
2930            project_id,
2931            client_b.clone(),
2932            client_b.user_store.clone(),
2933            lang_registry.clone(),
2934            fs.clone(),
2935            &mut cx_b.to_async(),
2936        )
2937        .await
2938        .unwrap();
2939
2940        let results = project_b
2941            .update(&mut cx_b, |project, cx| {
2942                project.search(SearchQuery::text("world", false, false), cx)
2943            })
2944            .await
2945            .unwrap();
2946
2947        let mut ranges_by_path = results
2948            .into_iter()
2949            .map(|(buffer, ranges)| {
2950                buffer.read_with(&cx_b, |buffer, cx| {
2951                    let path = buffer.file().unwrap().full_path(cx);
2952                    let offset_ranges = ranges
2953                        .into_iter()
2954                        .map(|range| range.to_offset(buffer))
2955                        .collect::<Vec<_>>();
2956                    (path, offset_ranges)
2957                })
2958            })
2959            .collect::<Vec<_>>();
2960        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2961
2962        assert_eq!(
2963            ranges_by_path,
2964            &[
2965                (PathBuf::from("root-1/a"), vec![6..11]),
2966                (PathBuf::from("root-1/c"), vec![2..7]),
2967                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2968                (PathBuf::from("root-2/e"), vec![7..12]),
2969            ]
2970        );
2971    }
2972
2973    #[gpui::test(iterations = 10)]
2974    async fn test_document_highlights(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2975        cx_a.foreground().forbid_parking();
2976        let lang_registry = Arc::new(LanguageRegistry::new());
2977        let fs = FakeFs::new(cx_a.background());
2978        fs.insert_tree(
2979            "/root-1",
2980            json!({
2981                ".zed.toml": r#"collaborators = ["user_b"]"#,
2982                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2983            }),
2984        )
2985        .await;
2986
2987        // Set up a fake language server.
2988        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2989        lang_registry.add(Arc::new(Language::new(
2990            LanguageConfig {
2991                name: "Rust".into(),
2992                path_suffixes: vec!["rs".to_string()],
2993                language_server: Some(language_server_config),
2994                ..Default::default()
2995            },
2996            Some(tree_sitter_rust::language()),
2997        )));
2998
2999        // Connect to a server as 2 clients.
3000        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3001        let client_a = server.create_client(&mut cx_a, "user_a").await;
3002        let client_b = server.create_client(&mut cx_b, "user_b").await;
3003
3004        // Share a project as client A
3005        let project_a = cx_a.update(|cx| {
3006            Project::local(
3007                client_a.clone(),
3008                client_a.user_store.clone(),
3009                lang_registry.clone(),
3010                fs.clone(),
3011                cx,
3012            )
3013        });
3014        let (worktree_a, _) = project_a
3015            .update(&mut cx_a, |p, cx| {
3016                p.find_or_create_local_worktree("/root-1", false, cx)
3017            })
3018            .await
3019            .unwrap();
3020        worktree_a
3021            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3022            .await;
3023        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3024        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3025        project_a
3026            .update(&mut cx_a, |p, cx| p.share(cx))
3027            .await
3028            .unwrap();
3029
3030        // Join the worktree as client B.
3031        let project_b = Project::remote(
3032            project_id,
3033            client_b.clone(),
3034            client_b.user_store.clone(),
3035            lang_registry.clone(),
3036            fs.clone(),
3037            &mut cx_b.to_async(),
3038        )
3039        .await
3040        .unwrap();
3041
3042        // Open the file on client B.
3043        let buffer_b = cx_b
3044            .background()
3045            .spawn(project_b.update(&mut cx_b, |p, cx| {
3046                p.open_buffer((worktree_id, "main.rs"), cx)
3047            }))
3048            .await
3049            .unwrap();
3050
3051        // Request document highlights as the guest.
3052        let highlights =
3053            project_b.update(&mut cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
3054
3055        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3056        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
3057            |params, _| {
3058                assert_eq!(
3059                    params
3060                        .text_document_position_params
3061                        .text_document
3062                        .uri
3063                        .as_str(),
3064                    "file:///root-1/main.rs"
3065                );
3066                assert_eq!(
3067                    params.text_document_position_params.position,
3068                    lsp::Position::new(0, 34)
3069                );
3070                Some(vec![
3071                    lsp::DocumentHighlight {
3072                        kind: Some(lsp::DocumentHighlightKind::WRITE),
3073                        range: lsp::Range::new(
3074                            lsp::Position::new(0, 10),
3075                            lsp::Position::new(0, 16),
3076                        ),
3077                    },
3078                    lsp::DocumentHighlight {
3079                        kind: Some(lsp::DocumentHighlightKind::READ),
3080                        range: lsp::Range::new(
3081                            lsp::Position::new(0, 32),
3082                            lsp::Position::new(0, 38),
3083                        ),
3084                    },
3085                    lsp::DocumentHighlight {
3086                        kind: Some(lsp::DocumentHighlightKind::READ),
3087                        range: lsp::Range::new(
3088                            lsp::Position::new(0, 41),
3089                            lsp::Position::new(0, 47),
3090                        ),
3091                    },
3092                ])
3093            },
3094        );
3095
3096        let highlights = highlights.await.unwrap();
3097        buffer_b.read_with(&cx_b, |buffer, _| {
3098            let snapshot = buffer.snapshot();
3099
3100            let highlights = highlights
3101                .into_iter()
3102                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
3103                .collect::<Vec<_>>();
3104            assert_eq!(
3105                highlights,
3106                &[
3107                    (lsp::DocumentHighlightKind::WRITE, 10..16),
3108                    (lsp::DocumentHighlightKind::READ, 32..38),
3109                    (lsp::DocumentHighlightKind::READ, 41..47)
3110                ]
3111            )
3112        });
3113    }
3114
3115    #[gpui::test(iterations = 10)]
3116    async fn test_project_symbols(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3117        cx_a.foreground().forbid_parking();
3118        let mut lang_registry = Arc::new(LanguageRegistry::new());
3119        let fs = FakeFs::new(cx_a.background());
3120        fs.insert_tree(
3121            "/code",
3122            json!({
3123                "crate-1": {
3124                    ".zed.toml": r#"collaborators = ["user_b"]"#,
3125                    "one.rs": "const ONE: usize = 1;",
3126                },
3127                "crate-2": {
3128                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
3129                },
3130                "private": {
3131                    "passwords.txt": "the-password",
3132                }
3133            }),
3134        )
3135        .await;
3136
3137        // Set up a fake language server.
3138        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3139        Arc::get_mut(&mut lang_registry)
3140            .unwrap()
3141            .add(Arc::new(Language::new(
3142                LanguageConfig {
3143                    name: "Rust".into(),
3144                    path_suffixes: vec!["rs".to_string()],
3145                    language_server: Some(language_server_config),
3146                    ..Default::default()
3147                },
3148                Some(tree_sitter_rust::language()),
3149            )));
3150
3151        // Connect to a server as 2 clients.
3152        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3153        let client_a = server.create_client(&mut cx_a, "user_a").await;
3154        let client_b = server.create_client(&mut cx_b, "user_b").await;
3155
3156        // Share a project as client A
3157        let project_a = cx_a.update(|cx| {
3158            Project::local(
3159                client_a.clone(),
3160                client_a.user_store.clone(),
3161                lang_registry.clone(),
3162                fs.clone(),
3163                cx,
3164            )
3165        });
3166        let (worktree_a, _) = project_a
3167            .update(&mut cx_a, |p, cx| {
3168                p.find_or_create_local_worktree("/code/crate-1", false, cx)
3169            })
3170            .await
3171            .unwrap();
3172        worktree_a
3173            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3174            .await;
3175        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3176        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3177        project_a
3178            .update(&mut cx_a, |p, cx| p.share(cx))
3179            .await
3180            .unwrap();
3181
3182        // Join the worktree as client B.
3183        let project_b = Project::remote(
3184            project_id,
3185            client_b.clone(),
3186            client_b.user_store.clone(),
3187            lang_registry.clone(),
3188            fs.clone(),
3189            &mut cx_b.to_async(),
3190        )
3191        .await
3192        .unwrap();
3193
3194        // Cause the language server to start.
3195        let _buffer = cx_b
3196            .background()
3197            .spawn(project_b.update(&mut cx_b, |p, cx| {
3198                p.open_buffer((worktree_id, "one.rs"), cx)
3199            }))
3200            .await
3201            .unwrap();
3202
3203        // Request the definition of a symbol as the guest.
3204        let symbols = project_b.update(&mut cx_b, |p, cx| p.symbols("two", cx));
3205        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3206        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3207            #[allow(deprecated)]
3208            Some(vec![lsp::SymbolInformation {
3209                name: "TWO".into(),
3210                location: lsp::Location {
3211                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3212                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3213                },
3214                kind: lsp::SymbolKind::CONSTANT,
3215                tags: None,
3216                container_name: None,
3217                deprecated: None,
3218            }])
3219        });
3220
3221        let symbols = symbols.await.unwrap();
3222        assert_eq!(symbols.len(), 1);
3223        assert_eq!(symbols[0].name, "TWO");
3224
3225        // Open one of the returned symbols.
3226        let buffer_b_2 = project_b
3227            .update(&mut cx_b, |project, cx| {
3228                project.open_buffer_for_symbol(&symbols[0], cx)
3229            })
3230            .await
3231            .unwrap();
3232        buffer_b_2.read_with(&cx_b, |buffer, _| {
3233            assert_eq!(
3234                buffer.file().unwrap().path().as_ref(),
3235                Path::new("../crate-2/two.rs")
3236            );
3237        });
3238
3239        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3240        let mut fake_symbol = symbols[0].clone();
3241        fake_symbol.path = Path::new("/code/secrets").into();
3242        let error = project_b
3243            .update(&mut cx_b, |project, cx| {
3244                project.open_buffer_for_symbol(&fake_symbol, cx)
3245            })
3246            .await
3247            .unwrap_err();
3248        assert!(error.to_string().contains("invalid symbol signature"));
3249    }
3250
3251    #[gpui::test(iterations = 10)]
        // Guest-side race test: client B asks for the definition of `b::TWO` (declared
        // in `b.rs`) and also opens `b.rs` directly. The two requests are created in a
        // random order and neither is awaited until the fake language server has a
        // handler installed. The definition result must refer to the very same buffer
        // that `open_buffer` returned.
        // NOTE(review): `iterations = 10` together with the `rng` parameter presumably
        // reruns the test with different seeds so both orderings get exercised —
        // confirm against the `gpui::test` macro.
3252    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3253        mut cx_a: TestAppContext,
3254        mut cx_b: TestAppContext,
3255        mut rng: StdRng,
3256    ) {
3257        cx_a.foreground().forbid_parking();
3258        let mut lang_registry = Arc::new(LanguageRegistry::new());
3259        let fs = FakeFs::new(cx_a.background());
            // Host file tree: `a.rs` references `b::TWO`, `b.rs` declares `TWO`, and
            // `.zed.toml` lists user_b as a collaborator.
3260        fs.insert_tree(
3261            "/root",
3262            json!({
3263                ".zed.toml": r#"collaborators = ["user_b"]"#,
3264                "a.rs": "const ONE: usize = b::TWO;",
3265                "b.rs": "const TWO: usize = 2",
3266            }),
3267        )
3268        .await;
3269
3270        // Set up a fake language server.
3271        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3272
3273        Arc::get_mut(&mut lang_registry)
3274            .unwrap()
3275            .add(Arc::new(Language::new(
3276                LanguageConfig {
3277                    name: "Rust".into(),
3278                    path_suffixes: vec!["rs".to_string()],
3279                    language_server: Some(language_server_config),
3280                    ..Default::default()
3281                },
3282                Some(tree_sitter_rust::language()),
3283            )));
3284
3285        // Connect to a server as 2 clients.
3286        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3287        let client_a = server.create_client(&mut cx_a, "user_a").await;
3288        let client_b = server.create_client(&mut cx_b, "user_b").await;
3289
3290        // Share a project as client A
3291        let project_a = cx_a.update(|cx| {
3292            Project::local(
3293                client_a.clone(),
3294                client_a.user_store.clone(),
3295                lang_registry.clone(),
3296                fs.clone(),
3297                cx,
3298            )
3299        });
3300
3301        let (worktree_a, _) = project_a
3302            .update(&mut cx_a, |p, cx| {
3303                p.find_or_create_local_worktree("/root", false, cx)
3304            })
3305            .await
3306            .unwrap();
            // Wait for the host's worktree scan to finish before sharing the project.
3307        worktree_a
3308            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3309            .await;
3310        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3311        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3312        project_a
3313            .update(&mut cx_a, |p, cx| p.share(cx))
3314            .await
3315            .unwrap();
3316
3317        // Join the worktree as client B.
3318        let project_b = Project::remote(
3319            project_id,
3320            client_b.clone(),
3321            client_b.user_store.clone(),
3322            lang_registry.clone(),
3323            fs.clone(),
3324            &mut cx_b.to_async(),
3325        )
3326        .await
3327        .unwrap();
3328
            // Open `a.rs` as the guest; the definition request below is made from it.
3329        let buffer_b1 = cx_b
3330            .background()
3331            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3332            .await
3333            .unwrap();
3334
            // Create both requests without awaiting them; `rng.gen()` decides which one
            // is created first.
            // NOTE(review): 23 is presumably a byte offset into `a.rs`, which would land
            // inside `TWO` of `b::TWO` — confirm against `Project::definition`.
3335        let definitions;
3336        let buffer_b2;
3337        if rng.gen() {
3338            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3339            buffer_b2 =
3340                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3341        } else {
3342            buffer_b2 =
3343                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3344            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3345        }
3346
            // The fake server answers every GotoDefinition request with a single location
            // in `/root/b.rs`, line 0 columns 6..9, i.e. the identifier `TWO`.
3347        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3348        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3349            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3350                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3351                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3352            )))
3353        });
3354
            // Only now await both results: there must be exactly one definition, and its
            // buffer must equal the one obtained through `open_buffer`.
3355        let buffer_b2 = buffer_b2.await.unwrap();
3356        let definitions = definitions.await.unwrap();
3357        assert_eq!(definitions.len(), 1);
3358        assert_eq!(definitions[0].buffer, buffer_b2);
3359    }
3360
3361    #[gpui::test(iterations = 10)]
        // End-to-end code-action test from the guest's point of view: client B opens
        // `main.rs` of a project shared by client A in a workspace editor, moves the
        // cursor to a position for which the fake language server offers a code action,
        // confirms that action, and checks that the resulting multi-file edit shows up
        // in the active editor and can be undone and redone.
3362    async fn test_collaborating_with_code_actions(
3363        mut cx_a: TestAppContext,
3364        mut cx_b: TestAppContext,
3365    ) {
3366        cx_a.foreground().forbid_parking();
3367        let mut lang_registry = Arc::new(LanguageRegistry::new());
3368        let fs = FakeFs::new(cx_a.background());
            // Initialize the editor on B's side; `path_openers_b` is later handed to the
            // workspace params.
3369        let mut path_openers_b = Vec::new();
3370        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3371
3372        // Set up a fake language server.
3373        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3374        Arc::get_mut(&mut lang_registry)
3375            .unwrap()
3376            .add(Arc::new(Language::new(
3377                LanguageConfig {
3378                    name: "Rust".into(),
3379                    path_suffixes: vec!["rs".to_string()],
3380                    language_server: Some(language_server_config),
3381                    ..Default::default()
3382                },
3383                Some(tree_sitter_rust::language()),
3384            )));
3385
3386        // Connect to a server as 2 clients.
3387        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3388        let client_a = server.create_client(&mut cx_a, "user_a").await;
3389        let client_b = server.create_client(&mut cx_b, "user_b").await;
3390
3391        // Share a project as client A
3392        fs.insert_tree(
3393            "/a",
3394            json!({
3395                ".zed.toml": r#"collaborators = ["user_b"]"#,
3396                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3397                "other.rs": "pub fn foo() -> usize { 4 }",
3398            }),
3399        )
3400        .await;
3401        let project_a = cx_a.update(|cx| {
3402            Project::local(
3403                client_a.clone(),
3404                client_a.user_store.clone(),
3405                lang_registry.clone(),
3406                fs.clone(),
3407                cx,
3408            )
3409        });
3410        let (worktree_a, _) = project_a
3411            .update(&mut cx_a, |p, cx| {
3412                p.find_or_create_local_worktree("/a", false, cx)
3413            })
3414            .await
3415            .unwrap();
            // Wait for the host's worktree scan to finish before sharing the project.
3416        worktree_a
3417            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3418            .await;
3419        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3420        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3421        project_a
3422            .update(&mut cx_a, |p, cx| p.share(cx))
3423            .await
3424            .unwrap();
3425
3426        // Join the worktree as client B.
3427        let project_b = Project::remote(
3428            project_id,
3429            client_b.clone(),
3430            client_b.user_store.clone(),
3431            lang_registry.clone(),
3432            fs.clone(),
3433            &mut cx_b.to_async(),
3434        )
3435        .await
3436        .unwrap();
            // Build a workspace for B around the remote project, starting from the test
            // defaults and overriding languages, client, user store, project and openers.
3437        let mut params = cx_b.update(WorkspaceParams::test);
3438        params.languages = lang_registry.clone();
3439        params.client = client_b.client.clone();
3440        params.user_store = client_b.user_store.clone();
3441        params.project = project_b;
3442        params.path_openers = path_openers_b.into();
3443
3444        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3445        let editor_b = workspace_b
3446            .update(&mut cx_b, |workspace, cx| {
3447                workspace.open_path((worktree_id, "main.rs").into(), cx)
3448            })
3449            .await
3450            .unwrap()
3451            .downcast::<Editor>()
3452            .unwrap();
3453
            // The first code-action request is expected for `/a/main.rs` with an empty
            // range at (0, 0); the fake server answers it with no actions.
            // NOTE(review): awaiting `.next()` on the value returned by `handle_request`
            // presumably resolves once the handler has served one request — confirm
            // against the fake language server API.
3454        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3455        fake_language_server
3456            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3457                assert_eq!(
3458                    params.text_document.uri,
3459                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3460                );
3461                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3462                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3463                None
3464            })
3465            .next()
3466            .await;
3467
3468        // Move cursor to a location that contains code actions.
3469        editor_b.update(&mut cx_b, |editor, cx| {
3470            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3471            cx.focus(&editor_b);
3472        });
3473
            // The next request must carry the new cursor position (1, 31). The server
            // offers one action whose workspace edit replaces columns 22..34 of line 1 in
            // `main.rs` (the text `other::foo()`) with `4` and deletes columns 0..27 of
            // line 0 in `other.rs` (the whole file content).
3474        fake_language_server
3475            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3476                assert_eq!(
3477                    params.text_document.uri,
3478                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3479                );
3480                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3481                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3482
3483                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3484                    lsp::CodeAction {
3485                        title: "Inline into all callers".to_string(),
3486                        edit: Some(lsp::WorkspaceEdit {
3487                            changes: Some(
3488                                [
3489                                    (
3490                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3491                                        vec![lsp::TextEdit::new(
3492                                            lsp::Range::new(
3493                                                lsp::Position::new(1, 22),
3494                                                lsp::Position::new(1, 34),
3495                                            ),
3496                                            "4".to_string(),
3497                                        )],
3498                                    ),
3499                                    (
3500                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3501                                        vec![lsp::TextEdit::new(
3502                                            lsp::Range::new(
3503                                                lsp::Position::new(0, 0),
3504                                                lsp::Position::new(0, 27),
3505                                            ),
3506                                            "".to_string(),
3507                                        )],
3508                                    ),
3509                                ]
3510                                .into_iter()
3511                                .collect(),
3512                            ),
3513                            ..Default::default()
3514                        }),
3515                        data: Some(json!({
3516                            "codeActionParams": {
3517                                "range": {
3518                                    "start": {"line": 1, "column": 31},
3519                                    "end": {"line": 1, "column": 31},
3520                                }
3521                            }
3522                        })),
3523                        ..Default::default()
3524                    },
3525                )])
3526            })
3527            .next()
3528            .await;
3529
3530        // Toggle code actions and wait for them to display.
3531        editor_b.update(&mut cx_b, |editor, cx| {
3532            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3533        });
3534        editor_b
3535            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3536            .await;
3537
            // Remove the code-action handler (and its position assertions) before the
            // action is confirmed.
3538        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3539
3540        // Confirming the code action will trigger a resolve request.
3541        let confirm_action = workspace_b
3542            .update(&mut cx_b, |workspace, cx| {
3543                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3544            })
3545            .unwrap();
            // The resolve handler ignores the incoming params and returns an action with
            // the same title and the same two-file edit that was offered above.
3546        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3547            lsp::CodeAction {
3548                title: "Inline into all callers".to_string(),
3549                edit: Some(lsp::WorkspaceEdit {
3550                    changes: Some(
3551                        [
3552                            (
3553                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3554                                vec![lsp::TextEdit::new(
3555                                    lsp::Range::new(
3556                                        lsp::Position::new(1, 22),
3557                                        lsp::Position::new(1, 34),
3558                                    ),
3559                                    "4".to_string(),
3560                                )],
3561                            ),
3562                            (
3563                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3564                                vec![lsp::TextEdit::new(
3565                                    lsp::Range::new(
3566                                        lsp::Position::new(0, 0),
3567                                        lsp::Position::new(0, 27),
3568                                    ),
3569                                    "".to_string(),
3570                                )],
3571                            ),
3572                        ]
3573                        .into_iter()
3574                        .collect(),
3575                    ),
3576                    ..Default::default()
3577                }),
3578                ..Default::default()
3579            }
3580        });
3581
3582        // After the action is confirmed, an editor containing both modified files is opened.
3583        confirm_action.await.unwrap();
3584        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3585            workspace
3586                .active_item(cx)
3587                .unwrap()
3588                .downcast::<Editor>()
3589                .unwrap()
3590        });
            // The expected text starts with an empty line followed by the edited
            // `main.rs`; undo must restore both original texts and redo must reapply
            // the edit.
            // NOTE(review): the leading empty line presumably is the emptied `other.rs`
            // shown ahead of `main.rs` — confirm against how the combined editor orders
            // its excerpts.
3591        code_action_editor.update(&mut cx_b, |editor, cx| {
3592            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3593            editor.undo(&Undo, cx);
3594            assert_eq!(
3595                editor.text(cx),
3596                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3597            );
3598            editor.redo(&Redo, cx);
3599            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3600        });
3601    }
3602
3603    #[gpui::test(iterations = 10)]
        // End-to-end rename test from the guest's point of view: client B opens `one.rs`
        // of a project shared by client A, starts a rename on `ONE`, types `THREE`,
        // confirms, and checks that the edits returned by the fake language server for
        // `one.rs` and `two.rs` appear in the active editor and can be undone/redone.
3604    async fn test_collaborating_with_renames(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3605        cx_a.foreground().forbid_parking();
3606        let mut lang_registry = Arc::new(LanguageRegistry::new());
3607        let fs = FakeFs::new(cx_a.background());
            // Initialize the editor on B's side; `path_openers_b` is later handed to the
            // workspace params.
3608        let mut path_openers_b = Vec::new();
3609        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3610
3611        // Set up a fake language server.
3612        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3613        Arc::get_mut(&mut lang_registry)
3614            .unwrap()
3615            .add(Arc::new(Language::new(
3616                LanguageConfig {
3617                    name: "Rust".into(),
3618                    path_suffixes: vec!["rs".to_string()],
3619                    language_server: Some(language_server_config),
3620                    ..Default::default()
3621                },
3622                Some(tree_sitter_rust::language()),
3623            )));
3624
3625        // Connect to a server as 2 clients.
3626        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3627        let client_a = server.create_client(&mut cx_a, "user_a").await;
3628        let client_b = server.create_client(&mut cx_b, "user_b").await;
3629
3630        // Share a project as client A
3631        fs.insert_tree(
3632            "/dir",
3633            json!({
3634                ".zed.toml": r#"collaborators = ["user_b"]"#,
3635                "one.rs": "const ONE: usize = 1;",
3636                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3637            }),
3638        )
3639        .await;
3640        let project_a = cx_a.update(|cx| {
3641            Project::local(
3642                client_a.clone(),
3643                client_a.user_store.clone(),
3644                lang_registry.clone(),
3645                fs.clone(),
3646                cx,
3647            )
3648        });
3649        let (worktree_a, _) = project_a
3650            .update(&mut cx_a, |p, cx| {
3651                p.find_or_create_local_worktree("/dir", false, cx)
3652            })
3653            .await
3654            .unwrap();
            // Wait for the host's worktree scan to finish before sharing the project.
3655        worktree_a
3656            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3657            .await;
3658        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
3659        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
3660        project_a
3661            .update(&mut cx_a, |p, cx| p.share(cx))
3662            .await
3663            .unwrap();
3664
3665        // Join the worktree as client B.
3666        let project_b = Project::remote(
3667            project_id,
3668            client_b.clone(),
3669            client_b.user_store.clone(),
3670            lang_registry.clone(),
3671            fs.clone(),
3672            &mut cx_b.to_async(),
3673        )
3674        .await
3675        .unwrap();
            // Build a workspace for B around the remote project, starting from the test
            // defaults and overriding languages, client, user store, project and openers.
3676        let mut params = cx_b.update(WorkspaceParams::test);
3677        params.languages = lang_registry.clone();
3678        params.client = client_b.client.clone();
3679        params.user_store = client_b.user_store.clone();
3680        params.project = project_b;
3681        params.path_openers = path_openers_b.into();
3682
3683        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3684        let editor_b = workspace_b
3685            .update(&mut cx_b, |workspace, cx| {
3686                workspace.open_path((worktree_id, "one.rs").into(), cx)
3687            })
3688            .await
3689            .unwrap()
3690            .downcast::<Editor>()
3691            .unwrap();
3692        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3693
3694        // Move cursor to a location that can be renamed.
            // Offset 7 of `const ONE: usize = 1;` lies inside the identifier `ONE`.
3695        let prepare_rename = editor_b.update(&mut cx_b, |editor, cx| {
3696            editor.select_ranges([7..7], None, cx);
3697            editor.rename(&Rename, cx).unwrap()
3698        });
3699
            // The prepare-rename request must target `one.rs` at (0, 7); the fake server
            // replies with the range (0, 6)..(0, 9), which covers `ONE`.
3700        fake_language_server
3701            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3702                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3703                assert_eq!(params.position, lsp::Position::new(0, 7));
3704                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3705                    lsp::Position::new(0, 6),
3706                    lsp::Position::new(0, 9),
3707                )))
3708            })
3709            .next()
3710            .await
3711            .unwrap();
3712        prepare_rename.await.unwrap();
            // The pending rename must span offsets 6..9; replace the first three
            // characters of the rename editor's buffer with `THREE`.
3713        editor_b.update(&mut cx_b, |editor, cx| {
3714            let rename = editor.pending_rename().unwrap();
3715            let buffer = editor.buffer().read(cx).snapshot(cx);
3716            assert_eq!(
3717                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3718                6..9
3719            );
3720            rename.editor.update(cx, |rename_editor, cx| {
3721                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3722                    rename_buffer.edit([0..3], "THREE", cx);
3723                });
3724            });
3725        });
3726
            // Confirm the rename. The server checks that the request targets `one.rs` at
            // (0, 6) with the new name `THREE`, then returns edits for the declaration in
            // `one.rs` and for both `one::ONE` references in `two.rs` (columns 24..27 and
            // 35..38).
3727        let confirm_rename = workspace_b.update(&mut cx_b, |workspace, cx| {
3728            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3729        });
3730        fake_language_server
3731            .handle_request::<lsp::request::Rename, _>(|params, _| {
3732                assert_eq!(
3733                    params.text_document_position.text_document.uri.as_str(),
3734                    "file:///dir/one.rs"
3735                );
3736                assert_eq!(
3737                    params.text_document_position.position,
3738                    lsp::Position::new(0, 6)
3739                );
3740                assert_eq!(params.new_name, "THREE");
3741                Some(lsp::WorkspaceEdit {
3742                    changes: Some(
3743                        [
3744                            (
3745                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3746                                vec![lsp::TextEdit::new(
3747                                    lsp::Range::new(
3748                                        lsp::Position::new(0, 6),
3749                                        lsp::Position::new(0, 9),
3750                                    ),
3751                                    "THREE".to_string(),
3752                                )],
3753                            ),
3754                            (
3755                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3756                                vec![
3757                                    lsp::TextEdit::new(
3758                                        lsp::Range::new(
3759                                            lsp::Position::new(0, 24),
3760                                            lsp::Position::new(0, 27),
3761                                        ),
3762                                        "THREE".to_string(),
3763                                    ),
3764                                    lsp::TextEdit::new(
3765                                        lsp::Range::new(
3766                                            lsp::Position::new(0, 35),
3767                                            lsp::Position::new(0, 38),
3768                                        ),
3769                                        "THREE".to_string(),
3770                                    ),
3771                                ],
3772                            ),
3773                        ]
3774                        .into_iter()
3775                        .collect(),
3776                    ),
3777                    ..Default::default()
3778                })
3779            })
3780            .next()
3781            .await
3782            .unwrap();
3783        confirm_rename.await.unwrap();
3784
            // The workspace's active item is now an editor whose text contains both
            // renamed files (`two.rs` first, then `one.rs`); undo and redo there revert
            // and reapply the rename in both.
3785        let rename_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
3786            workspace
3787                .active_item(cx)
3788                .unwrap()
3789                .downcast::<Editor>()
3790                .unwrap()
3791        });
3792        rename_editor.update(&mut cx_b, |editor, cx| {
3793            assert_eq!(
3794                editor.text(cx),
3795                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3796            );
3797            editor.undo(&Undo, cx);
3798            assert_eq!(
3799                editor.text(cx),
3800                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3801            );
3802            editor.redo(&Redo, cx);
3803            assert_eq!(
3804                editor.text(cx),
3805                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3806            );
3807        });
3808
3809        // Ensure temporary rename edits cannot be undone/redone.
            // In the original `one.rs` editor the first undo restores `ONE`, a second
            // undo changes nothing further, and a single redo brings back `THREE`.
3810        editor_b.update(&mut cx_b, |editor, cx| {
3811            editor.undo(&Undo, cx);
3812            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3813            editor.undo(&Undo, cx);
3814            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3815            editor.redo(&Redo, cx);
3816            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3817        })
3818    }
3819
3820    #[gpui::test(iterations = 10)]
        // Chat smoke test with two users in one org channel: both clients load the
        // channel list and the pre-existing message, client A sends two messages that
        // client B then receives, and the server's per-channel connection set shrinks as
        // each client drops its channel handle.
        // NOTE(review): `channel_messages` is defined outside this excerpt; the tuples it
        // yields look like (sender login, message body, is-pending flag) — confirm.
3821    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3822        cx_a.foreground().forbid_parking();
3823
3824        // Connect to a server as 2 clients.
3825        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3826        let client_a = server.create_client(&mut cx_a, "user_a").await;
3827        let client_b = server.create_client(&mut cx_b, "user_b").await;
3828
3829        // Create an org that includes these 2 users.
3830        let db = &server.app_state.db;
3831        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3832        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3833            .await
3834            .unwrap();
3835        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3836            .await
3837            .unwrap();
3838
3839        // Create a channel that includes all the users.
3840        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3841        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3842            .await
3843            .unwrap();
3844        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3845            .await
3846            .unwrap();
            // Seed the channel with one message from user_b, written straight to the
            // database before either client opens the channel.
            // NOTE(review): the trailing `1` is presumably a nonce — confirm against the
            // db API.
3847        db.create_channel_message(
3848            channel_id,
3849            client_b.current_user_id(&cx_b),
3850            "hello A, it's B.",
3851            OffsetDateTime::now_utc(),
3852            1,
3853        )
3854        .await
3855        .unwrap();
3856
            // Client A: wait until the channel list is available and check that it holds
            // exactly the test channel.
3857        let channels_a = cx_a
3858            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3859        channels_a
3860            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3861            .await;
3862        channels_a.read_with(&cx_a, |list, _| {
3863            assert_eq!(
3864                list.available_channels().unwrap(),
3865                &[ChannelDetails {
3866                    id: channel_id.to_proto(),
3867                    name: "test-channel".to_string()
3868                }]
3869            )
3870        });
            // A freshly obtained channel has no messages yet; the seeded message shows up
            // afterwards.
3871        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3872            this.get_channel(channel_id.to_proto(), cx).unwrap()
3873        });
3874        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3875        channel_a
3876            .condition(&cx_a, |channel, _| {
3877                channel_messages(channel)
3878                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3879            })
3880            .await;
3881
            // Client B: the same channel-list and initial-message checks as for client A.
3882        let channels_b = cx_b
3883            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3884        channels_b
3885            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3886            .await;
3887        channels_b.read_with(&cx_b, |list, _| {
3888            assert_eq!(
3889                list.available_channels().unwrap(),
3890                &[ChannelDetails {
3891                    id: channel_id.to_proto(),
3892                    name: "test-channel".to_string()
3893                }]
3894            )
3895        });
3896
3897        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3898            this.get_channel(channel_id.to_proto(), cx).unwrap()
3899        });
3900        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3901        channel_b
3902            .condition(&cx_b, |channel, _| {
3903                channel_messages(channel)
3904                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3905            })
3906            .await;
3907
            // Send two messages from A. The first send is detached; the second one's task
            // is returned from the closure and awaited. Still inside the update, both
            // messages are already listed locally with the flag set to `true`.
3908        channel_a
3909            .update(&mut cx_a, |channel, cx| {
3910                channel
3911                    .send_message("oh, hi B.".to_string(), cx)
3912                    .unwrap()
3913                    .detach();
3914                let task = channel.send_message("sup".to_string(), cx).unwrap();
3915                assert_eq!(
3916                    channel_messages(channel),
3917                    &[
3918                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3919                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3920                        ("user_a".to_string(), "sup".to_string(), true)
3921                    ]
3922                );
3923                task
3924            })
3925            .await
3926            .unwrap();
3927
            // B eventually sees all three messages in order, each with the flag `false`.
3928        channel_b
3929            .condition(&cx_b, |channel, _| {
3930                channel_messages(channel)
3931                    == [
3932                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3933                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3934                        ("user_a".to_string(), "sup".to_string(), false),
3935                    ]
3936            })
3937            .await;
3938
            // The server state lists two connection ids for the channel. Dropping B's
            // channel handle must bring that down to one; dropping A's must remove the
            // channel from the server state altogether.
3939        assert_eq!(
3940            server
3941                .state()
3942                .await
3943                .channel(channel_id)
3944                .unwrap()
3945                .connection_ids
3946                .len(),
3947            2
3948        );
3949        cx_b.update(|_| drop(channel_b));
3950        server
3951            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3952            .await;
3953
3954        cx_a.update(|_| drop(channel_a));
3955        server
3956            .condition(|state| state.channel(channel_id).is_none())
3957            .await;
3958    }
3959
3960    #[gpui::test(iterations = 10)]
3961    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3962        cx_a.foreground().forbid_parking();
3963
3964        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3965        let client_a = server.create_client(&mut cx_a, "user_a").await;
3966
3967        let db = &server.app_state.db;
3968        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3969        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3970        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3971            .await
3972            .unwrap();
3973        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3974            .await
3975            .unwrap();
3976
3977        let channels_a = cx_a
3978            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3979        channels_a
3980            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3981            .await;
3982        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3983            this.get_channel(channel_id.to_proto(), cx).unwrap()
3984        });
3985
3986        // Messages aren't allowed to be too long.
3987        channel_a
3988            .update(&mut cx_a, |channel, cx| {
3989                let long_body = "this is long.\n".repeat(1024);
3990                channel.send_message(long_body, cx).unwrap()
3991            })
3992            .await
3993            .unwrap_err();
3994
3995        // Messages aren't allowed to be blank.
3996        channel_a.update(&mut cx_a, |channel, cx| {
3997            channel.send_message(String::new(), cx).unwrap_err()
3998        });
3999
4000        // Leading and trailing whitespace are trimmed.
4001        channel_a
4002            .update(&mut cx_a, |channel, cx| {
4003                channel
4004                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
4005                    .unwrap()
4006            })
4007            .await
4008            .unwrap();
4009        assert_eq!(
4010            db.get_channel_messages(channel_id, 10, None)
4011                .await
4012                .unwrap()
4013                .iter()
4014                .map(|m| &m.body)
4015                .collect::<Vec<_>>(),
4016            &["surrounded by whitespace"]
4017        );
4018    }
4019
4020    #[gpui::test(iterations = 10)]
4021    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
4022        cx_a.foreground().forbid_parking();
4023
4024        // Connect to a server as 2 clients.
4025        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4026        let client_a = server.create_client(&mut cx_a, "user_a").await;
4027        let client_b = server.create_client(&mut cx_b, "user_b").await;
4028        let mut status_b = client_b.status();
4029
4030        // Create an org that includes these 2 users.
4031        let db = &server.app_state.db;
4032        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
4033        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
4034            .await
4035            .unwrap();
4036        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
4037            .await
4038            .unwrap();
4039
4040        // Create a channel that includes all the users.
4041        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
4042        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
4043            .await
4044            .unwrap();
4045        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
4046            .await
4047            .unwrap();
4048        db.create_channel_message(
4049            channel_id,
4050            client_b.current_user_id(&cx_b),
4051            "hello A, it's B.",
4052            OffsetDateTime::now_utc(),
4053            2,
4054        )
4055        .await
4056        .unwrap();
4057
4058        let channels_a = cx_a
4059            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
4060        channels_a
4061            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
4062            .await;
4063
4064        channels_a.read_with(&cx_a, |list, _| {
4065            assert_eq!(
4066                list.available_channels().unwrap(),
4067                &[ChannelDetails {
4068                    id: channel_id.to_proto(),
4069                    name: "test-channel".to_string()
4070                }]
4071            )
4072        });
4073        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
4074            this.get_channel(channel_id.to_proto(), cx).unwrap()
4075        });
4076        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
4077        channel_a
4078            .condition(&cx_a, |channel, _| {
4079                channel_messages(channel)
4080                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4081            })
4082            .await;
4083
4084        let channels_b = cx_b
4085            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
4086        channels_b
4087            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
4088            .await;
4089        channels_b.read_with(&cx_b, |list, _| {
4090            assert_eq!(
4091                list.available_channels().unwrap(),
4092                &[ChannelDetails {
4093                    id: channel_id.to_proto(),
4094                    name: "test-channel".to_string()
4095                }]
4096            )
4097        });
4098
4099        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
4100            this.get_channel(channel_id.to_proto(), cx).unwrap()
4101        });
4102        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
4103        channel_b
4104            .condition(&cx_b, |channel, _| {
4105                channel_messages(channel)
4106                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4107            })
4108            .await;
4109
4110        // Disconnect client B, ensuring we can still access its cached channel data.
4111        server.forbid_connections();
4112        server.disconnect_client(client_b.current_user_id(&cx_b));
4113        while !matches!(
4114            status_b.next().await,
4115            Some(client::Status::ReconnectionError { .. })
4116        ) {}
4117
4118        channels_b.read_with(&cx_b, |channels, _| {
4119            assert_eq!(
4120                channels.available_channels().unwrap(),
4121                [ChannelDetails {
4122                    id: channel_id.to_proto(),
4123                    name: "test-channel".to_string()
4124                }]
4125            )
4126        });
4127        channel_b.read_with(&cx_b, |channel, _| {
4128            assert_eq!(
4129                channel_messages(channel),
4130                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
4131            )
4132        });
4133
4134        // Send a message from client B while it is disconnected.
4135        channel_b
4136            .update(&mut cx_b, |channel, cx| {
4137                let task = channel
4138                    .send_message("can you see this?".to_string(), cx)
4139                    .unwrap();
4140                assert_eq!(
4141                    channel_messages(channel),
4142                    &[
4143                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4144                        ("user_b".to_string(), "can you see this?".to_string(), true)
4145                    ]
4146                );
4147                task
4148            })
4149            .await
4150            .unwrap_err();
4151
4152        // Send a message from client A while B is disconnected.
4153        channel_a
4154            .update(&mut cx_a, |channel, cx| {
4155                channel
4156                    .send_message("oh, hi B.".to_string(), cx)
4157                    .unwrap()
4158                    .detach();
4159                let task = channel.send_message("sup".to_string(), cx).unwrap();
4160                assert_eq!(
4161                    channel_messages(channel),
4162                    &[
4163                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4164                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4165                        ("user_a".to_string(), "sup".to_string(), true)
4166                    ]
4167                );
4168                task
4169            })
4170            .await
4171            .unwrap();
4172
4173        // Give client B a chance to reconnect.
4174        server.allow_connections();
4175        cx_b.foreground().advance_clock(Duration::from_secs(10));
4176
4177        // Verify that B sees the new messages upon reconnection, as well as the message client B
4178        // sent while offline.
4179        channel_b
4180            .condition(&cx_b, |channel, _| {
4181                channel_messages(channel)
4182                    == [
4183                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4184                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4185                        ("user_a".to_string(), "sup".to_string(), false),
4186                        ("user_b".to_string(), "can you see this?".to_string(), false),
4187                    ]
4188            })
4189            .await;
4190
4191        // Ensure client A and B can communicate normally after reconnection.
4192        channel_a
4193            .update(&mut cx_a, |channel, cx| {
4194                channel.send_message("you online?".to_string(), cx).unwrap()
4195            })
4196            .await
4197            .unwrap();
4198        channel_b
4199            .condition(&cx_b, |channel, _| {
4200                channel_messages(channel)
4201                    == [
4202                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4203                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4204                        ("user_a".to_string(), "sup".to_string(), false),
4205                        ("user_b".to_string(), "can you see this?".to_string(), false),
4206                        ("user_a".to_string(), "you online?".to_string(), false),
4207                    ]
4208            })
4209            .await;
4210
4211        channel_b
4212            .update(&mut cx_b, |channel, cx| {
4213                channel.send_message("yep".to_string(), cx).unwrap()
4214            })
4215            .await
4216            .unwrap();
4217        channel_a
4218            .condition(&cx_a, |channel, _| {
4219                channel_messages(channel)
4220                    == [
4221                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4222                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4223                        ("user_a".to_string(), "sup".to_string(), false),
4224                        ("user_b".to_string(), "can you see this?".to_string(), false),
4225                        ("user_a".to_string(), "you online?".to_string(), false),
4226                        ("user_b".to_string(), "yep".to_string(), false),
4227                    ]
4228            })
4229            .await;
4230    }
4231
4232    #[gpui::test(iterations = 10)]
4233    async fn test_contacts(
4234        mut cx_a: TestAppContext,
4235        mut cx_b: TestAppContext,
4236        mut cx_c: TestAppContext,
4237    ) {
4238        cx_a.foreground().forbid_parking();
4239        let lang_registry = Arc::new(LanguageRegistry::new());
4240        let fs = FakeFs::new(cx_a.background());
4241
4242        // Connect to a server as 3 clients.
4243        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4244        let client_a = server.create_client(&mut cx_a, "user_a").await;
4245        let client_b = server.create_client(&mut cx_b, "user_b").await;
4246        let client_c = server.create_client(&mut cx_c, "user_c").await;
4247
4248        // Share a worktree as client A.
4249        fs.insert_tree(
4250            "/a",
4251            json!({
4252                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4253            }),
4254        )
4255        .await;
4256
4257        let project_a = cx_a.update(|cx| {
4258            Project::local(
4259                client_a.clone(),
4260                client_a.user_store.clone(),
4261                lang_registry.clone(),
4262                fs.clone(),
4263                cx,
4264            )
4265        });
4266        let (worktree_a, _) = project_a
4267            .update(&mut cx_a, |p, cx| {
4268                p.find_or_create_local_worktree("/a", false, cx)
4269            })
4270            .await
4271            .unwrap();
4272        worktree_a
4273            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4274            .await;
4275
4276        client_a
4277            .user_store
4278            .condition(&cx_a, |user_store, _| {
4279                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4280            })
4281            .await;
4282        client_b
4283            .user_store
4284            .condition(&cx_b, |user_store, _| {
4285                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4286            })
4287            .await;
4288        client_c
4289            .user_store
4290            .condition(&cx_c, |user_store, _| {
4291                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4292            })
4293            .await;
4294
4295        let project_id = project_a
4296            .update(&mut cx_a, |project, _| project.next_remote_id())
4297            .await;
4298        project_a
4299            .update(&mut cx_a, |project, cx| project.share(cx))
4300            .await
4301            .unwrap();
4302
4303        let _project_b = Project::remote(
4304            project_id,
4305            client_b.clone(),
4306            client_b.user_store.clone(),
4307            lang_registry.clone(),
4308            fs.clone(),
4309            &mut cx_b.to_async(),
4310        )
4311        .await
4312        .unwrap();
4313
4314        client_a
4315            .user_store
4316            .condition(&cx_a, |user_store, _| {
4317                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4318            })
4319            .await;
4320        client_b
4321            .user_store
4322            .condition(&cx_b, |user_store, _| {
4323                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4324            })
4325            .await;
4326        client_c
4327            .user_store
4328            .condition(&cx_c, |user_store, _| {
4329                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4330            })
4331            .await;
4332
4333        project_a
4334            .condition(&cx_a, |project, _| {
4335                project.collaborators().contains_key(&client_b.peer_id)
4336            })
4337            .await;
4338
4339        cx_a.update(move |_| drop(project_a));
4340        client_a
4341            .user_store
4342            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4343            .await;
4344        client_b
4345            .user_store
4346            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4347            .await;
4348        client_c
4349            .user_store
4350            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4351            .await;
4352
4353        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4354            user_store
4355                .contacts()
4356                .iter()
4357                .map(|contact| {
4358                    let worktrees = contact
4359                        .projects
4360                        .iter()
4361                        .map(|p| {
4362                            (
4363                                p.worktree_root_names[0].as_str(),
4364                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4365                            )
4366                        })
4367                        .collect();
4368                    (contact.user.github_login.as_str(), worktrees)
4369                })
4370                .collect()
4371        }
4372    }
4373
4374    #[gpui::test(iterations = 100)]
4375    async fn test_random_collaboration(cx: TestAppContext, rng: StdRng) {
4376        cx.foreground().forbid_parking();
4377        let max_peers = env::var("MAX_PEERS")
4378            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4379            .unwrap_or(5);
4380        let max_operations = env::var("OPERATIONS")
4381            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4382            .unwrap_or(10);
4383
4384        let rng = Arc::new(Mutex::new(rng));
4385
4386        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4387        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4388
4389        let fs = FakeFs::new(cx.background());
4390        fs.insert_tree(
4391            "/_collab",
4392            json!({
4393                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4394            }),
4395        )
4396        .await;
4397
4398        let operations = Rc::new(Cell::new(0));
4399        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4400        let mut clients = Vec::new();
4401
4402        let mut next_entity_id = 100000;
4403        let mut host_cx = TestAppContext::new(
4404            cx.foreground_platform(),
4405            cx.platform(),
4406            cx.foreground(),
4407            cx.background(),
4408            cx.font_cache(),
4409            next_entity_id,
4410        );
4411        let host = server.create_client(&mut host_cx, "host").await;
4412        let host_project = host_cx.update(|cx| {
4413            Project::local(
4414                host.client.clone(),
4415                host.user_store.clone(),
4416                Arc::new(LanguageRegistry::new()),
4417                fs.clone(),
4418                cx,
4419            )
4420        });
4421        let host_project_id = host_project
4422            .update(&mut host_cx, |p, _| p.next_remote_id())
4423            .await;
4424
4425        let (collab_worktree, _) = host_project
4426            .update(&mut host_cx, |project, cx| {
4427                project.find_or_create_local_worktree("/_collab", false, cx)
4428            })
4429            .await
4430            .unwrap();
4431        collab_worktree
4432            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4433            .await;
4434        host_project
4435            .update(&mut host_cx, |project, cx| project.share(cx))
4436            .await
4437            .unwrap();
4438
4439        clients.push(cx.foreground().spawn(host.simulate_host(
4440            host_project.clone(),
4441            language_server_config,
4442            operations.clone(),
4443            max_operations,
4444            rng.clone(),
4445            host_cx.clone(),
4446        )));
4447
4448        while operations.get() < max_operations {
4449            cx.background().simulate_random_delay().await;
4450            if clients.len() < max_peers && rng.lock().gen_bool(0.05) {
4451                operations.set(operations.get() + 1);
4452
4453                let guest_id = clients.len();
4454                log::info!("Adding guest {}", guest_id);
4455                next_entity_id += 100000;
4456                let mut guest_cx = TestAppContext::new(
4457                    cx.foreground_platform(),
4458                    cx.platform(),
4459                    cx.foreground(),
4460                    cx.background(),
4461                    cx.font_cache(),
4462                    next_entity_id,
4463                );
4464                let guest = server
4465                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4466                    .await;
4467                let guest_project = Project::remote(
4468                    host_project_id,
4469                    guest.client.clone(),
4470                    guest.user_store.clone(),
4471                    guest_lang_registry.clone(),
4472                    fs.clone(),
4473                    &mut guest_cx.to_async(),
4474                )
4475                .await
4476                .unwrap();
4477                clients.push(cx.foreground().spawn(guest.simulate_guest(
4478                    guest_id,
4479                    guest_project,
4480                    operations.clone(),
4481                    max_operations,
4482                    rng.clone(),
4483                    guest_cx,
4484                )));
4485
4486                log::info!("Guest {} added", guest_id);
4487            }
4488        }
4489
4490        let clients = futures::future::join_all(clients).await;
4491        cx.foreground().run_until_parked();
4492
4493        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4494            project
4495                .worktrees(cx)
4496                .map(|worktree| {
4497                    let snapshot = worktree.read(cx).snapshot();
4498                    (snapshot.id(), snapshot)
4499                })
4500                .collect::<BTreeMap<_, _>>()
4501        });
4502
4503        for (guest_client, guest_cx) in clients.iter().skip(1) {
4504            let guest_id = guest_client.client.id();
4505            let worktree_snapshots =
4506                guest_client
4507                    .project
4508                    .as_ref()
4509                    .unwrap()
4510                    .read_with(guest_cx, |project, cx| {
4511                        project
4512                            .worktrees(cx)
4513                            .map(|worktree| {
4514                                let worktree = worktree.read(cx);
4515                                (worktree.id(), worktree.snapshot())
4516                            })
4517                            .collect::<BTreeMap<_, _>>()
4518                    });
4519
4520            assert_eq!(
4521                worktree_snapshots.keys().collect::<Vec<_>>(),
4522                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4523                "guest {} has different worktrees than the host",
4524                guest_id
4525            );
4526            for (id, host_snapshot) in &host_worktree_snapshots {
4527                let guest_snapshot = &worktree_snapshots[id];
4528                assert_eq!(
4529                    guest_snapshot.root_name(),
4530                    host_snapshot.root_name(),
4531                    "guest {} has different root name than the host for worktree {}",
4532                    guest_id,
4533                    id
4534                );
4535                assert_eq!(
4536                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4537                    host_snapshot.entries(false).collect::<Vec<_>>(),
4538                    "guest {} has different snapshot than the host for worktree {}",
4539                    guest_id,
4540                    id
4541                );
4542            }
4543
4544            guest_client
4545                .project
4546                .as_ref()
4547                .unwrap()
4548                .read_with(guest_cx, |project, _| {
4549                    assert!(
4550                        !project.has_buffered_operations(),
4551                        "guest {} has buffered operations ",
4552                        guest_id,
4553                    );
4554                });
4555
4556            for guest_buffer in &guest_client.buffers {
4557                let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id());
4558                let host_buffer = host_project.read_with(&host_cx, |project, _| {
4559                    project
4560                        .shared_buffer(guest_client.peer_id, buffer_id)
4561                        .expect(&format!(
4562                            "host doest not have buffer for guest:{}, peer:{}, id:{}",
4563                            guest_id, guest_client.peer_id, buffer_id
4564                        ))
4565                });
4566                assert_eq!(
4567                    guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()),
4568                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4569                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4570                    guest_id,
4571                    buffer_id,
4572                    host_buffer
4573                        .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx))
4574                );
4575            }
4576        }
4577    }
4578
    /// Test harness that owns a `Server` together with the state needed to connect
    /// simulated clients to it and to sever or refuse their connections on demand.
    struct TestServer {
        /// Peer handed to `Server::new` when the harness is started.
        peer: Arc<Peer>,
        /// Application state built from the test database by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor supplied to `start`.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// For each user, the kill switch of the in-memory connection most recently
        /// established for them; `disconnect_client` removes and fires it.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, new connection attempts are rejected with an error.
        forbid_connections: Arc<AtomicBool>,
        /// NOTE(review): never read here; presumably held so the fake database
        /// outlives the server — confirm against `TestDb`.
        _test_db: TestDb,
    }
4589
4590    impl TestServer {
4591        async fn start(
4592            foreground: Rc<executor::Foreground>,
4593            background: Arc<executor::Background>,
4594        ) -> Self {
4595            let test_db = TestDb::fake(background);
4596            let app_state = Self::build_app_state(&test_db).await;
4597            let peer = Peer::new();
4598            let notifications = mpsc::unbounded();
4599            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4600            Self {
4601                peer,
4602                app_state,
4603                server,
4604                foreground,
4605                notifications: notifications.1,
4606                connection_killers: Default::default(),
4607                forbid_connections: Default::default(),
4608                _test_db: test_db,
4609            }
4610        }
4611
4612        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4613            let http = FakeHttpClient::with_404_response();
4614            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4615            let client_name = name.to_string();
4616            let mut client = Client::new(http.clone());
4617            let server = self.server.clone();
4618            let connection_killers = self.connection_killers.clone();
4619            let forbid_connections = self.forbid_connections.clone();
4620            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4621
4622            Arc::get_mut(&mut client)
4623                .unwrap()
4624                .override_authenticate(move |cx| {
4625                    cx.spawn(|_| async move {
4626                        let access_token = "the-token".to_string();
4627                        Ok(Credentials {
4628                            user_id: user_id.0 as u64,
4629                            access_token,
4630                        })
4631                    })
4632                })
4633                .override_establish_connection(move |credentials, cx| {
4634                    assert_eq!(credentials.user_id, user_id.0 as u64);
4635                    assert_eq!(credentials.access_token, "the-token");
4636
4637                    let server = server.clone();
4638                    let connection_killers = connection_killers.clone();
4639                    let forbid_connections = forbid_connections.clone();
4640                    let client_name = client_name.clone();
4641                    let connection_id_tx = connection_id_tx.clone();
4642                    cx.spawn(move |cx| async move {
4643                        if forbid_connections.load(SeqCst) {
4644                            Err(EstablishConnectionError::other(anyhow!(
4645                                "server is forbidding connections"
4646                            )))
4647                        } else {
4648                            let (client_conn, server_conn, kill_conn) =
4649                                Connection::in_memory(cx.background());
4650                            connection_killers.lock().insert(user_id, kill_conn);
4651                            cx.background()
4652                                .spawn(server.handle_connection(
4653                                    server_conn,
4654                                    client_name,
4655                                    user_id,
4656                                    Some(connection_id_tx),
4657                                    cx.background(),
4658                                ))
4659                                .detach();
4660                            Ok(client_conn)
4661                        }
4662                    })
4663                });
4664
4665            client
4666                .authenticate_and_connect(&cx.to_async())
4667                .await
4668                .unwrap();
4669
4670            Channel::init(&client);
4671            Project::init(&client);
4672
4673            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4674            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4675            let mut authed_user =
4676                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4677            while authed_user.next().await.unwrap().is_none() {}
4678
4679            TestClient {
4680                client,
4681                peer_id,
4682                user_store,
4683                project: Default::default(),
4684                buffers: Default::default(),
4685            }
4686        }
4687
4688        fn disconnect_client(&self, user_id: UserId) {
4689            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
4690                let _ = kill_conn.try_send(Some(()));
4691            }
4692        }
4693
4694        fn forbid_connections(&self) {
4695            self.forbid_connections.store(true, SeqCst);
4696        }
4697
4698        fn allow_connections(&self) {
4699            self.forbid_connections.store(false, SeqCst);
4700        }
4701
4702        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4703            let mut config = Config::default();
4704            config.session_secret = "a".repeat(32);
4705            config.database_url = test_db.url.clone();
4706            let github_client = github::AppClient::test();
4707            Arc::new(AppState {
4708                db: test_db.db().clone(),
4709                handlebars: Default::default(),
4710                auth_client: auth::build_client("", ""),
4711                repo_client: github::RepoClient::test(&github_client),
4712                github_client,
4713                config,
4714            })
4715        }
4716
4717        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4718            self.server.store.read()
4719        }
4720
4721        async fn condition<F>(&mut self, mut predicate: F)
4722        where
4723            F: FnMut(&Store) -> bool,
4724        {
4725            async_std::future::timeout(Duration::from_millis(500), async {
4726                while !(predicate)(&*self.server.store.read()) {
4727                    self.foreground.start_waiting();
4728                    self.notifications.next().await;
4729                    self.foreground.finish_waiting();
4730                }
4731            })
4732            .await
4733            .expect("condition timed out");
4734        }
4735    }
4736
4737    impl Drop for TestServer {
4738        fn drop(&mut self) {
4739            self.peer.reset();
4740        }
4741    }
4742
    /// A client connected to the test server, bundled with the state that the
    /// randomized host/guest simulations build up for it.
    struct TestClient {
        /// The underlying client; also reachable through the `Deref` impl.
        client: Arc<Client>,
        /// Peer id built from the connection id that the server reported for
        /// this client's connection.
        pub peer_id: PeerId,
        /// User store used to look up the currently signed-in user.
        pub user_store: ModelHandle<UserStore>,
        /// Filled in by `simulate_host` / `simulate_guest` once they finish.
        project: Option<ModelHandle<Project>>,
        /// Buffers the simulation currently keeps open for this client.
        buffers: HashSet<ModelHandle<zed::language::Buffer>>,
    }
4750
4751    impl Deref for TestClient {
4752        type Target = Arc<Client>;
4753
4754        fn deref(&self) -> &Self::Target {
4755            &self.client
4756        }
4757    }
4758
4759    impl TestClient {
4760        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4761            UserId::from_proto(
4762                self.user_store
4763                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4764            )
4765        }
4766
4767        fn simulate_host(
4768            mut self,
4769            project: ModelHandle<Project>,
4770            mut language_server_config: LanguageServerConfig,
4771            operations: Rc<Cell<usize>>,
4772            max_operations: usize,
4773            rng: Arc<Mutex<StdRng>>,
4774            mut cx: TestAppContext,
4775        ) -> impl Future<Output = (Self, TestAppContext)> {
4776            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4777
4778            // Set up a fake language server.
4779            language_server_config.set_fake_initializer({
4780                let rng = rng.clone();
4781                let files = files.clone();
4782                let project = project.clone();
4783                move |fake_server| {
4784                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4785                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4786                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4787                                range: lsp::Range::new(
4788                                    lsp::Position::new(0, 0),
4789                                    lsp::Position::new(0, 0),
4790                                ),
4791                                new_text: "the-new-text".to_string(),
4792                            })),
4793                            ..Default::default()
4794                        }]))
4795                    });
4796
4797                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4798                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4799                            lsp::CodeAction {
4800                                title: "the-code-action".to_string(),
4801                                ..Default::default()
4802                            },
4803                        )])
4804                    });
4805
4806                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4807                        |params, _| {
4808                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4809                                params.position,
4810                                params.position,
4811                            )))
4812                        },
4813                    );
4814
4815                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4816                        let files = files.clone();
4817                        let rng = rng.clone();
4818                        move |_, _| {
4819                            let files = files.lock();
4820                            let mut rng = rng.lock();
4821                            let count = rng.gen_range::<usize, _>(1..3);
4822                            let files = (0..count)
4823                                .map(|_| files.choose(&mut *rng).unwrap())
4824                                .collect::<Vec<_>>();
4825                            log::info!("LSP: Returning definitions in files {:?}", &files);
4826                            Some(lsp::GotoDefinitionResponse::Array(
4827                                files
4828                                    .into_iter()
4829                                    .map(|file| lsp::Location {
4830                                        uri: lsp::Url::from_file_path(file).unwrap(),
4831                                        range: Default::default(),
4832                                    })
4833                                    .collect(),
4834                            ))
4835                        }
4836                    });
4837
4838                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4839                        let rng = rng.clone();
4840                        let project = project.clone();
4841                        move |params, mut cx| {
4842                            project.update(&mut cx, |project, cx| {
4843                                let path = params
4844                                    .text_document_position_params
4845                                    .text_document
4846                                    .uri
4847                                    .to_file_path()
4848                                    .unwrap();
4849                                let (worktree, relative_path) =
4850                                    project.find_local_worktree(&path, cx)?;
4851                                let project_path =
4852                                    ProjectPath::from((worktree.read(cx).id(), relative_path));
4853                                let buffer = project.get_open_buffer(&project_path, cx)?.read(cx);
4854
4855                                let mut highlights = Vec::new();
4856                                let highlight_count = rng.lock().gen_range(1..=5);
4857                                let mut prev_end = 0;
4858                                for _ in 0..highlight_count {
4859                                    let range =
4860                                        buffer.random_byte_range(prev_end, &mut *rng.lock());
4861                                    let start =
4862                                        buffer.offset_to_point_utf16(range.start).to_lsp_position();
4863                                    let end =
4864                                        buffer.offset_to_point_utf16(range.end).to_lsp_position();
4865                                    highlights.push(lsp::DocumentHighlight {
4866                                        range: lsp::Range::new(start, end),
4867                                        kind: Some(lsp::DocumentHighlightKind::READ),
4868                                    });
4869                                    prev_end = range.end;
4870                                }
4871                                Some(highlights)
4872                            })
4873                        }
4874                    });
4875                }
4876            });
4877
4878            project.update(&mut cx, |project, _| {
4879                project.languages().add(Arc::new(Language::new(
4880                    LanguageConfig {
4881                        name: "Rust".into(),
4882                        path_suffixes: vec!["rs".to_string()],
4883                        language_server: Some(language_server_config),
4884                        ..Default::default()
4885                    },
4886                    None,
4887                )));
4888            });
4889
4890            async move {
4891                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4892                while operations.get() < max_operations {
4893                    operations.set(operations.get() + 1);
4894
4895                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4896                    match distribution {
4897                        0..=20 if !files.lock().is_empty() => {
4898                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4899                            let mut path = path.as_path();
4900                            while let Some(parent_path) = path.parent() {
4901                                path = parent_path;
4902                                if rng.lock().gen() {
4903                                    break;
4904                                }
4905                            }
4906
4907                            log::info!("Host: find/create local worktree {:?}", path);
4908                            project
4909                                .update(&mut cx, |project, cx| {
4910                                    project.find_or_create_local_worktree(path, false, cx)
4911                                })
4912                                .await
4913                                .unwrap();
4914                        }
4915                        10..=80 if !files.lock().is_empty() => {
4916                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4917                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4918                                let (worktree, path) = project
4919                                    .update(&mut cx, |project, cx| {
4920                                        project.find_or_create_local_worktree(
4921                                            file.clone(),
4922                                            false,
4923                                            cx,
4924                                        )
4925                                    })
4926                                    .await
4927                                    .unwrap();
4928                                let project_path =
4929                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4930                                log::info!("Host: opening path {:?}, {:?}", file, project_path);
4931                                let buffer = project
4932                                    .update(&mut cx, |project, cx| {
4933                                        project.open_buffer(project_path, cx)
4934                                    })
4935                                    .await
4936                                    .unwrap();
4937                                self.buffers.insert(buffer.clone());
4938                                buffer
4939                            } else {
4940                                self.buffers
4941                                    .iter()
4942                                    .choose(&mut *rng.lock())
4943                                    .unwrap()
4944                                    .clone()
4945                            };
4946
4947                            if rng.lock().gen_bool(0.1) {
4948                                cx.update(|cx| {
4949                                    log::info!(
4950                                        "Host: dropping buffer {:?}",
4951                                        buffer.read(cx).file().unwrap().full_path(cx)
4952                                    );
4953                                    self.buffers.remove(&buffer);
4954                                    drop(buffer);
4955                                });
4956                            } else {
4957                                buffer.update(&mut cx, |buffer, cx| {
4958                                    log::info!(
4959                                        "Host: updating buffer {:?}",
4960                                        buffer.file().unwrap().full_path(cx)
4961                                    );
4962                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4963                                });
4964                            }
4965                        }
4966                        _ => loop {
4967                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4968                            let mut path = PathBuf::new();
4969                            path.push("/");
4970                            for _ in 0..path_component_count {
4971                                let letter = rng.lock().gen_range(b'a'..=b'z');
4972                                path.push(std::str::from_utf8(&[letter]).unwrap());
4973                            }
4974                            path.set_extension("rs");
4975                            let parent_path = path.parent().unwrap();
4976
4977                            log::info!("Host: creating file {:?}", path,);
4978
4979                            if fs.create_dir(&parent_path).await.is_ok()
4980                                && fs.create_file(&path, Default::default()).await.is_ok()
4981                            {
4982                                files.lock().push(path);
4983                                break;
4984                            } else {
4985                                log::info!("Host: cannot create file");
4986                            }
4987                        },
4988                    }
4989
4990                    cx.background().simulate_random_delay().await;
4991                }
4992
4993                log::info!("Host done");
4994
4995                self.project = Some(project);
4996                (self, cx)
4997            }
4998        }
4999
5000        pub async fn simulate_guest(
5001            mut self,
5002            guest_id: usize,
5003            project: ModelHandle<Project>,
5004            operations: Rc<Cell<usize>>,
5005            max_operations: usize,
5006            rng: Arc<Mutex<StdRng>>,
5007            mut cx: TestAppContext,
5008        ) -> (Self, TestAppContext) {
5009            while operations.get() < max_operations {
5010                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5011                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
5012                        project
5013                            .worktrees(&cx)
5014                            .filter(|worktree| {
5015                                let worktree = worktree.read(cx);
5016                                !worktree.is_weak() && worktree.entries(false).any(|e| e.is_file())
5017                            })
5018                            .choose(&mut *rng.lock())
5019                    }) {
5020                        worktree
5021                    } else {
5022                        cx.background().simulate_random_delay().await;
5023                        continue;
5024                    };
5025
5026                    operations.set(operations.get() + 1);
5027                    let (worktree_root_name, project_path) =
5028                        worktree.read_with(&cx, |worktree, _| {
5029                            let entry = worktree
5030                                .entries(false)
5031                                .filter(|e| e.is_file())
5032                                .choose(&mut *rng.lock())
5033                                .unwrap();
5034                            (
5035                                worktree.root_name().to_string(),
5036                                (worktree.id(), entry.path.clone()),
5037                            )
5038                        });
5039                    log::info!(
5040                        "Guest {}: opening path in worktree {:?} {:?} {:?}",
5041                        guest_id,
5042                        project_path.0,
5043                        worktree_root_name,
5044                        project_path.1
5045                    );
5046                    let buffer = project
5047                        .update(&mut cx, |project, cx| project.open_buffer(project_path, cx))
5048                        .await
5049                        .unwrap();
5050                    self.buffers.insert(buffer.clone());
5051                    buffer
5052                } else {
5053                    operations.set(operations.get() + 1);
5054
5055                    self.buffers
5056                        .iter()
5057                        .choose(&mut *rng.lock())
5058                        .unwrap()
5059                        .clone()
5060                };
5061
5062                let choice = rng.lock().gen_range(0..100);
5063                match choice {
5064                    0..=9 => {
5065                        cx.update(|cx| {
5066                            log::info!(
5067                                "Guest {}: dropping buffer {:?}",
5068                                guest_id,
5069                                buffer.read(cx).file().unwrap().full_path(cx)
5070                            );
5071                            self.buffers.remove(&buffer);
5072                            drop(buffer);
5073                        });
5074                    }
5075                    10..=19 => {
5076                        let completions = project.update(&mut cx, |project, cx| {
5077                            log::info!(
5078                                "Guest {}: requesting completions for buffer {:?}",
5079                                guest_id,
5080                                buffer.read(cx).file().unwrap().full_path(cx)
5081                            );
5082                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5083                            project.completions(&buffer, offset, cx)
5084                        });
5085                        let completions = cx.background().spawn(async move {
5086                            completions.await.expect("completions request failed");
5087                        });
5088                        if rng.lock().gen_bool(0.3) {
5089                            log::info!("Guest {}: detaching completions request", guest_id);
5090                            completions.detach();
5091                        } else {
5092                            completions.await;
5093                        }
5094                    }
5095                    20..=29 => {
5096                        let code_actions = project.update(&mut cx, |project, cx| {
5097                            log::info!(
5098                                "Guest {}: requesting code actions for buffer {:?}",
5099                                guest_id,
5100                                buffer.read(cx).file().unwrap().full_path(cx)
5101                            );
5102                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
5103                            project.code_actions(&buffer, range, cx)
5104                        });
5105                        let code_actions = cx.background().spawn(async move {
5106                            code_actions.await.expect("code actions request failed");
5107                        });
5108                        if rng.lock().gen_bool(0.3) {
5109                            log::info!("Guest {}: detaching code actions request", guest_id);
5110                            code_actions.detach();
5111                        } else {
5112                            code_actions.await;
5113                        }
5114                    }
5115                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5116                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5117                            log::info!(
5118                                "Guest {}: saving buffer {:?}",
5119                                guest_id,
5120                                buffer.file().unwrap().full_path(cx)
5121                            );
5122                            (buffer.version(), buffer.save(cx))
5123                        });
5124                        let save = cx.spawn(|cx| async move {
5125                            let (saved_version, _) = save.await.expect("save request failed");
5126                            buffer.read_with(&cx, |buffer, _| {
5127                                assert!(buffer.version().observed_all(&saved_version));
5128                                assert!(saved_version.observed_all(&requested_version));
5129                            });
5130                        });
5131                        if rng.lock().gen_bool(0.3) {
5132                            log::info!("Guest {}: detaching save request", guest_id);
5133                            save.detach();
5134                        } else {
5135                            save.await;
5136                        }
5137                    }
5138                    40..=45 => {
5139                        let prepare_rename = project.update(&mut cx, |project, cx| {
5140                            log::info!(
5141                                "Guest {}: preparing rename for buffer {:?}",
5142                                guest_id,
5143                                buffer.read(cx).file().unwrap().full_path(cx)
5144                            );
5145                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5146                            project.prepare_rename(buffer, offset, cx)
5147                        });
5148                        let prepare_rename = cx.background().spawn(async move {
5149                            prepare_rename.await.expect("prepare rename request failed");
5150                        });
5151                        if rng.lock().gen_bool(0.3) {
5152                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5153                            prepare_rename.detach();
5154                        } else {
5155                            prepare_rename.await;
5156                        }
5157                    }
5158                    46..=49 => {
5159                        let definitions = project.update(&mut cx, |project, cx| {
5160                            log::info!(
5161                                "Guest {}: requesting defintions for buffer {:?}",
5162                                guest_id,
5163                                buffer.read(cx).file().unwrap().full_path(cx)
5164                            );
5165                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5166                            project.definition(&buffer, offset, cx)
5167                        });
5168                        let definitions = cx.background().spawn(async move {
5169                            definitions.await.expect("definitions request failed")
5170                        });
5171                        if rng.lock().gen_bool(0.3) {
5172                            log::info!("Guest {}: detaching definitions request", guest_id);
5173                            definitions.detach();
5174                        } else {
5175                            self.buffers
5176                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5177                        }
5178                    }
5179                    50..=55 => {
5180                        let highlights = project.update(&mut cx, |project, cx| {
5181                            log::info!(
5182                                "Guest {}: requesting highlights for buffer {:?}",
5183                                guest_id,
5184                                buffer.read(cx).file().unwrap().full_path(cx)
5185                            );
5186                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5187                            project.document_highlights(&buffer, offset, cx)
5188                        });
5189                        let highlights = cx.background().spawn(async move {
5190                            highlights.await.expect("highlights request failed");
5191                        });
5192                        if rng.lock().gen_bool(0.3) {
5193                            log::info!("Guest {}: detaching highlights request", guest_id);
5194                            highlights.detach();
5195                        } else {
5196                            highlights.await;
5197                        }
5198                    }
5199                    _ => {
5200                        buffer.update(&mut cx, |buffer, cx| {
5201                            log::info!(
5202                                "Guest {}: updating buffer {:?}",
5203                                guest_id,
5204                                buffer.file().unwrap().full_path(cx)
5205                            );
5206                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5207                        });
5208                    }
5209                }
5210                cx.background().simulate_random_delay().await;
5211            }
5212
5213            log::info!("Guest {} done", guest_id);
5214
5215            self.project = Some(project);
5216            (self, cx)
5217        }
5218    }
5219
5220    impl Executor for Arc<gpui::executor::Background> {
5221        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5222            self.spawn(future).detach();
5223        }
5224    }
5225
5226    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5227        channel
5228            .messages()
5229            .cursor::<()>()
5230            .map(|m| {
5231                (
5232                    m.sender.github_login.clone(),
5233                    m.body.clone(),
5234                    m.is_pending(),
5235                )
5236            })
5237            .collect()
5238    }
5239
    /// A stateless view whose `render` produces a `gpui::elements::Empty`
    /// element.
    struct EmptyView;
5241
    // `EmptyView` uses the unit type as its event type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
5245
5246    impl gpui::View for EmptyView {
5247        fn ui_name() -> &'static str {
5248            "empty view"
5249        }
5250
5251        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5252            gpui::Element::boxed(gpui::elements::Empty)
5253        }
5254    }
5255}