rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
  96            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
  97            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  98            .add_request_handler(
  99                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 100            )
 101            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 102            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 103            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 105            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 106            .add_request_handler(Server::update_buffer)
 107            .add_message_handler(Server::update_buffer_file)
 108            .add_message_handler(Server::buffer_reloaded)
 109            .add_message_handler(Server::buffer_saved)
 110            .add_request_handler(Server::save_buffer)
 111            .add_request_handler(Server::get_channels)
 112            .add_request_handler(Server::get_users)
 113            .add_request_handler(Server::join_channel)
 114            .add_message_handler(Server::leave_channel)
 115            .add_request_handler(Server::send_channel_message)
 116            .add_request_handler(Server::follow)
 117            .add_message_handler(Server::unfollow)
 118            .add_message_handler(Server::update_followers)
 119            .add_request_handler(Server::get_channel_messages);
 120
 121        Arc::new(server)
 122    }
 123
 124    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 128        M: EnvelopedMessage,
 129    {
 130        let prev_handler = self.handlers.insert(
 131            TypeId::of::<M>(),
 132            Box::new(move |server, envelope| {
 133                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 134                (handler)(server, *envelope).boxed()
 135            }),
 136        );
 137        if prev_handler.is_some() {
 138            panic!("registered a handler for the same message twice");
 139        }
 140        self
 141    }
 142
 143    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 144    where
 145        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 146        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 147        M: RequestMessage,
 148    {
 149        self.add_message_handler(move |server, envelope| {
 150            let receipt = envelope.receipt();
 151            let response = (handler)(server.clone(), envelope);
 152            async move {
 153                match response.await {
 154                    Ok(response) => {
 155                        server.peer.respond(receipt, response)?;
 156                        Ok(())
 157                    }
 158                    Err(error) => {
 159                        server.peer.respond_with_error(
 160                            receipt,
 161                            proto::Error {
 162                                message: error.to_string(),
 163                            },
 164                        )?;
 165                        Err(error)
 166                    }
 167                }
 168            }
 169        })
 170    }
 171
 172    pub fn handle_connection<E: Executor>(
 173        self: &Arc<Self>,
 174        connection: Connection,
 175        addr: String,
 176        user_id: UserId,
 177        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 178        executor: E,
 179    ) -> impl Future<Output = ()> {
 180        let mut this = self.clone();
 181        async move {
 182            let (connection_id, handle_io, mut incoming_rx) = this
 183                .peer
 184                .add_connection(connection, {
 185                    let executor = executor.clone();
 186                    move |duration| {
 187                        let timer = executor.timer(duration);
 188                        async move {
 189                            timer.await;
 190                        }
 191                    }
 192                })
 193                .await;
 194
 195            if let Some(send_connection_id) = send_connection_id.as_mut() {
 196                let _ = send_connection_id.send(connection_id).await;
 197            }
 198
 199            this.state_mut().add_connection(connection_id, user_id);
 200            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 201                log::error!("error updating contacts for {:?}: {}", user_id, err);
 202            }
 203
 204            let handle_io = handle_io.fuse();
 205            futures::pin_mut!(handle_io);
 206            loop {
 207                let next_message = incoming_rx.next().fuse();
 208                futures::pin_mut!(next_message);
 209                futures::select_biased! {
 210                    result = handle_io => {
 211                        if let Err(err) = result {
 212                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 213                        }
 214                        break;
 215                    }
 216                    message = next_message => {
 217                        if let Some(message) = message {
 218                            let start_time = Instant::now();
 219                            let type_name = message.payload_type_name();
 220                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 221                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 222                                let notifications = this.notifications.clone();
 223                                let is_background = message.is_background();
 224                                let handle_message = (handler)(this.clone(), message);
 225                                let handle_message = async move {
 226                                    if let Err(err) = handle_message.await {
 227                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 228                                    } else {
 229                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 230                                    }
 231                                    if let Some(mut notifications) = notifications {
 232                                        let _ = notifications.send(()).await;
 233                                    }
 234                                };
 235                                if is_background {
 236                                    executor.spawn_detached(handle_message);
 237                                } else {
 238                                    handle_message.await;
 239                                }
 240                            } else {
 241                                log::warn!("unhandled message: {}", type_name);
 242                            }
 243                        } else {
 244                            log::info!("rpc connection closed {:?}", addr);
 245                            break;
 246                        }
 247                    }
 248                }
 249            }
 250
 251            if let Err(err) = this.sign_out(connection_id).await {
 252                log::error!("error signing out connection {:?} - {:?}", addr, err);
 253            }
 254        }
 255    }
 256
 257    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 258        self.peer.disconnect(connection_id);
 259        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 260
 261        for (project_id, project) in removed_connection.hosted_projects {
 262            if let Some(share) = project.share {
 263                broadcast(
 264                    connection_id,
 265                    share.guests.keys().copied().collect(),
 266                    |conn_id| {
 267                        self.peer
 268                            .send(conn_id, proto::UnshareProject { project_id })
 269                    },
 270                )?;
 271            }
 272        }
 273
 274        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 275            broadcast(connection_id, peer_ids, |conn_id| {
 276                self.peer.send(
 277                    conn_id,
 278                    proto::RemoveProjectCollaborator {
 279                        project_id,
 280                        peer_id: connection_id.0,
 281                    },
 282                )
 283            })?;
 284        }
 285
 286        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 287        Ok(())
 288    }
 289
 290    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 291        Ok(proto::Ack {})
 292    }
 293
 294    async fn register_project(
 295        mut self: Arc<Server>,
 296        request: TypedEnvelope<proto::RegisterProject>,
 297    ) -> tide::Result<proto::RegisterProjectResponse> {
 298        let project_id = {
 299            let mut state = self.state_mut();
 300            let user_id = state.user_id_for_connection(request.sender_id)?;
 301            state.register_project(request.sender_id, user_id)
 302        };
 303        Ok(proto::RegisterProjectResponse { project_id })
 304    }
 305
 306    async fn unregister_project(
 307        mut self: Arc<Server>,
 308        request: TypedEnvelope<proto::UnregisterProject>,
 309    ) -> tide::Result<()> {
 310        let project = self
 311            .state_mut()
 312            .unregister_project(request.payload.project_id, request.sender_id)?;
 313        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 314        Ok(())
 315    }
 316
 317    async fn share_project(
 318        mut self: Arc<Server>,
 319        request: TypedEnvelope<proto::ShareProject>,
 320    ) -> tide::Result<proto::Ack> {
 321        self.state_mut()
 322            .share_project(request.payload.project_id, request.sender_id);
 323        Ok(proto::Ack {})
 324    }
 325
 326    async fn unshare_project(
 327        mut self: Arc<Server>,
 328        request: TypedEnvelope<proto::UnshareProject>,
 329    ) -> tide::Result<()> {
 330        let project_id = request.payload.project_id;
 331        let project = self
 332            .state_mut()
 333            .unshare_project(project_id, request.sender_id)?;
 334
 335        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 336            self.peer
 337                .send(conn_id, proto::UnshareProject { project_id })
 338        })?;
 339        self.update_contacts_for_users(&project.authorized_user_ids)?;
 340        Ok(())
 341    }
 342
 343    async fn join_project(
 344        mut self: Arc<Server>,
 345        request: TypedEnvelope<proto::JoinProject>,
 346    ) -> tide::Result<proto::JoinProjectResponse> {
 347        let project_id = request.payload.project_id;
 348
 349        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 350        let (response, connection_ids, contact_user_ids) = self
 351            .state_mut()
 352            .join_project(request.sender_id, user_id, project_id)
 353            .and_then(|joined| {
 354                let share = joined.project.share()?;
 355                let peer_count = share.guests.len();
 356                let mut collaborators = Vec::with_capacity(peer_count);
 357                collaborators.push(proto::Collaborator {
 358                    peer_id: joined.project.host_connection_id.0,
 359                    replica_id: 0,
 360                    user_id: joined.project.host_user_id.to_proto(),
 361                });
 362                let worktrees = share
 363                    .worktrees
 364                    .iter()
 365                    .filter_map(|(id, shared_worktree)| {
 366                        let worktree = joined.project.worktrees.get(&id)?;
 367                        Some(proto::Worktree {
 368                            id: *id,
 369                            root_name: worktree.root_name.clone(),
 370                            entries: shared_worktree.entries.values().cloned().collect(),
 371                            diagnostic_summaries: shared_worktree
 372                                .diagnostic_summaries
 373                                .values()
 374                                .cloned()
 375                                .collect(),
 376                            visible: worktree.visible,
 377                        })
 378                    })
 379                    .collect();
 380                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 381                    if *peer_conn_id != request.sender_id {
 382                        collaborators.push(proto::Collaborator {
 383                            peer_id: peer_conn_id.0,
 384                            replica_id: *peer_replica_id as u32,
 385                            user_id: peer_user_id.to_proto(),
 386                        });
 387                    }
 388                }
 389                let response = proto::JoinProjectResponse {
 390                    worktrees,
 391                    replica_id: joined.replica_id as u32,
 392                    collaborators,
 393                    language_servers: joined.project.language_servers.clone(),
 394                };
 395                let connection_ids = joined.project.connection_ids();
 396                let contact_user_ids = joined.project.authorized_user_ids();
 397                Ok((response, connection_ids, contact_user_ids))
 398            })?;
 399
 400        broadcast(request.sender_id, connection_ids, |conn_id| {
 401            self.peer.send(
 402                conn_id,
 403                proto::AddProjectCollaborator {
 404                    project_id,
 405                    collaborator: Some(proto::Collaborator {
 406                        peer_id: request.sender_id.0,
 407                        replica_id: response.replica_id,
 408                        user_id: user_id.to_proto(),
 409                    }),
 410                },
 411            )
 412        })?;
 413        self.update_contacts_for_users(&contact_user_ids)?;
 414        Ok(response)
 415    }
 416
 417    async fn leave_project(
 418        mut self: Arc<Server>,
 419        request: TypedEnvelope<proto::LeaveProject>,
 420    ) -> tide::Result<()> {
 421        let sender_id = request.sender_id;
 422        let project_id = request.payload.project_id;
 423        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 424
 425        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 426            self.peer.send(
 427                conn_id,
 428                proto::RemoveProjectCollaborator {
 429                    project_id,
 430                    peer_id: sender_id.0,
 431                },
 432            )
 433        })?;
 434        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 435
 436        Ok(())
 437    }
 438
 439    async fn register_worktree(
 440        mut self: Arc<Server>,
 441        request: TypedEnvelope<proto::RegisterWorktree>,
 442    ) -> tide::Result<proto::Ack> {
 443        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 444
 445        let mut contact_user_ids = HashSet::default();
 446        contact_user_ids.insert(host_user_id);
 447        for github_login in &request.payload.authorized_logins {
 448            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 449            contact_user_ids.insert(contact_user_id);
 450        }
 451
 452        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 453        let guest_connection_ids;
 454        {
 455            let mut state = self.state_mut();
 456            guest_connection_ids = state
 457                .read_project(request.payload.project_id, request.sender_id)?
 458                .guest_connection_ids();
 459            state.register_worktree(
 460                request.payload.project_id,
 461                request.payload.worktree_id,
 462                request.sender_id,
 463                Worktree {
 464                    authorized_user_ids: contact_user_ids.clone(),
 465                    root_name: request.payload.root_name.clone(),
 466                    visible: request.payload.visible,
 467                },
 468            )?;
 469        }
 470        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 471            self.peer
 472                .forward_send(request.sender_id, connection_id, request.payload.clone())
 473        })?;
 474        self.update_contacts_for_users(&contact_user_ids)?;
 475        Ok(proto::Ack {})
 476    }
 477
 478    async fn unregister_worktree(
 479        mut self: Arc<Server>,
 480        request: TypedEnvelope<proto::UnregisterWorktree>,
 481    ) -> tide::Result<()> {
 482        let project_id = request.payload.project_id;
 483        let worktree_id = request.payload.worktree_id;
 484        let (worktree, guest_connection_ids) =
 485            self.state_mut()
 486                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 487        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 488            self.peer.send(
 489                conn_id,
 490                proto::UnregisterWorktree {
 491                    project_id,
 492                    worktree_id,
 493                },
 494            )
 495        })?;
 496        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 497        Ok(())
 498    }
 499
 500    async fn update_worktree(
 501        mut self: Arc<Server>,
 502        request: TypedEnvelope<proto::UpdateWorktree>,
 503    ) -> tide::Result<proto::Ack> {
 504        let connection_ids = self.state_mut().update_worktree(
 505            request.sender_id,
 506            request.payload.project_id,
 507            request.payload.worktree_id,
 508            &request.payload.removed_entries,
 509            &request.payload.updated_entries,
 510        )?;
 511
 512        broadcast(request.sender_id, connection_ids, |connection_id| {
 513            self.peer
 514                .forward_send(request.sender_id, connection_id, request.payload.clone())
 515        })?;
 516
 517        Ok(proto::Ack {})
 518    }
 519
 520    async fn update_diagnostic_summary(
 521        mut self: Arc<Server>,
 522        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 523    ) -> tide::Result<()> {
 524        let summary = request
 525            .payload
 526            .summary
 527            .clone()
 528            .ok_or_else(|| anyhow!("invalid summary"))?;
 529        let receiver_ids = self.state_mut().update_diagnostic_summary(
 530            request.payload.project_id,
 531            request.payload.worktree_id,
 532            request.sender_id,
 533            summary,
 534        )?;
 535
 536        broadcast(request.sender_id, receiver_ids, |connection_id| {
 537            self.peer
 538                .forward_send(request.sender_id, connection_id, request.payload.clone())
 539        })?;
 540        Ok(())
 541    }
 542
 543    async fn start_language_server(
 544        mut self: Arc<Server>,
 545        request: TypedEnvelope<proto::StartLanguageServer>,
 546    ) -> tide::Result<()> {
 547        let receiver_ids = self.state_mut().start_language_server(
 548            request.payload.project_id,
 549            request.sender_id,
 550            request
 551                .payload
 552                .server
 553                .clone()
 554                .ok_or_else(|| anyhow!("invalid language server"))?,
 555        )?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })?;
 560        Ok(())
 561    }
 562
 563    async fn update_language_server(
 564        self: Arc<Server>,
 565        request: TypedEnvelope<proto::UpdateLanguageServer>,
 566    ) -> tide::Result<()> {
 567        let receiver_ids = self
 568            .state()
 569            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 570        broadcast(request.sender_id, receiver_ids, |connection_id| {
 571            self.peer
 572                .forward_send(request.sender_id, connection_id, request.payload.clone())
 573        })?;
 574        Ok(())
 575    }
 576
 577    async fn forward_project_request<T>(
 578        self: Arc<Server>,
 579        request: TypedEnvelope<T>,
 580    ) -> tide::Result<T::Response>
 581    where
 582        T: EntityMessage + RequestMessage,
 583    {
 584        let host_connection_id = self
 585            .state()
 586            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 587            .host_connection_id;
 588        Ok(self
 589            .peer
 590            .forward_request(request.sender_id, host_connection_id, request.payload)
 591            .await?)
 592    }
 593
 594    async fn save_buffer(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::SaveBuffer>,
 597    ) -> tide::Result<proto::BufferSaved> {
 598        let host;
 599        let mut guests;
 600        {
 601            let state = self.state();
 602            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 603            host = project.host_connection_id;
 604            guests = project.guest_connection_ids()
 605        }
 606
 607        let response = self
 608            .peer
 609            .forward_request(request.sender_id, host, request.payload.clone())
 610            .await?;
 611
 612        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 613        broadcast(host, guests, |conn_id| {
 614            self.peer.forward_send(host, conn_id, response.clone())
 615        })?;
 616
 617        Ok(response)
 618    }
 619
 620    async fn update_buffer(
 621        self: Arc<Server>,
 622        request: TypedEnvelope<proto::UpdateBuffer>,
 623    ) -> tide::Result<proto::Ack> {
 624        let receiver_ids = self
 625            .state()
 626            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 627        broadcast(request.sender_id, receiver_ids, |connection_id| {
 628            self.peer
 629                .forward_send(request.sender_id, connection_id, request.payload.clone())
 630        })?;
 631        Ok(proto::Ack {})
 632    }
 633
 634    async fn update_buffer_file(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::UpdateBufferFile>,
 637    ) -> tide::Result<()> {
 638        let receiver_ids = self
 639            .state()
 640            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 641        broadcast(request.sender_id, receiver_ids, |connection_id| {
 642            self.peer
 643                .forward_send(request.sender_id, connection_id, request.payload.clone())
 644        })?;
 645        Ok(())
 646    }
 647
 648    async fn buffer_reloaded(
 649        self: Arc<Server>,
 650        request: TypedEnvelope<proto::BufferReloaded>,
 651    ) -> tide::Result<()> {
 652        let receiver_ids = self
 653            .state()
 654            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 655        broadcast(request.sender_id, receiver_ids, |connection_id| {
 656            self.peer
 657                .forward_send(request.sender_id, connection_id, request.payload.clone())
 658        })?;
 659        Ok(())
 660    }
 661
 662    async fn buffer_saved(
 663        self: Arc<Server>,
 664        request: TypedEnvelope<proto::BufferSaved>,
 665    ) -> tide::Result<()> {
 666        let receiver_ids = self
 667            .state()
 668            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 669        broadcast(request.sender_id, receiver_ids, |connection_id| {
 670            self.peer
 671                .forward_send(request.sender_id, connection_id, request.payload.clone())
 672        })?;
 673        Ok(())
 674    }
 675
 676    async fn follow(
 677        self: Arc<Self>,
 678        request: TypedEnvelope<proto::Follow>,
 679    ) -> tide::Result<proto::FollowResponse> {
 680        let leader_id = ConnectionId(request.payload.leader_id);
 681        let follower_id = request.sender_id;
 682        if !self
 683            .state()
 684            .project_connection_ids(request.payload.project_id, follower_id)?
 685            .contains(&leader_id)
 686        {
 687            Err(anyhow!("no such peer"))?;
 688        }
 689        let mut response = self
 690            .peer
 691            .forward_request(request.sender_id, leader_id, request.payload)
 692            .await?;
 693        response
 694            .views
 695            .retain(|view| view.leader_id != Some(follower_id.0));
 696        Ok(response)
 697    }
 698
 699    async fn unfollow(
 700        self: Arc<Self>,
 701        request: TypedEnvelope<proto::Unfollow>,
 702    ) -> tide::Result<()> {
 703        let leader_id = ConnectionId(request.payload.leader_id);
 704        if !self
 705            .state()
 706            .project_connection_ids(request.payload.project_id, request.sender_id)?
 707            .contains(&leader_id)
 708        {
 709            Err(anyhow!("no such peer"))?;
 710        }
 711        self.peer
 712            .forward_send(request.sender_id, leader_id, request.payload)?;
 713        Ok(())
 714    }
 715
 716    async fn update_followers(
 717        self: Arc<Self>,
 718        request: TypedEnvelope<proto::UpdateFollowers>,
 719    ) -> tide::Result<()> {
 720        let connection_ids = self
 721            .state()
 722            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 723        let leader_id = request
 724            .payload
 725            .variant
 726            .as_ref()
 727            .and_then(|variant| match variant {
 728                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 729                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 730                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 731            });
 732        for follower_id in &request.payload.follower_ids {
 733            let follower_id = ConnectionId(*follower_id);
 734            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 735                self.peer
 736                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 737            }
 738        }
 739        Ok(())
 740    }
 741
 742    async fn get_channels(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetChannels>,
 745    ) -> tide::Result<proto::GetChannelsResponse> {
 746        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 747        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 748        Ok(proto::GetChannelsResponse {
 749            channels: channels
 750                .into_iter()
 751                .map(|chan| proto::Channel {
 752                    id: chan.id.to_proto(),
 753                    name: chan.name,
 754                })
 755                .collect(),
 756        })
 757    }
 758
 759    async fn get_users(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::GetUsers>,
 762    ) -> tide::Result<proto::GetUsersResponse> {
 763        let user_ids = request
 764            .payload
 765            .user_ids
 766            .into_iter()
 767            .map(UserId::from_proto)
 768            .collect();
 769        let users = self
 770            .app_state
 771            .db
 772            .get_users_by_ids(user_ids)
 773            .await?
 774            .into_iter()
 775            .map(|user| proto::User {
 776                id: user.id.to_proto(),
 777                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 778                github_login: user.github_login,
 779            })
 780            .collect();
 781        Ok(proto::GetUsersResponse { users })
 782    }
 783
 784    fn update_contacts_for_users<'a>(
 785        self: &Arc<Server>,
 786        user_ids: impl IntoIterator<Item = &'a UserId>,
 787    ) -> anyhow::Result<()> {
 788        let mut result = Ok(());
 789        let state = self.state();
 790        for user_id in user_ids {
 791            let contacts = state.contacts_for_user(*user_id);
 792            for connection_id in state.connection_ids_for_user(*user_id) {
 793                if let Err(error) = self.peer.send(
 794                    connection_id,
 795                    proto::UpdateContacts {
 796                        contacts: contacts.clone(),
 797                    },
 798                ) {
 799                    result = Err(error);
 800                }
 801            }
 802        }
 803        result
 804    }
 805
 806    async fn join_channel(
 807        mut self: Arc<Self>,
 808        request: TypedEnvelope<proto::JoinChannel>,
 809    ) -> tide::Result<proto::JoinChannelResponse> {
 810        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 811        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 812        if !self
 813            .app_state
 814            .db
 815            .can_user_access_channel(user_id, channel_id)
 816            .await?
 817        {
 818            Err(anyhow!("access denied"))?;
 819        }
 820
 821        self.state_mut().join_channel(request.sender_id, channel_id);
 822        let messages = self
 823            .app_state
 824            .db
 825            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 826            .await?
 827            .into_iter()
 828            .map(|msg| proto::ChannelMessage {
 829                id: msg.id.to_proto(),
 830                body: msg.body,
 831                timestamp: msg.sent_at.unix_timestamp() as u64,
 832                sender_id: msg.sender_id.to_proto(),
 833                nonce: Some(msg.nonce.as_u128().into()),
 834            })
 835            .collect::<Vec<_>>();
 836        Ok(proto::JoinChannelResponse {
 837            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 838            messages,
 839        })
 840    }
 841
 842    async fn leave_channel(
 843        mut self: Arc<Self>,
 844        request: TypedEnvelope<proto::LeaveChannel>,
 845    ) -> tide::Result<()> {
 846        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 847        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 848        if !self
 849            .app_state
 850            .db
 851            .can_user_access_channel(user_id, channel_id)
 852            .await?
 853        {
 854            Err(anyhow!("access denied"))?;
 855        }
 856
 857        self.state_mut()
 858            .leave_channel(request.sender_id, channel_id);
 859
 860        Ok(())
 861    }
 862
 863    async fn send_channel_message(
 864        self: Arc<Self>,
 865        request: TypedEnvelope<proto::SendChannelMessage>,
 866    ) -> tide::Result<proto::SendChannelMessageResponse> {
 867        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 868        let user_id;
 869        let connection_ids;
 870        {
 871            let state = self.state();
 872            user_id = state.user_id_for_connection(request.sender_id)?;
 873            connection_ids = state.channel_connection_ids(channel_id)?;
 874        }
 875
 876        // Validate the message body.
 877        let body = request.payload.body.trim().to_string();
 878        if body.len() > MAX_MESSAGE_LEN {
 879            return Err(anyhow!("message is too long"))?;
 880        }
 881        if body.is_empty() {
 882            return Err(anyhow!("message can't be blank"))?;
 883        }
 884
 885        let timestamp = OffsetDateTime::now_utc();
 886        let nonce = request
 887            .payload
 888            .nonce
 889            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 890
 891        let message_id = self
 892            .app_state
 893            .db
 894            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 895            .await?
 896            .to_proto();
 897        let message = proto::ChannelMessage {
 898            sender_id: user_id.to_proto(),
 899            id: message_id,
 900            body,
 901            timestamp: timestamp.unix_timestamp() as u64,
 902            nonce: Some(nonce),
 903        };
 904        broadcast(request.sender_id, connection_ids, |conn_id| {
 905            self.peer.send(
 906                conn_id,
 907                proto::ChannelMessageSent {
 908                    channel_id: channel_id.to_proto(),
 909                    message: Some(message.clone()),
 910                },
 911            )
 912        })?;
 913        Ok(proto::SendChannelMessageResponse {
 914            message: Some(message),
 915        })
 916    }
 917
 918    async fn get_channel_messages(
 919        self: Arc<Self>,
 920        request: TypedEnvelope<proto::GetChannelMessages>,
 921    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 922        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 923        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 924        if !self
 925            .app_state
 926            .db
 927            .can_user_access_channel(user_id, channel_id)
 928            .await?
 929        {
 930            Err(anyhow!("access denied"))?;
 931        }
 932
 933        let messages = self
 934            .app_state
 935            .db
 936            .get_channel_messages(
 937                channel_id,
 938                MESSAGE_COUNT_PER_PAGE,
 939                Some(MessageId::from_proto(request.payload.before_message_id)),
 940            )
 941            .await?
 942            .into_iter()
 943            .map(|msg| proto::ChannelMessage {
 944                id: msg.id.to_proto(),
 945                body: msg.body,
 946                timestamp: msg.sent_at.unix_timestamp() as u64,
 947                sender_id: msg.sender_id.to_proto(),
 948                nonce: Some(msg.nonce.as_u128().into()),
 949            })
 950            .collect::<Vec<_>>();
 951
 952        Ok(proto::GetChannelMessagesResponse {
 953            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 954            messages,
 955        })
 956    }
 957
 958    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 959        self.store.read()
 960    }
 961
 962    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 963        self.store.write()
 964    }
 965}
 966
 967impl Executor for RealExecutor {
 968    type Timer = Timer;
 969
 970    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 971        task::spawn(future);
 972    }
 973
 974    fn timer(&self, duration: Duration) -> Self::Timer {
 975        Timer::after(duration)
 976    }
 977}
 978
 979fn broadcast<F>(
 980    sender_id: ConnectionId,
 981    receiver_ids: Vec<ConnectionId>,
 982    mut f: F,
 983) -> anyhow::Result<()>
 984where
 985    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 986{
 987    let mut result = Ok(());
 988    for receiver_id in receiver_ids {
 989        if receiver_id != sender_id {
 990            if let Err(error) = f(receiver_id) {
 991                if result.is_ok() {
 992                    result = Err(error);
 993                }
 994            }
 995        }
 996    }
 997    result
 998}
 999
1000pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1001    let server = Server::new(app.state().clone(), rpc.clone(), None);
1002    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1003        let server = server.clone();
1004        async move {
1005            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1006
1007            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1008            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1009            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1010            let client_protocol_version: Option<u32> = request
1011                .header("X-Zed-Protocol-Version")
1012                .and_then(|v| v.as_str().parse().ok());
1013
1014            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1015                return Ok(Response::new(StatusCode::UpgradeRequired));
1016            }
1017
1018            let header = match request.header("Sec-Websocket-Key") {
1019                Some(h) => h.as_str(),
1020                None => return Err(anyhow!("expected sec-websocket-key"))?,
1021            };
1022
1023            let user_id = process_auth_header(&request).await?;
1024
1025            let mut response = Response::new(StatusCode::SwitchingProtocols);
1026            response.insert_header(UPGRADE, "websocket");
1027            response.insert_header(CONNECTION, "Upgrade");
1028            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1029            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1030            response.insert_header("Sec-Websocket-Version", "13");
1031
1032            let http_res: &mut tide::http::Response = response.as_mut();
1033            let upgrade_receiver = http_res.recv_upgrade().await;
1034            let addr = request.remote().unwrap_or("unknown").to_string();
1035            task::spawn(async move {
1036                if let Some(stream) = upgrade_receiver.await {
1037                    server
1038                        .handle_connection(
1039                            Connection::new(
1040                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1041                            ),
1042                            addr,
1043                            user_id,
1044                            None,
1045                            RealExecutor,
1046                        )
1047                        .await;
1048                }
1049            });
1050
1051            Ok(response)
1052        }
1053    });
1054}
1055
1056fn header_contains_ignore_case<T>(
1057    request: &tide::Request<T>,
1058    header_name: HeaderName,
1059    value: &str,
1060) -> bool {
1061    request
1062        .header(header_name)
1063        .map(|h| {
1064            h.as_str()
1065                .split(',')
1066                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1067        })
1068        .unwrap_or(false)
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use super::*;
1074    use crate::{
1075        auth,
1076        db::{tests::TestDb, UserId},
1077        github, AppState, Config,
1078    };
1079    use ::rpc::Peer;
1080    use client::{
1081        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1082        EstablishConnectionError, UserStore,
1083    };
1084    use collections::BTreeMap;
1085    use editor::{
1086        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1087        ToOffset, ToggleCodeActions, Undo,
1088    };
1089    use gpui::{executor, geometry::vector::vec2f, ModelHandle, TestAppContext, ViewHandle};
1090    use language::{
1091        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1092        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1093    };
1094    use lsp;
1095    use parking_lot::Mutex;
1096    use postage::barrier;
1097    use project::{
1098        fs::{FakeFs, Fs as _},
1099        search::SearchQuery,
1100        worktree::WorktreeHandle,
1101        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1102    };
1103    use rand::prelude::*;
1104    use rpc::PeerId;
1105    use serde_json::json;
1106    use sqlx::types::time::OffsetDateTime;
1107    use std::{
1108        cell::Cell,
1109        env,
1110        ops::Deref,
1111        path::{Path, PathBuf},
1112        rc::Rc,
1113        sync::{
1114            atomic::{AtomicBool, Ordering::SeqCst},
1115            Arc,
1116        },
1117        time::Duration,
1118    };
1119    use workspace::{Item, Settings, SplitDirection, Workspace, WorkspaceParams};
1120
1121    #[cfg(test)]
1122    #[ctor::ctor]
1123    fn init_logger() {
1124        if std::env::var("RUST_LOG").is_ok() {
1125            env_logger::init();
1126        }
1127    }
1128
1129    #[gpui::test(iterations = 10)]
1130    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1131        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1132        let lang_registry = Arc::new(LanguageRegistry::test());
1133        let fs = FakeFs::new(cx_a.background());
1134        cx_a.foreground().forbid_parking();
1135
1136        // Connect to a server as 2 clients.
1137        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1138        let client_a = server.create_client(cx_a, "user_a").await;
1139        let client_b = server.create_client(cx_b, "user_b").await;
1140
1141        // Share a project as client A
1142        fs.insert_tree(
1143            "/a",
1144            json!({
1145                ".zed.toml": r#"collaborators = ["user_b"]"#,
1146                "a.txt": "a-contents",
1147                "b.txt": "b-contents",
1148            }),
1149        )
1150        .await;
1151        let project_a = cx_a.update(|cx| {
1152            Project::local(
1153                client_a.clone(),
1154                client_a.user_store.clone(),
1155                lang_registry.clone(),
1156                fs.clone(),
1157                cx,
1158            )
1159        });
1160        let (worktree_a, _) = project_a
1161            .update(cx_a, |p, cx| {
1162                p.find_or_create_local_worktree("/a", true, cx)
1163            })
1164            .await
1165            .unwrap();
1166        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1167        worktree_a
1168            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1169            .await;
1170        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1171        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1172
1173        // Join that project as client B
1174        let project_b = Project::remote(
1175            project_id,
1176            client_b.clone(),
1177            client_b.user_store.clone(),
1178            lang_registry.clone(),
1179            fs.clone(),
1180            &mut cx_b.to_async(),
1181        )
1182        .await
1183        .unwrap();
1184
1185        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1186            assert_eq!(
1187                project
1188                    .collaborators()
1189                    .get(&client_a.peer_id)
1190                    .unwrap()
1191                    .user
1192                    .github_login,
1193                "user_a"
1194            );
1195            project.replica_id()
1196        });
1197        project_a
1198            .condition(&cx_a, |tree, _| {
1199                tree.collaborators()
1200                    .get(&client_b.peer_id)
1201                    .map_or(false, |collaborator| {
1202                        collaborator.replica_id == replica_id_b
1203                            && collaborator.user.github_login == "user_b"
1204                    })
1205            })
1206            .await;
1207
1208        // Open the same file as client B and client A.
1209        let buffer_b = project_b
1210            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1211            .await
1212            .unwrap();
1213        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1214        project_a.read_with(cx_a, |project, cx| {
1215            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1216        });
1217        let buffer_a = project_a
1218            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1219            .await
1220            .unwrap();
1221
1222        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1223
1224        // TODO
1225        // // Create a selection set as client B and see that selection set as client A.
1226        // buffer_a
1227        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1228        //     .await;
1229
1230        // Edit the buffer as client B and see that edit as client A.
1231        editor_b.update(cx_b, |editor, cx| {
1232            editor.handle_input(&Input("ok, ".into()), cx)
1233        });
1234        buffer_a
1235            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1236            .await;
1237
1238        // TODO
1239        // // Remove the selection set as client B, see those selections disappear as client A.
1240        cx_b.update(move |_| drop(editor_b));
1241        // buffer_a
1242        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1243        //     .await;
1244
1245        // Dropping the client B's project removes client B from client A's collaborators.
1246        cx_b.update(move |_| drop(project_b));
1247        project_a
1248            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1249            .await;
1250    }
1251
1252    #[gpui::test(iterations = 10)]
1253    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1254        let lang_registry = Arc::new(LanguageRegistry::test());
1255        let fs = FakeFs::new(cx_a.background());
1256        cx_a.foreground().forbid_parking();
1257
1258        // Connect to a server as 2 clients.
1259        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1260        let client_a = server.create_client(cx_a, "user_a").await;
1261        let client_b = server.create_client(cx_b, "user_b").await;
1262
1263        // Share a project as client A
1264        fs.insert_tree(
1265            "/a",
1266            json!({
1267                ".zed.toml": r#"collaborators = ["user_b"]"#,
1268                "a.txt": "a-contents",
1269                "b.txt": "b-contents",
1270            }),
1271        )
1272        .await;
1273        let project_a = cx_a.update(|cx| {
1274            Project::local(
1275                client_a.clone(),
1276                client_a.user_store.clone(),
1277                lang_registry.clone(),
1278                fs.clone(),
1279                cx,
1280            )
1281        });
1282        let (worktree_a, _) = project_a
1283            .update(cx_a, |p, cx| {
1284                p.find_or_create_local_worktree("/a", true, cx)
1285            })
1286            .await
1287            .unwrap();
1288        worktree_a
1289            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290            .await;
1291        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1292        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1293        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1294        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1295
1296        // Join that project as client B
1297        let project_b = Project::remote(
1298            project_id,
1299            client_b.clone(),
1300            client_b.user_store.clone(),
1301            lang_registry.clone(),
1302            fs.clone(),
1303            &mut cx_b.to_async(),
1304        )
1305        .await
1306        .unwrap();
1307        project_b
1308            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1309            .await
1310            .unwrap();
1311
1312        // Unshare the project as client A
1313        project_a.update(cx_a, |project, cx| project.unshare(cx));
1314        project_b
1315            .condition(cx_b, |project, _| project.is_read_only())
1316            .await;
1317        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1318        cx_b.update(|_| {
1319            drop(project_b);
1320        });
1321
1322        // Share the project again and ensure guests can still join.
1323        project_a
1324            .update(cx_a, |project, cx| project.share(cx))
1325            .await
1326            .unwrap();
1327        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1328
1329        let project_b2 = Project::remote(
1330            project_id,
1331            client_b.clone(),
1332            client_b.user_store.clone(),
1333            lang_registry.clone(),
1334            fs.clone(),
1335            &mut cx_b.to_async(),
1336        )
1337        .await
1338        .unwrap();
1339        project_b2
1340            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1341            .await
1342            .unwrap();
1343    }
1344
1345    #[gpui::test(iterations = 10)]
1346    async fn test_propagate_saves_and_fs_changes(
1347        cx_a: &mut TestAppContext,
1348        cx_b: &mut TestAppContext,
1349        cx_c: &mut TestAppContext,
1350    ) {
1351        let lang_registry = Arc::new(LanguageRegistry::test());
1352        let fs = FakeFs::new(cx_a.background());
1353        cx_a.foreground().forbid_parking();
1354
1355        // Connect to a server as 3 clients.
1356        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1357        let client_a = server.create_client(cx_a, "user_a").await;
1358        let client_b = server.create_client(cx_b, "user_b").await;
1359        let client_c = server.create_client(cx_c, "user_c").await;
1360
1361        // Share a worktree as client A.
1362        fs.insert_tree(
1363            "/a",
1364            json!({
1365                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1366                "file1": "",
1367                "file2": ""
1368            }),
1369        )
1370        .await;
1371        let project_a = cx_a.update(|cx| {
1372            Project::local(
1373                client_a.clone(),
1374                client_a.user_store.clone(),
1375                lang_registry.clone(),
1376                fs.clone(),
1377                cx,
1378            )
1379        });
1380        let (worktree_a, _) = project_a
1381            .update(cx_a, |p, cx| {
1382                p.find_or_create_local_worktree("/a", true, cx)
1383            })
1384            .await
1385            .unwrap();
1386        worktree_a
1387            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1388            .await;
1389        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1390        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1391        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1392
1393        // Join that worktree as clients B and C.
1394        let project_b = Project::remote(
1395            project_id,
1396            client_b.clone(),
1397            client_b.user_store.clone(),
1398            lang_registry.clone(),
1399            fs.clone(),
1400            &mut cx_b.to_async(),
1401        )
1402        .await
1403        .unwrap();
1404        let project_c = Project::remote(
1405            project_id,
1406            client_c.clone(),
1407            client_c.user_store.clone(),
1408            lang_registry.clone(),
1409            fs.clone(),
1410            &mut cx_c.to_async(),
1411        )
1412        .await
1413        .unwrap();
1414        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1415        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1416
1417        // Open and edit a buffer as both guests B and C.
1418        let buffer_b = project_b
1419            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1420            .await
1421            .unwrap();
1422        let buffer_c = project_c
1423            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1424            .await
1425            .unwrap();
1426        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1427        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1428
1429        // Open and edit that buffer as the host.
1430        let buffer_a = project_a
1431            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1432            .await
1433            .unwrap();
1434
1435        buffer_a
1436            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1437            .await;
1438        buffer_a.update(cx_a, |buf, cx| {
1439            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1440        });
1441
1442        // Wait for edits to propagate
1443        buffer_a
1444            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1445            .await;
1446        buffer_b
1447            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1448            .await;
1449        buffer_c
1450            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1451            .await;
1452
1453        // Edit the buffer as the host and concurrently save as guest B.
1454        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1455        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1456        save_b.await.unwrap();
1457        assert_eq!(
1458            fs.load("/a/file1".as_ref()).await.unwrap(),
1459            "hi-a, i-am-c, i-am-b, i-am-a"
1460        );
1461        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1462        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1463        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1464
1465        worktree_a.flush_fs_events(cx_a).await;
1466
1467        // Make changes on host's file system, see those changes on guest worktrees.
1468        fs.rename(
1469            "/a/file1".as_ref(),
1470            "/a/file1-renamed".as_ref(),
1471            Default::default(),
1472        )
1473        .await
1474        .unwrap();
1475
1476        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1477            .await
1478            .unwrap();
1479        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1480
1481        worktree_a
1482            .condition(&cx_a, |tree, _| {
1483                tree.paths()
1484                    .map(|p| p.to_string_lossy())
1485                    .collect::<Vec<_>>()
1486                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1487            })
1488            .await;
1489        worktree_b
1490            .condition(&cx_b, |tree, _| {
1491                tree.paths()
1492                    .map(|p| p.to_string_lossy())
1493                    .collect::<Vec<_>>()
1494                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1495            })
1496            .await;
1497        worktree_c
1498            .condition(&cx_c, |tree, _| {
1499                tree.paths()
1500                    .map(|p| p.to_string_lossy())
1501                    .collect::<Vec<_>>()
1502                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1503            })
1504            .await;
1505
1506        // Ensure buffer files are updated as well.
1507        buffer_a
1508            .condition(&cx_a, |buf, _| {
1509                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1510            })
1511            .await;
1512        buffer_b
1513            .condition(&cx_b, |buf, _| {
1514                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1515            })
1516            .await;
1517        buffer_c
1518            .condition(&cx_c, |buf, _| {
1519                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1520            })
1521            .await;
1522    }
1523
1524    #[gpui::test(iterations = 10)]
1525    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1526        cx_a.foreground().forbid_parking();
1527        let lang_registry = Arc::new(LanguageRegistry::test());
1528        let fs = FakeFs::new(cx_a.background());
1529
1530        // Connect to a server as 2 clients.
1531        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1532        let client_a = server.create_client(cx_a, "user_a").await;
1533        let client_b = server.create_client(cx_b, "user_b").await;
1534
1535        // Share a project as client A
1536        fs.insert_tree(
1537            "/dir",
1538            json!({
1539                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1540                "a.txt": "a-contents",
1541            }),
1542        )
1543        .await;
1544
1545        let project_a = cx_a.update(|cx| {
1546            Project::local(
1547                client_a.clone(),
1548                client_a.user_store.clone(),
1549                lang_registry.clone(),
1550                fs.clone(),
1551                cx,
1552            )
1553        });
1554        let (worktree_a, _) = project_a
1555            .update(cx_a, |p, cx| {
1556                p.find_or_create_local_worktree("/dir", true, cx)
1557            })
1558            .await
1559            .unwrap();
1560        worktree_a
1561            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1562            .await;
1563        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1564        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1565        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1566
1567        // Join that project as client B
1568        let project_b = Project::remote(
1569            project_id,
1570            client_b.clone(),
1571            client_b.user_store.clone(),
1572            lang_registry.clone(),
1573            fs.clone(),
1574            &mut cx_b.to_async(),
1575        )
1576        .await
1577        .unwrap();
1578
1579        // Open a buffer as client B
1580        let buffer_b = project_b
1581            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1582            .await
1583            .unwrap();
1584
1585        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1586        buffer_b.read_with(cx_b, |buf, _| {
1587            assert!(buf.is_dirty());
1588            assert!(!buf.has_conflict());
1589        });
1590
1591        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1592        buffer_b
1593            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1594            .await;
1595        buffer_b.read_with(cx_b, |buf, _| {
1596            assert!(!buf.has_conflict());
1597        });
1598
1599        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1600        buffer_b.read_with(cx_b, |buf, _| {
1601            assert!(buf.is_dirty());
1602            assert!(!buf.has_conflict());
1603        });
1604    }
1605
1606    #[gpui::test(iterations = 10)]
1607    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1608        cx_a.foreground().forbid_parking();
1609        let lang_registry = Arc::new(LanguageRegistry::test());
1610        let fs = FakeFs::new(cx_a.background());
1611
1612        // Connect to a server as 2 clients.
1613        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1614        let client_a = server.create_client(cx_a, "user_a").await;
1615        let client_b = server.create_client(cx_b, "user_b").await;
1616
1617        // Share a project as client A
1618        fs.insert_tree(
1619            "/dir",
1620            json!({
1621                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1622                "a.txt": "a-contents",
1623            }),
1624        )
1625        .await;
1626
1627        let project_a = cx_a.update(|cx| {
1628            Project::local(
1629                client_a.clone(),
1630                client_a.user_store.clone(),
1631                lang_registry.clone(),
1632                fs.clone(),
1633                cx,
1634            )
1635        });
1636        let (worktree_a, _) = project_a
1637            .update(cx_a, |p, cx| {
1638                p.find_or_create_local_worktree("/dir", true, cx)
1639            })
1640            .await
1641            .unwrap();
1642        worktree_a
1643            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1644            .await;
1645        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1646        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1647        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1648
1649        // Join that project as client B
1650        let project_b = Project::remote(
1651            project_id,
1652            client_b.clone(),
1653            client_b.user_store.clone(),
1654            lang_registry.clone(),
1655            fs.clone(),
1656            &mut cx_b.to_async(),
1657        )
1658        .await
1659        .unwrap();
1660        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1661
1662        // Open a buffer as client B
1663        let buffer_b = project_b
1664            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1665            .await
1666            .unwrap();
1667        buffer_b.read_with(cx_b, |buf, _| {
1668            assert!(!buf.is_dirty());
1669            assert!(!buf.has_conflict());
1670        });
1671
1672        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1673            .await
1674            .unwrap();
1675        buffer_b
1676            .condition(&cx_b, |buf, _| {
1677                buf.text() == "new contents" && !buf.is_dirty()
1678            })
1679            .await;
1680        buffer_b.read_with(cx_b, |buf, _| {
1681            assert!(!buf.has_conflict());
1682        });
1683    }
1684
1685    #[gpui::test(iterations = 10)]
1686    async fn test_editing_while_guest_opens_buffer(
1687        cx_a: &mut TestAppContext,
1688        cx_b: &mut TestAppContext,
1689    ) {
1690        cx_a.foreground().forbid_parking();
1691        let lang_registry = Arc::new(LanguageRegistry::test());
1692        let fs = FakeFs::new(cx_a.background());
1693
1694        // Connect to a server as 2 clients.
1695        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1696        let client_a = server.create_client(cx_a, "user_a").await;
1697        let client_b = server.create_client(cx_b, "user_b").await;
1698
1699        // Share a project as client A
1700        fs.insert_tree(
1701            "/dir",
1702            json!({
1703                ".zed.toml": r#"collaborators = ["user_b"]"#,
1704                "a.txt": "a-contents",
1705            }),
1706        )
1707        .await;
1708        let project_a = cx_a.update(|cx| {
1709            Project::local(
1710                client_a.clone(),
1711                client_a.user_store.clone(),
1712                lang_registry.clone(),
1713                fs.clone(),
1714                cx,
1715            )
1716        });
1717        let (worktree_a, _) = project_a
1718            .update(cx_a, |p, cx| {
1719                p.find_or_create_local_worktree("/dir", true, cx)
1720            })
1721            .await
1722            .unwrap();
1723        worktree_a
1724            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1725            .await;
1726        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1727        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1729
1730        // Join that project as client B
1731        let project_b = Project::remote(
1732            project_id,
1733            client_b.clone(),
1734            client_b.user_store.clone(),
1735            lang_registry.clone(),
1736            fs.clone(),
1737            &mut cx_b.to_async(),
1738        )
1739        .await
1740        .unwrap();
1741
1742        // Open a buffer as client A
1743        let buffer_a = project_a
1744            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1745            .await
1746            .unwrap();
1747
1748        // Start opening the same buffer as client B
1749        let buffer_b = cx_b
1750            .background()
1751            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1752
1753        // Edit the buffer as client A while client B is still opening it.
1754        cx_b.background().simulate_random_delay().await;
1755        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1756        cx_b.background().simulate_random_delay().await;
1757        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1758
1759        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1760        let buffer_b = buffer_b.await.unwrap();
1761        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1762    }
1763
1764    #[gpui::test(iterations = 10)]
1765    async fn test_leaving_worktree_while_opening_buffer(
1766        cx_a: &mut TestAppContext,
1767        cx_b: &mut TestAppContext,
1768    ) {
1769        cx_a.foreground().forbid_parking();
1770        let lang_registry = Arc::new(LanguageRegistry::test());
1771        let fs = FakeFs::new(cx_a.background());
1772
1773        // Connect to a server as 2 clients.
1774        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1775        let client_a = server.create_client(cx_a, "user_a").await;
1776        let client_b = server.create_client(cx_b, "user_b").await;
1777
1778        // Share a project as client A
1779        fs.insert_tree(
1780            "/dir",
1781            json!({
1782                ".zed.toml": r#"collaborators = ["user_b"]"#,
1783                "a.txt": "a-contents",
1784            }),
1785        )
1786        .await;
1787        let project_a = cx_a.update(|cx| {
1788            Project::local(
1789                client_a.clone(),
1790                client_a.user_store.clone(),
1791                lang_registry.clone(),
1792                fs.clone(),
1793                cx,
1794            )
1795        });
1796        let (worktree_a, _) = project_a
1797            .update(cx_a, |p, cx| {
1798                p.find_or_create_local_worktree("/dir", true, cx)
1799            })
1800            .await
1801            .unwrap();
1802        worktree_a
1803            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1804            .await;
1805        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1806        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1807        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1808
1809        // Join that project as client B
1810        let project_b = Project::remote(
1811            project_id,
1812            client_b.clone(),
1813            client_b.user_store.clone(),
1814            lang_registry.clone(),
1815            fs.clone(),
1816            &mut cx_b.to_async(),
1817        )
1818        .await
1819        .unwrap();
1820
1821        // See that a guest has joined as client A.
1822        project_a
1823            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1824            .await;
1825
1826        // Begin opening a buffer as client B, but leave the project before the open completes.
1827        let buffer_b = cx_b
1828            .background()
1829            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1830        cx_b.update(|_| drop(project_b));
1831        drop(buffer_b);
1832
1833        // See that the guest has left.
1834        project_a
1835            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1836            .await;
1837    }
1838
1839    #[gpui::test(iterations = 10)]
1840    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1841        cx_a.foreground().forbid_parking();
1842        let lang_registry = Arc::new(LanguageRegistry::test());
1843        let fs = FakeFs::new(cx_a.background());
1844
1845        // Connect to a server as 2 clients.
1846        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1847        let client_a = server.create_client(cx_a, "user_a").await;
1848        let client_b = server.create_client(cx_b, "user_b").await;
1849
1850        // Share a project as client A
1851        fs.insert_tree(
1852            "/a",
1853            json!({
1854                ".zed.toml": r#"collaborators = ["user_b"]"#,
1855                "a.txt": "a-contents",
1856                "b.txt": "b-contents",
1857            }),
1858        )
1859        .await;
1860        let project_a = cx_a.update(|cx| {
1861            Project::local(
1862                client_a.clone(),
1863                client_a.user_store.clone(),
1864                lang_registry.clone(),
1865                fs.clone(),
1866                cx,
1867            )
1868        });
1869        let (worktree_a, _) = project_a
1870            .update(cx_a, |p, cx| {
1871                p.find_or_create_local_worktree("/a", true, cx)
1872            })
1873            .await
1874            .unwrap();
1875        worktree_a
1876            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1877            .await;
1878        let project_id = project_a
1879            .update(cx_a, |project, _| project.next_remote_id())
1880            .await;
1881        project_a
1882            .update(cx_a, |project, cx| project.share(cx))
1883            .await
1884            .unwrap();
1885
1886        // Join that project as client B
1887        let _project_b = Project::remote(
1888            project_id,
1889            client_b.clone(),
1890            client_b.user_store.clone(),
1891            lang_registry.clone(),
1892            fs.clone(),
1893            &mut cx_b.to_async(),
1894        )
1895        .await
1896        .unwrap();
1897
1898        // Client A sees that a guest has joined.
1899        project_a
1900            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1901            .await;
1902
1903        // Drop client B's connection and ensure client A observes client B leaving the project.
1904        client_b.disconnect(&cx_b.to_async()).unwrap();
1905        project_a
1906            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1907            .await;
1908
1909        // Rejoin the project as client B
1910        let _project_b = Project::remote(
1911            project_id,
1912            client_b.clone(),
1913            client_b.user_store.clone(),
1914            lang_registry.clone(),
1915            fs.clone(),
1916            &mut cx_b.to_async(),
1917        )
1918        .await
1919        .unwrap();
1920
1921        // Client A sees that a guest has re-joined.
1922        project_a
1923            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1924            .await;
1925
1926        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1927        client_b.wait_for_current_user(cx_b).await;
1928        server.disconnect_client(client_b.current_user_id(cx_b));
1929        cx_a.foreground().advance_clock(Duration::from_secs(3));
1930        project_a
1931            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1932            .await;
1933    }
1934
1935    #[gpui::test(iterations = 10)]
1936    async fn test_collaborating_with_diagnostics(
1937        cx_a: &mut TestAppContext,
1938        cx_b: &mut TestAppContext,
1939    ) {
1940        cx_a.foreground().forbid_parking();
1941        let mut lang_registry = Arc::new(LanguageRegistry::test());
1942        let fs = FakeFs::new(cx_a.background());
1943
1944        // Set up a fake language server.
1945        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1946        Arc::get_mut(&mut lang_registry)
1947            .unwrap()
1948            .add(Arc::new(Language::new(
1949                LanguageConfig {
1950                    name: "Rust".into(),
1951                    path_suffixes: vec!["rs".to_string()],
1952                    language_server: Some(language_server_config),
1953                    ..Default::default()
1954                },
1955                Some(tree_sitter_rust::language()),
1956            )));
1957
1958        // Connect to a server as 2 clients.
1959        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1960        let client_a = server.create_client(cx_a, "user_a").await;
1961        let client_b = server.create_client(cx_b, "user_b").await;
1962
1963        // Share a project as client A
1964        fs.insert_tree(
1965            "/a",
1966            json!({
1967                ".zed.toml": r#"collaborators = ["user_b"]"#,
1968                "a.rs": "let one = two",
1969                "other.rs": "",
1970            }),
1971        )
1972        .await;
1973        let project_a = cx_a.update(|cx| {
1974            Project::local(
1975                client_a.clone(),
1976                client_a.user_store.clone(),
1977                lang_registry.clone(),
1978                fs.clone(),
1979                cx,
1980            )
1981        });
1982        let (worktree_a, _) = project_a
1983            .update(cx_a, |p, cx| {
1984                p.find_or_create_local_worktree("/a", true, cx)
1985            })
1986            .await
1987            .unwrap();
1988        worktree_a
1989            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1990            .await;
1991        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1992        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1993        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1994
1995        // Cause the language server to start.
1996        let _ = cx_a
1997            .background()
1998            .spawn(project_a.update(cx_a, |project, cx| {
1999                project.open_buffer(
2000                    ProjectPath {
2001                        worktree_id,
2002                        path: Path::new("other.rs").into(),
2003                    },
2004                    cx,
2005                )
2006            }))
2007            .await
2008            .unwrap();
2009
2010        // Simulate a language server reporting errors for a file.
2011        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2012        fake_language_server
2013            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2014            .await;
2015        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2016            lsp::PublishDiagnosticsParams {
2017                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2018                version: None,
2019                diagnostics: vec![lsp::Diagnostic {
2020                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2021                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2022                    message: "message 1".to_string(),
2023                    ..Default::default()
2024                }],
2025            },
2026        );
2027
2028        // Wait for server to see the diagnostics update.
2029        server
2030            .condition(|store| {
2031                let worktree = store
2032                    .project(project_id)
2033                    .unwrap()
2034                    .share
2035                    .as_ref()
2036                    .unwrap()
2037                    .worktrees
2038                    .get(&worktree_id.to_proto())
2039                    .unwrap();
2040
2041                !worktree.diagnostic_summaries.is_empty()
2042            })
2043            .await;
2044
2045        // Join the worktree as client B.
2046        let project_b = Project::remote(
2047            project_id,
2048            client_b.clone(),
2049            client_b.user_store.clone(),
2050            lang_registry.clone(),
2051            fs.clone(),
2052            &mut cx_b.to_async(),
2053        )
2054        .await
2055        .unwrap();
2056
2057        project_b.read_with(cx_b, |project, cx| {
2058            assert_eq!(
2059                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2060                &[(
2061                    ProjectPath {
2062                        worktree_id,
2063                        path: Arc::from(Path::new("a.rs")),
2064                    },
2065                    DiagnosticSummary {
2066                        error_count: 1,
2067                        warning_count: 0,
2068                        ..Default::default()
2069                    },
2070                )]
2071            )
2072        });
2073
2074        // Simulate a language server reporting more errors for a file.
2075        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2076            lsp::PublishDiagnosticsParams {
2077                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2078                version: None,
2079                diagnostics: vec![
2080                    lsp::Diagnostic {
2081                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2082                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2083                        message: "message 1".to_string(),
2084                        ..Default::default()
2085                    },
2086                    lsp::Diagnostic {
2087                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2088                        range: lsp::Range::new(
2089                            lsp::Position::new(0, 10),
2090                            lsp::Position::new(0, 13),
2091                        ),
2092                        message: "message 2".to_string(),
2093                        ..Default::default()
2094                    },
2095                ],
2096            },
2097        );
2098
2099        // Client b gets the updated summaries
2100        project_b
2101            .condition(&cx_b, |project, cx| {
2102                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2103                    == &[(
2104                        ProjectPath {
2105                            worktree_id,
2106                            path: Arc::from(Path::new("a.rs")),
2107                        },
2108                        DiagnosticSummary {
2109                            error_count: 1,
2110                            warning_count: 1,
2111                            ..Default::default()
2112                        },
2113                    )]
2114            })
2115            .await;
2116
2117        // Open the file with the errors on client B. They should be present.
2118        let buffer_b = cx_b
2119            .background()
2120            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2121            .await
2122            .unwrap();
2123
2124        buffer_b.read_with(cx_b, |buffer, _| {
2125            assert_eq!(
2126                buffer
2127                    .snapshot()
2128                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2129                    .map(|entry| entry)
2130                    .collect::<Vec<_>>(),
2131                &[
2132                    DiagnosticEntry {
2133                        range: Point::new(0, 4)..Point::new(0, 7),
2134                        diagnostic: Diagnostic {
2135                            group_id: 0,
2136                            message: "message 1".to_string(),
2137                            severity: lsp::DiagnosticSeverity::ERROR,
2138                            is_primary: true,
2139                            ..Default::default()
2140                        }
2141                    },
2142                    DiagnosticEntry {
2143                        range: Point::new(0, 10)..Point::new(0, 13),
2144                        diagnostic: Diagnostic {
2145                            group_id: 1,
2146                            severity: lsp::DiagnosticSeverity::WARNING,
2147                            message: "message 2".to_string(),
2148                            is_primary: true,
2149                            ..Default::default()
2150                        }
2151                    }
2152                ]
2153            );
2154        });
2155    }
2156
2157    #[gpui::test(iterations = 10)]
2158    async fn test_collaborating_with_completion(
2159        cx_a: &mut TestAppContext,
2160        cx_b: &mut TestAppContext,
2161    ) {
2162        cx_a.foreground().forbid_parking();
2163        let mut lang_registry = Arc::new(LanguageRegistry::test());
2164        let fs = FakeFs::new(cx_a.background());
2165
2166        // Set up a fake language server.
2167        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2168        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2169            completion_provider: Some(lsp::CompletionOptions {
2170                trigger_characters: Some(vec![".".to_string()]),
2171                ..Default::default()
2172            }),
2173            ..Default::default()
2174        });
2175        Arc::get_mut(&mut lang_registry)
2176            .unwrap()
2177            .add(Arc::new(Language::new(
2178                LanguageConfig {
2179                    name: "Rust".into(),
2180                    path_suffixes: vec!["rs".to_string()],
2181                    language_server: Some(language_server_config),
2182                    ..Default::default()
2183                },
2184                Some(tree_sitter_rust::language()),
2185            )));
2186
2187        // Connect to a server as 2 clients.
2188        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2189        let client_a = server.create_client(cx_a, "user_a").await;
2190        let client_b = server.create_client(cx_b, "user_b").await;
2191
2192        // Share a project as client A
2193        fs.insert_tree(
2194            "/a",
2195            json!({
2196                ".zed.toml": r#"collaborators = ["user_b"]"#,
2197                "main.rs": "fn main() { a }",
2198                "other.rs": "",
2199            }),
2200        )
2201        .await;
2202        let project_a = cx_a.update(|cx| {
2203            Project::local(
2204                client_a.clone(),
2205                client_a.user_store.clone(),
2206                lang_registry.clone(),
2207                fs.clone(),
2208                cx,
2209            )
2210        });
2211        let (worktree_a, _) = project_a
2212            .update(cx_a, |p, cx| {
2213                p.find_or_create_local_worktree("/a", true, cx)
2214            })
2215            .await
2216            .unwrap();
2217        worktree_a
2218            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2219            .await;
2220        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2221        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2222        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2223
2224        // Join the worktree as client B.
2225        let project_b = Project::remote(
2226            project_id,
2227            client_b.clone(),
2228            client_b.user_store.clone(),
2229            lang_registry.clone(),
2230            fs.clone(),
2231            &mut cx_b.to_async(),
2232        )
2233        .await
2234        .unwrap();
2235
2236        // Open a file in an editor as the guest.
2237        let buffer_b = project_b
2238            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2239            .await
2240            .unwrap();
2241        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2242        let editor_b = cx_b.add_view(window_b, |cx| {
2243            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2244        });
2245
2246        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2247        buffer_b
2248            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2249            .await;
2250
2251        // Type a completion trigger character as the guest.
2252        editor_b.update(cx_b, |editor, cx| {
2253            editor.select_ranges([13..13], None, cx);
2254            editor.handle_input(&Input(".".into()), cx);
2255            cx.focus(&editor_b);
2256        });
2257
2258        // Receive a completion request as the host's language server.
2259        // Return some completions from the host's language server.
2260        cx_a.foreground().start_waiting();
2261        fake_language_server
2262            .handle_request::<lsp::request::Completion, _>(|params, _| {
2263                assert_eq!(
2264                    params.text_document_position.text_document.uri,
2265                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2266                );
2267                assert_eq!(
2268                    params.text_document_position.position,
2269                    lsp::Position::new(0, 14),
2270                );
2271
2272                Some(lsp::CompletionResponse::Array(vec![
2273                    lsp::CompletionItem {
2274                        label: "first_method(…)".into(),
2275                        detail: Some("fn(&mut self, B) -> C".into()),
2276                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2277                            new_text: "first_method($1)".to_string(),
2278                            range: lsp::Range::new(
2279                                lsp::Position::new(0, 14),
2280                                lsp::Position::new(0, 14),
2281                            ),
2282                        })),
2283                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2284                        ..Default::default()
2285                    },
2286                    lsp::CompletionItem {
2287                        label: "second_method(…)".into(),
2288                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2289                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2290                            new_text: "second_method()".to_string(),
2291                            range: lsp::Range::new(
2292                                lsp::Position::new(0, 14),
2293                                lsp::Position::new(0, 14),
2294                            ),
2295                        })),
2296                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2297                        ..Default::default()
2298                    },
2299                ]))
2300            })
2301            .next()
2302            .await
2303            .unwrap();
2304        cx_a.foreground().finish_waiting();
2305
2306        // Open the buffer on the host.
2307        let buffer_a = project_a
2308            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2309            .await
2310            .unwrap();
2311        buffer_a
2312            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2313            .await;
2314
2315        // Confirm a completion on the guest.
2316        editor_b
2317            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2318            .await;
2319        editor_b.update(cx_b, |editor, cx| {
2320            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2321            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2322        });
2323
2324        // Return a resolved completion from the host's language server.
2325        // The resolved completion has an additional text edit.
2326        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2327            |params, _| {
2328                assert_eq!(params.label, "first_method(…)");
2329                lsp::CompletionItem {
2330                    label: "first_method(…)".into(),
2331                    detail: Some("fn(&mut self, B) -> C".into()),
2332                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2333                        new_text: "first_method($1)".to_string(),
2334                        range: lsp::Range::new(
2335                            lsp::Position::new(0, 14),
2336                            lsp::Position::new(0, 14),
2337                        ),
2338                    })),
2339                    additional_text_edits: Some(vec![lsp::TextEdit {
2340                        new_text: "use d::SomeTrait;\n".to_string(),
2341                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2342                    }]),
2343                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2344                    ..Default::default()
2345                }
2346            },
2347        );
2348
2349        // The additional edit is applied.
2350        buffer_a
2351            .condition(&cx_a, |buffer, _| {
2352                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2353            })
2354            .await;
2355        buffer_b
2356            .condition(&cx_b, |buffer, _| {
2357                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2358            })
2359            .await;
2360    }
2361
2362    #[gpui::test(iterations = 10)]
2363    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2364        cx_a.foreground().forbid_parking();
2365        let mut lang_registry = Arc::new(LanguageRegistry::test());
2366        let fs = FakeFs::new(cx_a.background());
2367
2368        // Set up a fake language server.
2369        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2370        Arc::get_mut(&mut lang_registry)
2371            .unwrap()
2372            .add(Arc::new(Language::new(
2373                LanguageConfig {
2374                    name: "Rust".into(),
2375                    path_suffixes: vec!["rs".to_string()],
2376                    language_server: Some(language_server_config),
2377                    ..Default::default()
2378                },
2379                Some(tree_sitter_rust::language()),
2380            )));
2381
2382        // Connect to a server as 2 clients.
2383        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2384        let client_a = server.create_client(cx_a, "user_a").await;
2385        let client_b = server.create_client(cx_b, "user_b").await;
2386
2387        // Share a project as client A
2388        fs.insert_tree(
2389            "/a",
2390            json!({
2391                ".zed.toml": r#"collaborators = ["user_b"]"#,
2392                "a.rs": "let one = two",
2393            }),
2394        )
2395        .await;
2396        let project_a = cx_a.update(|cx| {
2397            Project::local(
2398                client_a.clone(),
2399                client_a.user_store.clone(),
2400                lang_registry.clone(),
2401                fs.clone(),
2402                cx,
2403            )
2404        });
2405        let (worktree_a, _) = project_a
2406            .update(cx_a, |p, cx| {
2407                p.find_or_create_local_worktree("/a", true, cx)
2408            })
2409            .await
2410            .unwrap();
2411        worktree_a
2412            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2413            .await;
2414        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2415        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2416        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2417
2418        // Join the worktree as client B.
2419        let project_b = Project::remote(
2420            project_id,
2421            client_b.clone(),
2422            client_b.user_store.clone(),
2423            lang_registry.clone(),
2424            fs.clone(),
2425            &mut cx_b.to_async(),
2426        )
2427        .await
2428        .unwrap();
2429
2430        let buffer_b = cx_b
2431            .background()
2432            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2433            .await
2434            .unwrap();
2435
2436        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2437        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2438            Some(vec![
2439                lsp::TextEdit {
2440                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2441                    new_text: "h".to_string(),
2442                },
2443                lsp::TextEdit {
2444                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2445                    new_text: "y".to_string(),
2446                },
2447            ])
2448        });
2449
2450        project_b
2451            .update(cx_b, |project, cx| {
2452                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2453            })
2454            .await
2455            .unwrap();
2456        assert_eq!(
2457            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2458            "let honey = two"
2459        );
2460    }
2461
2462    #[gpui::test(iterations = 10)]
2463    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2464        cx_a.foreground().forbid_parking();
2465        let mut lang_registry = Arc::new(LanguageRegistry::test());
2466        let fs = FakeFs::new(cx_a.background());
2467        fs.insert_tree(
2468            "/root-1",
2469            json!({
2470                ".zed.toml": r#"collaborators = ["user_b"]"#,
2471                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2472            }),
2473        )
2474        .await;
2475        fs.insert_tree(
2476            "/root-2",
2477            json!({
2478                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2479            }),
2480        )
2481        .await;
2482
2483        // Set up a fake language server.
2484        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2485        Arc::get_mut(&mut lang_registry)
2486            .unwrap()
2487            .add(Arc::new(Language::new(
2488                LanguageConfig {
2489                    name: "Rust".into(),
2490                    path_suffixes: vec!["rs".to_string()],
2491                    language_server: Some(language_server_config),
2492                    ..Default::default()
2493                },
2494                Some(tree_sitter_rust::language()),
2495            )));
2496
2497        // Connect to a server as 2 clients.
2498        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2499        let client_a = server.create_client(cx_a, "user_a").await;
2500        let client_b = server.create_client(cx_b, "user_b").await;
2501
2502        // Share a project as client A
2503        let project_a = cx_a.update(|cx| {
2504            Project::local(
2505                client_a.clone(),
2506                client_a.user_store.clone(),
2507                lang_registry.clone(),
2508                fs.clone(),
2509                cx,
2510            )
2511        });
2512        let (worktree_a, _) = project_a
2513            .update(cx_a, |p, cx| {
2514                p.find_or_create_local_worktree("/root-1", true, cx)
2515            })
2516            .await
2517            .unwrap();
2518        worktree_a
2519            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2520            .await;
2521        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2522        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2523        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2524
2525        // Join the worktree as client B.
2526        let project_b = Project::remote(
2527            project_id,
2528            client_b.clone(),
2529            client_b.user_store.clone(),
2530            lang_registry.clone(),
2531            fs.clone(),
2532            &mut cx_b.to_async(),
2533        )
2534        .await
2535        .unwrap();
2536
2537        // Open the file on client B.
2538        let buffer_b = cx_b
2539            .background()
2540            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2541            .await
2542            .unwrap();
2543
2544        // Request the definition of a symbol as the guest.
2545        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2546        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2547            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2548                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2549                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2550            )))
2551        });
2552
2553        let definitions_1 = project_b
2554            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2555            .await
2556            .unwrap();
2557        cx_b.read(|cx| {
2558            assert_eq!(definitions_1.len(), 1);
2559            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2560            let target_buffer = definitions_1[0].buffer.read(cx);
2561            assert_eq!(
2562                target_buffer.text(),
2563                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2564            );
2565            assert_eq!(
2566                definitions_1[0].range.to_point(target_buffer),
2567                Point::new(0, 6)..Point::new(0, 9)
2568            );
2569        });
2570
2571        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2572        // the previous call to `definition`.
2573        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2574            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2575                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2576                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2577            )))
2578        });
2579
2580        let definitions_2 = project_b
2581            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2582            .await
2583            .unwrap();
2584        cx_b.read(|cx| {
2585            assert_eq!(definitions_2.len(), 1);
2586            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2587            let target_buffer = definitions_2[0].buffer.read(cx);
2588            assert_eq!(
2589                target_buffer.text(),
2590                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2591            );
2592            assert_eq!(
2593                definitions_2[0].range.to_point(target_buffer),
2594                Point::new(1, 6)..Point::new(1, 11)
2595            );
2596        });
2597        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2598    }
2599
2600    #[gpui::test(iterations = 10)]
2601    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2602        cx_a.foreground().forbid_parking();
2603        let mut lang_registry = Arc::new(LanguageRegistry::test());
2604        let fs = FakeFs::new(cx_a.background());
2605        fs.insert_tree(
2606            "/root-1",
2607            json!({
2608                ".zed.toml": r#"collaborators = ["user_b"]"#,
2609                "one.rs": "const ONE: usize = 1;",
2610                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2611            }),
2612        )
2613        .await;
2614        fs.insert_tree(
2615            "/root-2",
2616            json!({
2617                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2618            }),
2619        )
2620        .await;
2621
2622        // Set up a fake language server.
2623        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2624        Arc::get_mut(&mut lang_registry)
2625            .unwrap()
2626            .add(Arc::new(Language::new(
2627                LanguageConfig {
2628                    name: "Rust".into(),
2629                    path_suffixes: vec!["rs".to_string()],
2630                    language_server: Some(language_server_config),
2631                    ..Default::default()
2632                },
2633                Some(tree_sitter_rust::language()),
2634            )));
2635
2636        // Connect to a server as 2 clients.
2637        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2638        let client_a = server.create_client(cx_a, "user_a").await;
2639        let client_b = server.create_client(cx_b, "user_b").await;
2640
2641        // Share a project as client A
2642        let project_a = cx_a.update(|cx| {
2643            Project::local(
2644                client_a.clone(),
2645                client_a.user_store.clone(),
2646                lang_registry.clone(),
2647                fs.clone(),
2648                cx,
2649            )
2650        });
2651        let (worktree_a, _) = project_a
2652            .update(cx_a, |p, cx| {
2653                p.find_or_create_local_worktree("/root-1", true, cx)
2654            })
2655            .await
2656            .unwrap();
2657        worktree_a
2658            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2659            .await;
2660        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2661        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2662        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2663
2664        // Join the worktree as client B.
2665        let project_b = Project::remote(
2666            project_id,
2667            client_b.clone(),
2668            client_b.user_store.clone(),
2669            lang_registry.clone(),
2670            fs.clone(),
2671            &mut cx_b.to_async(),
2672        )
2673        .await
2674        .unwrap();
2675
2676        // Open the file on client B.
2677        let buffer_b = cx_b
2678            .background()
2679            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2680            .await
2681            .unwrap();
2682
2683        // Request references to a symbol as the guest.
2684        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2685        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2686            assert_eq!(
2687                params.text_document_position.text_document.uri.as_str(),
2688                "file:///root-1/one.rs"
2689            );
2690            Some(vec![
2691                lsp::Location {
2692                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2693                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2694                },
2695                lsp::Location {
2696                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2697                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2698                },
2699                lsp::Location {
2700                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2701                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2702                },
2703            ])
2704        });
2705
2706        let references = project_b
2707            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2708            .await
2709            .unwrap();
2710        cx_b.read(|cx| {
2711            assert_eq!(references.len(), 3);
2712            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2713
2714            let two_buffer = references[0].buffer.read(cx);
2715            let three_buffer = references[2].buffer.read(cx);
2716            assert_eq!(
2717                two_buffer.file().unwrap().path().as_ref(),
2718                Path::new("two.rs")
2719            );
2720            assert_eq!(references[1].buffer, references[0].buffer);
2721            assert_eq!(
2722                three_buffer.file().unwrap().full_path(cx),
2723                Path::new("three.rs")
2724            );
2725
2726            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2727            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2728            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2729        });
2730    }
2731
2732    #[gpui::test(iterations = 10)]
2733    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2734        cx_a.foreground().forbid_parking();
2735        let lang_registry = Arc::new(LanguageRegistry::test());
2736        let fs = FakeFs::new(cx_a.background());
2737        fs.insert_tree(
2738            "/root-1",
2739            json!({
2740                ".zed.toml": r#"collaborators = ["user_b"]"#,
2741                "a": "hello world",
2742                "b": "goodnight moon",
2743                "c": "a world of goo",
2744                "d": "world champion of clown world",
2745            }),
2746        )
2747        .await;
2748        fs.insert_tree(
2749            "/root-2",
2750            json!({
2751                "e": "disney world is fun",
2752            }),
2753        )
2754        .await;
2755
2756        // Connect to a server as 2 clients.
2757        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2758        let client_a = server.create_client(cx_a, "user_a").await;
2759        let client_b = server.create_client(cx_b, "user_b").await;
2760
2761        // Share a project as client A
2762        let project_a = cx_a.update(|cx| {
2763            Project::local(
2764                client_a.clone(),
2765                client_a.user_store.clone(),
2766                lang_registry.clone(),
2767                fs.clone(),
2768                cx,
2769            )
2770        });
2771        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2772
2773        let (worktree_1, _) = project_a
2774            .update(cx_a, |p, cx| {
2775                p.find_or_create_local_worktree("/root-1", true, cx)
2776            })
2777            .await
2778            .unwrap();
2779        worktree_1
2780            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2781            .await;
2782        let (worktree_2, _) = project_a
2783            .update(cx_a, |p, cx| {
2784                p.find_or_create_local_worktree("/root-2", true, cx)
2785            })
2786            .await
2787            .unwrap();
2788        worktree_2
2789            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2790            .await;
2791
2792        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2793
2794        // Join the worktree as client B.
2795        let project_b = Project::remote(
2796            project_id,
2797            client_b.clone(),
2798            client_b.user_store.clone(),
2799            lang_registry.clone(),
2800            fs.clone(),
2801            &mut cx_b.to_async(),
2802        )
2803        .await
2804        .unwrap();
2805
2806        let results = project_b
2807            .update(cx_b, |project, cx| {
2808                project.search(SearchQuery::text("world", false, false), cx)
2809            })
2810            .await
2811            .unwrap();
2812
2813        let mut ranges_by_path = results
2814            .into_iter()
2815            .map(|(buffer, ranges)| {
2816                buffer.read_with(cx_b, |buffer, cx| {
2817                    let path = buffer.file().unwrap().full_path(cx);
2818                    let offset_ranges = ranges
2819                        .into_iter()
2820                        .map(|range| range.to_offset(buffer))
2821                        .collect::<Vec<_>>();
2822                    (path, offset_ranges)
2823                })
2824            })
2825            .collect::<Vec<_>>();
2826        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2827
2828        assert_eq!(
2829            ranges_by_path,
2830            &[
2831                (PathBuf::from("root-1/a"), vec![6..11]),
2832                (PathBuf::from("root-1/c"), vec![2..7]),
2833                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2834                (PathBuf::from("root-2/e"), vec![7..12]),
2835            ]
2836        );
2837    }
2838
2839    #[gpui::test(iterations = 10)]
2840    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2841        cx_a.foreground().forbid_parking();
2842        let lang_registry = Arc::new(LanguageRegistry::test());
2843        let fs = FakeFs::new(cx_a.background());
2844        fs.insert_tree(
2845            "/root-1",
2846            json!({
2847                ".zed.toml": r#"collaborators = ["user_b"]"#,
2848                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2849            }),
2850        )
2851        .await;
2852
2853        // Set up a fake language server.
2854        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2855        lang_registry.add(Arc::new(Language::new(
2856            LanguageConfig {
2857                name: "Rust".into(),
2858                path_suffixes: vec!["rs".to_string()],
2859                language_server: Some(language_server_config),
2860                ..Default::default()
2861            },
2862            Some(tree_sitter_rust::language()),
2863        )));
2864
2865        // Connect to a server as 2 clients.
2866        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2867        let client_a = server.create_client(cx_a, "user_a").await;
2868        let client_b = server.create_client(cx_b, "user_b").await;
2869
2870        // Share a project as client A
2871        let project_a = cx_a.update(|cx| {
2872            Project::local(
2873                client_a.clone(),
2874                client_a.user_store.clone(),
2875                lang_registry.clone(),
2876                fs.clone(),
2877                cx,
2878            )
2879        });
2880        let (worktree_a, _) = project_a
2881            .update(cx_a, |p, cx| {
2882                p.find_or_create_local_worktree("/root-1", true, cx)
2883            })
2884            .await
2885            .unwrap();
2886        worktree_a
2887            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2888            .await;
2889        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2890        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2891        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2892
2893        // Join the worktree as client B.
2894        let project_b = Project::remote(
2895            project_id,
2896            client_b.clone(),
2897            client_b.user_store.clone(),
2898            lang_registry.clone(),
2899            fs.clone(),
2900            &mut cx_b.to_async(),
2901        )
2902        .await
2903        .unwrap();
2904
2905        // Open the file on client B.
2906        let buffer_b = cx_b
2907            .background()
2908            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2909            .await
2910            .unwrap();
2911
2912        // Request document highlights as the guest.
2913        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2914        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2915            |params, _| {
2916                assert_eq!(
2917                    params
2918                        .text_document_position_params
2919                        .text_document
2920                        .uri
2921                        .as_str(),
2922                    "file:///root-1/main.rs"
2923                );
2924                assert_eq!(
2925                    params.text_document_position_params.position,
2926                    lsp::Position::new(0, 34)
2927                );
2928                Some(vec![
2929                    lsp::DocumentHighlight {
2930                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2931                        range: lsp::Range::new(
2932                            lsp::Position::new(0, 10),
2933                            lsp::Position::new(0, 16),
2934                        ),
2935                    },
2936                    lsp::DocumentHighlight {
2937                        kind: Some(lsp::DocumentHighlightKind::READ),
2938                        range: lsp::Range::new(
2939                            lsp::Position::new(0, 32),
2940                            lsp::Position::new(0, 38),
2941                        ),
2942                    },
2943                    lsp::DocumentHighlight {
2944                        kind: Some(lsp::DocumentHighlightKind::READ),
2945                        range: lsp::Range::new(
2946                            lsp::Position::new(0, 41),
2947                            lsp::Position::new(0, 47),
2948                        ),
2949                    },
2950                ])
2951            },
2952        );
2953
2954        let highlights = project_b
2955            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2956            .await
2957            .unwrap();
2958        buffer_b.read_with(cx_b, |buffer, _| {
2959            let snapshot = buffer.snapshot();
2960
2961            let highlights = highlights
2962                .into_iter()
2963                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2964                .collect::<Vec<_>>();
2965            assert_eq!(
2966                highlights,
2967                &[
2968                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2969                    (lsp::DocumentHighlightKind::READ, 32..38),
2970                    (lsp::DocumentHighlightKind::READ, 41..47)
2971                ]
2972            )
2973        });
2974    }
2975
2976    #[gpui::test(iterations = 10)]
2977    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2978        cx_a.foreground().forbid_parking();
2979        let mut lang_registry = Arc::new(LanguageRegistry::test());
2980        let fs = FakeFs::new(cx_a.background());
2981        fs.insert_tree(
2982            "/code",
2983            json!({
2984                "crate-1": {
2985                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2986                    "one.rs": "const ONE: usize = 1;",
2987                },
2988                "crate-2": {
2989                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2990                },
2991                "private": {
2992                    "passwords.txt": "the-password",
2993                }
2994            }),
2995        )
2996        .await;
2997
2998        // Set up a fake language server.
2999        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3000        Arc::get_mut(&mut lang_registry)
3001            .unwrap()
3002            .add(Arc::new(Language::new(
3003                LanguageConfig {
3004                    name: "Rust".into(),
3005                    path_suffixes: vec!["rs".to_string()],
3006                    language_server: Some(language_server_config),
3007                    ..Default::default()
3008                },
3009                Some(tree_sitter_rust::language()),
3010            )));
3011
3012        // Connect to a server as 2 clients.
3013        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3014        let client_a = server.create_client(cx_a, "user_a").await;
3015        let client_b = server.create_client(cx_b, "user_b").await;
3016
3017        // Share a project as client A
3018        let project_a = cx_a.update(|cx| {
3019            Project::local(
3020                client_a.clone(),
3021                client_a.user_store.clone(),
3022                lang_registry.clone(),
3023                fs.clone(),
3024                cx,
3025            )
3026        });
3027        let (worktree_a, _) = project_a
3028            .update(cx_a, |p, cx| {
3029                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3030            })
3031            .await
3032            .unwrap();
3033        worktree_a
3034            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3035            .await;
3036        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3037        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3038        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3039
3040        // Join the worktree as client B.
3041        let project_b = Project::remote(
3042            project_id,
3043            client_b.clone(),
3044            client_b.user_store.clone(),
3045            lang_registry.clone(),
3046            fs.clone(),
3047            &mut cx_b.to_async(),
3048        )
3049        .await
3050        .unwrap();
3051
3052        // Cause the language server to start.
3053        let _buffer = cx_b
3054            .background()
3055            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3056            .await
3057            .unwrap();
3058
3059        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3060        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3061            #[allow(deprecated)]
3062            Some(vec![lsp::SymbolInformation {
3063                name: "TWO".into(),
3064                location: lsp::Location {
3065                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3066                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3067                },
3068                kind: lsp::SymbolKind::CONSTANT,
3069                tags: None,
3070                container_name: None,
3071                deprecated: None,
3072            }])
3073        });
3074
3075        // Request the definition of a symbol as the guest.
3076        let symbols = project_b
3077            .update(cx_b, |p, cx| p.symbols("two", cx))
3078            .await
3079            .unwrap();
3080        assert_eq!(symbols.len(), 1);
3081        assert_eq!(symbols[0].name, "TWO");
3082
3083        // Open one of the returned symbols.
3084        let buffer_b_2 = project_b
3085            .update(cx_b, |project, cx| {
3086                project.open_buffer_for_symbol(&symbols[0], cx)
3087            })
3088            .await
3089            .unwrap();
3090        buffer_b_2.read_with(cx_b, |buffer, _| {
3091            assert_eq!(
3092                buffer.file().unwrap().path().as_ref(),
3093                Path::new("../crate-2/two.rs")
3094            );
3095        });
3096
3097        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3098        let mut fake_symbol = symbols[0].clone();
3099        fake_symbol.path = Path::new("/code/secrets").into();
3100        let error = project_b
3101            .update(cx_b, |project, cx| {
3102                project.open_buffer_for_symbol(&fake_symbol, cx)
3103            })
3104            .await
3105            .unwrap_err();
3106        assert!(error.to_string().contains("invalid symbol signature"));
3107    }
3108
3109    #[gpui::test(iterations = 10)]
3110    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3111        cx_a: &mut TestAppContext,
3112        cx_b: &mut TestAppContext,
3113        mut rng: StdRng,
3114    ) {
3115        cx_a.foreground().forbid_parking();
3116        let mut lang_registry = Arc::new(LanguageRegistry::test());
3117        let fs = FakeFs::new(cx_a.background());
3118        fs.insert_tree(
3119            "/root",
3120            json!({
3121                ".zed.toml": r#"collaborators = ["user_b"]"#,
3122                "a.rs": "const ONE: usize = b::TWO;",
3123                "b.rs": "const TWO: usize = 2",
3124            }),
3125        )
3126        .await;
3127
3128        // Set up a fake language server.
3129        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3130
3131        Arc::get_mut(&mut lang_registry)
3132            .unwrap()
3133            .add(Arc::new(Language::new(
3134                LanguageConfig {
3135                    name: "Rust".into(),
3136                    path_suffixes: vec!["rs".to_string()],
3137                    language_server: Some(language_server_config),
3138                    ..Default::default()
3139                },
3140                Some(tree_sitter_rust::language()),
3141            )));
3142
3143        // Connect to a server as 2 clients.
3144        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3145        let client_a = server.create_client(cx_a, "user_a").await;
3146        let client_b = server.create_client(cx_b, "user_b").await;
3147
3148        // Share a project as client A
3149        let project_a = cx_a.update(|cx| {
3150            Project::local(
3151                client_a.clone(),
3152                client_a.user_store.clone(),
3153                lang_registry.clone(),
3154                fs.clone(),
3155                cx,
3156            )
3157        });
3158
3159        let (worktree_a, _) = project_a
3160            .update(cx_a, |p, cx| {
3161                p.find_or_create_local_worktree("/root", true, cx)
3162            })
3163            .await
3164            .unwrap();
3165        worktree_a
3166            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3167            .await;
3168        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3169        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3170        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3171
3172        // Join the worktree as client B.
3173        let project_b = Project::remote(
3174            project_id,
3175            client_b.clone(),
3176            client_b.user_store.clone(),
3177            lang_registry.clone(),
3178            fs.clone(),
3179            &mut cx_b.to_async(),
3180        )
3181        .await
3182        .unwrap();
3183
3184        let buffer_b1 = cx_b
3185            .background()
3186            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3187            .await
3188            .unwrap();
3189
3190        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3191        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3192            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3193                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3194                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3195            )))
3196        });
3197
3198        let definitions;
3199        let buffer_b2;
3200        if rng.gen() {
3201            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3202            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3203        } else {
3204            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3205            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3206        }
3207
3208        let buffer_b2 = buffer_b2.await.unwrap();
3209        let definitions = definitions.await.unwrap();
3210        assert_eq!(definitions.len(), 1);
3211        assert_eq!(definitions[0].buffer, buffer_b2);
3212    }
3213
3214    #[gpui::test(iterations = 10)]
3215    async fn test_collaborating_with_code_actions(
3216        cx_a: &mut TestAppContext,
3217        cx_b: &mut TestAppContext,
3218    ) {
3219        cx_a.foreground().forbid_parking();
3220        let mut lang_registry = Arc::new(LanguageRegistry::test());
3221        let fs = FakeFs::new(cx_a.background());
3222        cx_b.update(|cx| editor::init(cx));
3223
3224        // Set up a fake language server.
3225        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3226        Arc::get_mut(&mut lang_registry)
3227            .unwrap()
3228            .add(Arc::new(Language::new(
3229                LanguageConfig {
3230                    name: "Rust".into(),
3231                    path_suffixes: vec!["rs".to_string()],
3232                    language_server: Some(language_server_config),
3233                    ..Default::default()
3234                },
3235                Some(tree_sitter_rust::language()),
3236            )));
3237
3238        // Connect to a server as 2 clients.
3239        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3240        let client_a = server.create_client(cx_a, "user_a").await;
3241        let client_b = server.create_client(cx_b, "user_b").await;
3242
3243        // Share a project as client A
3244        fs.insert_tree(
3245            "/a",
3246            json!({
3247                ".zed.toml": r#"collaborators = ["user_b"]"#,
3248                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3249                "other.rs": "pub fn foo() -> usize { 4 }",
3250            }),
3251        )
3252        .await;
3253        let project_a = cx_a.update(|cx| {
3254            Project::local(
3255                client_a.clone(),
3256                client_a.user_store.clone(),
3257                lang_registry.clone(),
3258                fs.clone(),
3259                cx,
3260            )
3261        });
3262        let (worktree_a, _) = project_a
3263            .update(cx_a, |p, cx| {
3264                p.find_or_create_local_worktree("/a", true, cx)
3265            })
3266            .await
3267            .unwrap();
3268        worktree_a
3269            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3270            .await;
3271        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3272        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3273        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3274
3275        // Join the worktree as client B.
3276        let project_b = Project::remote(
3277            project_id,
3278            client_b.clone(),
3279            client_b.user_store.clone(),
3280            lang_registry.clone(),
3281            fs.clone(),
3282            &mut cx_b.to_async(),
3283        )
3284        .await
3285        .unwrap();
3286        let mut params = cx_b.update(WorkspaceParams::test);
3287        params.languages = lang_registry.clone();
3288        params.client = client_b.client.clone();
3289        params.user_store = client_b.user_store.clone();
3290        params.project = project_b;
3291
3292        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3293        let editor_b = workspace_b
3294            .update(cx_b, |workspace, cx| {
3295                workspace.open_path((worktree_id, "main.rs"), cx)
3296            })
3297            .await
3298            .unwrap()
3299            .downcast::<Editor>()
3300            .unwrap();
3301
3302        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3303        fake_language_server
3304            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3305                assert_eq!(
3306                    params.text_document.uri,
3307                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3308                );
3309                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3310                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3311                None
3312            })
3313            .next()
3314            .await;
3315
3316        // Move cursor to a location that contains code actions.
3317        editor_b.update(cx_b, |editor, cx| {
3318            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3319            cx.focus(&editor_b);
3320        });
3321
3322        fake_language_server
3323            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3324                assert_eq!(
3325                    params.text_document.uri,
3326                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3327                );
3328                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3329                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3330
3331                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3332                    lsp::CodeAction {
3333                        title: "Inline into all callers".to_string(),
3334                        edit: Some(lsp::WorkspaceEdit {
3335                            changes: Some(
3336                                [
3337                                    (
3338                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3339                                        vec![lsp::TextEdit::new(
3340                                            lsp::Range::new(
3341                                                lsp::Position::new(1, 22),
3342                                                lsp::Position::new(1, 34),
3343                                            ),
3344                                            "4".to_string(),
3345                                        )],
3346                                    ),
3347                                    (
3348                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3349                                        vec![lsp::TextEdit::new(
3350                                            lsp::Range::new(
3351                                                lsp::Position::new(0, 0),
3352                                                lsp::Position::new(0, 27),
3353                                            ),
3354                                            "".to_string(),
3355                                        )],
3356                                    ),
3357                                ]
3358                                .into_iter()
3359                                .collect(),
3360                            ),
3361                            ..Default::default()
3362                        }),
3363                        data: Some(json!({
3364                            "codeActionParams": {
3365                                "range": {
3366                                    "start": {"line": 1, "column": 31},
3367                                    "end": {"line": 1, "column": 31},
3368                                }
3369                            }
3370                        })),
3371                        ..Default::default()
3372                    },
3373                )])
3374            })
3375            .next()
3376            .await;
3377
3378        // Toggle code actions and wait for them to display.
3379        editor_b.update(cx_b, |editor, cx| {
3380            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3381        });
3382        editor_b
3383            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3384            .await;
3385
3386        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3387
3388        // Confirming the code action will trigger a resolve request.
3389        let confirm_action = workspace_b
3390            .update(cx_b, |workspace, cx| {
3391                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3392            })
3393            .unwrap();
3394        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3395            lsp::CodeAction {
3396                title: "Inline into all callers".to_string(),
3397                edit: Some(lsp::WorkspaceEdit {
3398                    changes: Some(
3399                        [
3400                            (
3401                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3402                                vec![lsp::TextEdit::new(
3403                                    lsp::Range::new(
3404                                        lsp::Position::new(1, 22),
3405                                        lsp::Position::new(1, 34),
3406                                    ),
3407                                    "4".to_string(),
3408                                )],
3409                            ),
3410                            (
3411                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3412                                vec![lsp::TextEdit::new(
3413                                    lsp::Range::new(
3414                                        lsp::Position::new(0, 0),
3415                                        lsp::Position::new(0, 27),
3416                                    ),
3417                                    "".to_string(),
3418                                )],
3419                            ),
3420                        ]
3421                        .into_iter()
3422                        .collect(),
3423                    ),
3424                    ..Default::default()
3425                }),
3426                ..Default::default()
3427            }
3428        });
3429
3430        // After the action is confirmed, an editor containing both modified files is opened.
3431        confirm_action.await.unwrap();
3432        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3433            workspace
3434                .active_item(cx)
3435                .unwrap()
3436                .downcast::<Editor>()
3437                .unwrap()
3438        });
3439        code_action_editor.update(cx_b, |editor, cx| {
3440            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3441            editor.undo(&Undo, cx);
3442            assert_eq!(
3443                editor.text(cx),
3444                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3445            );
3446            editor.redo(&Redo, cx);
3447            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3448        });
3449    }
3450
3451    #[gpui::test(iterations = 10)]
3452    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3453        cx_a.foreground().forbid_parking();
3454        let mut lang_registry = Arc::new(LanguageRegistry::test());
3455        let fs = FakeFs::new(cx_a.background());
3456        cx_b.update(|cx| editor::init(cx));
3457
3458        // Set up a fake language server.
3459        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3460        Arc::get_mut(&mut lang_registry)
3461            .unwrap()
3462            .add(Arc::new(Language::new(
3463                LanguageConfig {
3464                    name: "Rust".into(),
3465                    path_suffixes: vec!["rs".to_string()],
3466                    language_server: Some(language_server_config),
3467                    ..Default::default()
3468                },
3469                Some(tree_sitter_rust::language()),
3470            )));
3471
3472        // Connect to a server as 2 clients.
3473        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3474        let client_a = server.create_client(cx_a, "user_a").await;
3475        let client_b = server.create_client(cx_b, "user_b").await;
3476
3477        // Share a project as client A
3478        fs.insert_tree(
3479            "/dir",
3480            json!({
3481                ".zed.toml": r#"collaborators = ["user_b"]"#,
3482                "one.rs": "const ONE: usize = 1;",
3483                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3484            }),
3485        )
3486        .await;
3487        let project_a = cx_a.update(|cx| {
3488            Project::local(
3489                client_a.clone(),
3490                client_a.user_store.clone(),
3491                lang_registry.clone(),
3492                fs.clone(),
3493                cx,
3494            )
3495        });
3496        let (worktree_a, _) = project_a
3497            .update(cx_a, |p, cx| {
3498                p.find_or_create_local_worktree("/dir", true, cx)
3499            })
3500            .await
3501            .unwrap();
3502        worktree_a
3503            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3504            .await;
3505        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3506        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3507        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3508
3509        // Join the worktree as client B.
3510        let project_b = Project::remote(
3511            project_id,
3512            client_b.clone(),
3513            client_b.user_store.clone(),
3514            lang_registry.clone(),
3515            fs.clone(),
3516            &mut cx_b.to_async(),
3517        )
3518        .await
3519        .unwrap();
3520        let mut params = cx_b.update(WorkspaceParams::test);
3521        params.languages = lang_registry.clone();
3522        params.client = client_b.client.clone();
3523        params.user_store = client_b.user_store.clone();
3524        params.project = project_b;
3525
3526        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3527        let editor_b = workspace_b
3528            .update(cx_b, |workspace, cx| {
3529                workspace.open_path((worktree_id, "one.rs"), cx)
3530            })
3531            .await
3532            .unwrap()
3533            .downcast::<Editor>()
3534            .unwrap();
3535        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3536
3537        // Move cursor to a location that can be renamed.
3538        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3539            editor.select_ranges([7..7], None, cx);
3540            editor.rename(&Rename, cx).unwrap()
3541        });
3542
3543        fake_language_server
3544            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3545                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3546                assert_eq!(params.position, lsp::Position::new(0, 7));
3547                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3548                    lsp::Position::new(0, 6),
3549                    lsp::Position::new(0, 9),
3550                )))
3551            })
3552            .next()
3553            .await
3554            .unwrap();
3555        prepare_rename.await.unwrap();
3556        editor_b.update(cx_b, |editor, cx| {
3557            let rename = editor.pending_rename().unwrap();
3558            let buffer = editor.buffer().read(cx).snapshot(cx);
3559            assert_eq!(
3560                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3561                6..9
3562            );
3563            rename.editor.update(cx, |rename_editor, cx| {
3564                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3565                    rename_buffer.edit([0..3], "THREE", cx);
3566                });
3567            });
3568        });
3569
3570        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3571            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3572        });
3573        fake_language_server
3574            .handle_request::<lsp::request::Rename, _>(|params, _| {
3575                assert_eq!(
3576                    params.text_document_position.text_document.uri.as_str(),
3577                    "file:///dir/one.rs"
3578                );
3579                assert_eq!(
3580                    params.text_document_position.position,
3581                    lsp::Position::new(0, 6)
3582                );
3583                assert_eq!(params.new_name, "THREE");
3584                Some(lsp::WorkspaceEdit {
3585                    changes: Some(
3586                        [
3587                            (
3588                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3589                                vec![lsp::TextEdit::new(
3590                                    lsp::Range::new(
3591                                        lsp::Position::new(0, 6),
3592                                        lsp::Position::new(0, 9),
3593                                    ),
3594                                    "THREE".to_string(),
3595                                )],
3596                            ),
3597                            (
3598                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3599                                vec![
3600                                    lsp::TextEdit::new(
3601                                        lsp::Range::new(
3602                                            lsp::Position::new(0, 24),
3603                                            lsp::Position::new(0, 27),
3604                                        ),
3605                                        "THREE".to_string(),
3606                                    ),
3607                                    lsp::TextEdit::new(
3608                                        lsp::Range::new(
3609                                            lsp::Position::new(0, 35),
3610                                            lsp::Position::new(0, 38),
3611                                        ),
3612                                        "THREE".to_string(),
3613                                    ),
3614                                ],
3615                            ),
3616                        ]
3617                        .into_iter()
3618                        .collect(),
3619                    ),
3620                    ..Default::default()
3621                })
3622            })
3623            .next()
3624            .await
3625            .unwrap();
3626        confirm_rename.await.unwrap();
3627
3628        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3629            workspace
3630                .active_item(cx)
3631                .unwrap()
3632                .downcast::<Editor>()
3633                .unwrap()
3634        });
3635        rename_editor.update(cx_b, |editor, cx| {
3636            assert_eq!(
3637                editor.text(cx),
3638                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3639            );
3640            editor.undo(&Undo, cx);
3641            assert_eq!(
3642                editor.text(cx),
3643                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3644            );
3645            editor.redo(&Redo, cx);
3646            assert_eq!(
3647                editor.text(cx),
3648                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3649            );
3650        });
3651
3652        // Ensure temporary rename edits cannot be undone/redone.
3653        editor_b.update(cx_b, |editor, cx| {
3654            editor.undo(&Undo, cx);
3655            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3656            editor.undo(&Undo, cx);
3657            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3658            editor.redo(&Redo, cx);
3659            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3660        })
3661    }
3662
3663    #[gpui::test(iterations = 10)]
3664    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3665        cx_a.foreground().forbid_parking();
3666
3667        // Connect to a server as 2 clients.
3668        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3669        let client_a = server.create_client(cx_a, "user_a").await;
3670        let client_b = server.create_client(cx_b, "user_b").await;
3671
3672        // Create an org that includes these 2 users.
3673        let db = &server.app_state.db;
3674        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3675        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3676            .await
3677            .unwrap();
3678        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3679            .await
3680            .unwrap();
3681
3682        // Create a channel that includes all the users.
3683        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3684        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3685            .await
3686            .unwrap();
3687        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3688            .await
3689            .unwrap();
3690        db.create_channel_message(
3691            channel_id,
3692            client_b.current_user_id(&cx_b),
3693            "hello A, it's B.",
3694            OffsetDateTime::now_utc(),
3695            1,
3696        )
3697        .await
3698        .unwrap();
3699
3700        let channels_a = cx_a
3701            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3702        channels_a
3703            .condition(cx_a, |list, _| list.available_channels().is_some())
3704            .await;
3705        channels_a.read_with(cx_a, |list, _| {
3706            assert_eq!(
3707                list.available_channels().unwrap(),
3708                &[ChannelDetails {
3709                    id: channel_id.to_proto(),
3710                    name: "test-channel".to_string()
3711                }]
3712            )
3713        });
3714        let channel_a = channels_a.update(cx_a, |this, cx| {
3715            this.get_channel(channel_id.to_proto(), cx).unwrap()
3716        });
3717        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3718        channel_a
3719            .condition(&cx_a, |channel, _| {
3720                channel_messages(channel)
3721                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3722            })
3723            .await;
3724
3725        let channels_b = cx_b
3726            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3727        channels_b
3728            .condition(cx_b, |list, _| list.available_channels().is_some())
3729            .await;
3730        channels_b.read_with(cx_b, |list, _| {
3731            assert_eq!(
3732                list.available_channels().unwrap(),
3733                &[ChannelDetails {
3734                    id: channel_id.to_proto(),
3735                    name: "test-channel".to_string()
3736                }]
3737            )
3738        });
3739
3740        let channel_b = channels_b.update(cx_b, |this, cx| {
3741            this.get_channel(channel_id.to_proto(), cx).unwrap()
3742        });
3743        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3744        channel_b
3745            .condition(&cx_b, |channel, _| {
3746                channel_messages(channel)
3747                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3748            })
3749            .await;
3750
3751        channel_a
3752            .update(cx_a, |channel, cx| {
3753                channel
3754                    .send_message("oh, hi B.".to_string(), cx)
3755                    .unwrap()
3756                    .detach();
3757                let task = channel.send_message("sup".to_string(), cx).unwrap();
3758                assert_eq!(
3759                    channel_messages(channel),
3760                    &[
3761                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3762                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3763                        ("user_a".to_string(), "sup".to_string(), true)
3764                    ]
3765                );
3766                task
3767            })
3768            .await
3769            .unwrap();
3770
3771        channel_b
3772            .condition(&cx_b, |channel, _| {
3773                channel_messages(channel)
3774                    == [
3775                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3776                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3777                        ("user_a".to_string(), "sup".to_string(), false),
3778                    ]
3779            })
3780            .await;
3781
3782        assert_eq!(
3783            server
3784                .state()
3785                .await
3786                .channel(channel_id)
3787                .unwrap()
3788                .connection_ids
3789                .len(),
3790            2
3791        );
3792        cx_b.update(|_| drop(channel_b));
3793        server
3794            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3795            .await;
3796
3797        cx_a.update(|_| drop(channel_a));
3798        server
3799            .condition(|state| state.channel(channel_id).is_none())
3800            .await;
3801    }
3802
3803    #[gpui::test(iterations = 10)]
3804    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3805        cx_a.foreground().forbid_parking();
3806
3807        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3808        let client_a = server.create_client(cx_a, "user_a").await;
3809
3810        let db = &server.app_state.db;
3811        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3812        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3813        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3814            .await
3815            .unwrap();
3816        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3817            .await
3818            .unwrap();
3819
3820        let channels_a = cx_a
3821            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3822        channels_a
3823            .condition(cx_a, |list, _| list.available_channels().is_some())
3824            .await;
3825        let channel_a = channels_a.update(cx_a, |this, cx| {
3826            this.get_channel(channel_id.to_proto(), cx).unwrap()
3827        });
3828
3829        // Messages aren't allowed to be too long.
3830        channel_a
3831            .update(cx_a, |channel, cx| {
3832                let long_body = "this is long.\n".repeat(1024);
3833                channel.send_message(long_body, cx).unwrap()
3834            })
3835            .await
3836            .unwrap_err();
3837
3838        // Messages aren't allowed to be blank.
3839        channel_a.update(cx_a, |channel, cx| {
3840            channel.send_message(String::new(), cx).unwrap_err()
3841        });
3842
3843        // Leading and trailing whitespace are trimmed.
3844        channel_a
3845            .update(cx_a, |channel, cx| {
3846                channel
3847                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3848                    .unwrap()
3849            })
3850            .await
3851            .unwrap();
3852        assert_eq!(
3853            db.get_channel_messages(channel_id, 10, None)
3854                .await
3855                .unwrap()
3856                .iter()
3857                .map(|m| &m.body)
3858                .collect::<Vec<_>>(),
3859            &["surrounded by whitespace"]
3860        );
3861    }
3862
3863    #[gpui::test(iterations = 10)]
3864    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3865        cx_a.foreground().forbid_parking();
3866
3867        // Connect to a server as 2 clients.
3868        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3869        let client_a = server.create_client(cx_a, "user_a").await;
3870        let client_b = server.create_client(cx_b, "user_b").await;
3871        let mut status_b = client_b.status();
3872
3873        // Create an org that includes these 2 users.
3874        let db = &server.app_state.db;
3875        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3876        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3877            .await
3878            .unwrap();
3879        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3880            .await
3881            .unwrap();
3882
3883        // Create a channel that includes all the users.
3884        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3885        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3886            .await
3887            .unwrap();
3888        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3889            .await
3890            .unwrap();
3891        db.create_channel_message(
3892            channel_id,
3893            client_b.current_user_id(&cx_b),
3894            "hello A, it's B.",
3895            OffsetDateTime::now_utc(),
3896            2,
3897        )
3898        .await
3899        .unwrap();
3900
3901        let channels_a = cx_a
3902            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3903        channels_a
3904            .condition(cx_a, |list, _| list.available_channels().is_some())
3905            .await;
3906
3907        channels_a.read_with(cx_a, |list, _| {
3908            assert_eq!(
3909                list.available_channels().unwrap(),
3910                &[ChannelDetails {
3911                    id: channel_id.to_proto(),
3912                    name: "test-channel".to_string()
3913                }]
3914            )
3915        });
3916        let channel_a = channels_a.update(cx_a, |this, cx| {
3917            this.get_channel(channel_id.to_proto(), cx).unwrap()
3918        });
3919        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3920        channel_a
3921            .condition(&cx_a, |channel, _| {
3922                channel_messages(channel)
3923                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3924            })
3925            .await;
3926
3927        let channels_b = cx_b
3928            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3929        channels_b
3930            .condition(cx_b, |list, _| list.available_channels().is_some())
3931            .await;
3932        channels_b.read_with(cx_b, |list, _| {
3933            assert_eq!(
3934                list.available_channels().unwrap(),
3935                &[ChannelDetails {
3936                    id: channel_id.to_proto(),
3937                    name: "test-channel".to_string()
3938                }]
3939            )
3940        });
3941
3942        let channel_b = channels_b.update(cx_b, |this, cx| {
3943            this.get_channel(channel_id.to_proto(), cx).unwrap()
3944        });
3945        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3946        channel_b
3947            .condition(&cx_b, |channel, _| {
3948                channel_messages(channel)
3949                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3950            })
3951            .await;
3952
3953        // Disconnect client B, ensuring we can still access its cached channel data.
3954        server.forbid_connections();
3955        server.disconnect_client(client_b.current_user_id(&cx_b));
3956        cx_b.foreground().advance_clock(Duration::from_secs(3));
3957        while !matches!(
3958            status_b.next().await,
3959            Some(client::Status::ReconnectionError { .. })
3960        ) {}
3961
3962        channels_b.read_with(cx_b, |channels, _| {
3963            assert_eq!(
3964                channels.available_channels().unwrap(),
3965                [ChannelDetails {
3966                    id: channel_id.to_proto(),
3967                    name: "test-channel".to_string()
3968                }]
3969            )
3970        });
3971        channel_b.read_with(cx_b, |channel, _| {
3972            assert_eq!(
3973                channel_messages(channel),
3974                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3975            )
3976        });
3977
3978        // Send a message from client B while it is disconnected.
3979        channel_b
3980            .update(cx_b, |channel, cx| {
3981                let task = channel
3982                    .send_message("can you see this?".to_string(), cx)
3983                    .unwrap();
3984                assert_eq!(
3985                    channel_messages(channel),
3986                    &[
3987                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3988                        ("user_b".to_string(), "can you see this?".to_string(), true)
3989                    ]
3990                );
3991                task
3992            })
3993            .await
3994            .unwrap_err();
3995
3996        // Send a message from client A while B is disconnected.
3997        channel_a
3998            .update(cx_a, |channel, cx| {
3999                channel
4000                    .send_message("oh, hi B.".to_string(), cx)
4001                    .unwrap()
4002                    .detach();
4003                let task = channel.send_message("sup".to_string(), cx).unwrap();
4004                assert_eq!(
4005                    channel_messages(channel),
4006                    &[
4007                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4008                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4009                        ("user_a".to_string(), "sup".to_string(), true)
4010                    ]
4011                );
4012                task
4013            })
4014            .await
4015            .unwrap();
4016
4017        // Give client B a chance to reconnect.
4018        server.allow_connections();
4019        cx_b.foreground().advance_clock(Duration::from_secs(10));
4020
4021        // Verify that B sees the new messages upon reconnection, as well as the message client B
4022        // sent while offline.
4023        channel_b
4024            .condition(&cx_b, |channel, _| {
4025                channel_messages(channel)
4026                    == [
4027                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4028                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4029                        ("user_a".to_string(), "sup".to_string(), false),
4030                        ("user_b".to_string(), "can you see this?".to_string(), false),
4031                    ]
4032            })
4033            .await;
4034
4035        // Ensure client A and B can communicate normally after reconnection.
4036        channel_a
4037            .update(cx_a, |channel, cx| {
4038                channel.send_message("you online?".to_string(), cx).unwrap()
4039            })
4040            .await
4041            .unwrap();
4042        channel_b
4043            .condition(&cx_b, |channel, _| {
4044                channel_messages(channel)
4045                    == [
4046                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4047                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4048                        ("user_a".to_string(), "sup".to_string(), false),
4049                        ("user_b".to_string(), "can you see this?".to_string(), false),
4050                        ("user_a".to_string(), "you online?".to_string(), false),
4051                    ]
4052            })
4053            .await;
4054
4055        channel_b
4056            .update(cx_b, |channel, cx| {
4057                channel.send_message("yep".to_string(), cx).unwrap()
4058            })
4059            .await
4060            .unwrap();
4061        channel_a
4062            .condition(&cx_a, |channel, _| {
4063                channel_messages(channel)
4064                    == [
4065                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4066                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4067                        ("user_a".to_string(), "sup".to_string(), false),
4068                        ("user_b".to_string(), "can you see this?".to_string(), false),
4069                        ("user_a".to_string(), "you online?".to_string(), false),
4070                        ("user_b".to_string(), "yep".to_string(), false),
4071                    ]
4072            })
4073            .await;
4074    }
4075
4076    #[gpui::test(iterations = 10)]
4077    async fn test_contacts(
4078        cx_a: &mut TestAppContext,
4079        cx_b: &mut TestAppContext,
4080        cx_c: &mut TestAppContext,
4081    ) {
4082        cx_a.foreground().forbid_parking();
4083        let lang_registry = Arc::new(LanguageRegistry::test());
4084        let fs = FakeFs::new(cx_a.background());
4085
4086        // Connect to a server as 3 clients.
4087        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4088        let client_a = server.create_client(cx_a, "user_a").await;
4089        let client_b = server.create_client(cx_b, "user_b").await;
4090        let client_c = server.create_client(cx_c, "user_c").await;
4091
4092        // Share a worktree as client A.
4093        fs.insert_tree(
4094            "/a",
4095            json!({
4096                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4097            }),
4098        )
4099        .await;
4100
4101        let project_a = cx_a.update(|cx| {
4102            Project::local(
4103                client_a.clone(),
4104                client_a.user_store.clone(),
4105                lang_registry.clone(),
4106                fs.clone(),
4107                cx,
4108            )
4109        });
4110        let (worktree_a, _) = project_a
4111            .update(cx_a, |p, cx| {
4112                p.find_or_create_local_worktree("/a", true, cx)
4113            })
4114            .await
4115            .unwrap();
4116        worktree_a
4117            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4118            .await;
4119
4120        client_a
4121            .user_store
4122            .condition(&cx_a, |user_store, _| {
4123                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4124            })
4125            .await;
4126        client_b
4127            .user_store
4128            .condition(&cx_b, |user_store, _| {
4129                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4130            })
4131            .await;
4132        client_c
4133            .user_store
4134            .condition(&cx_c, |user_store, _| {
4135                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4136            })
4137            .await;
4138
4139        let project_id = project_a
4140            .update(cx_a, |project, _| project.next_remote_id())
4141            .await;
4142        project_a
4143            .update(cx_a, |project, cx| project.share(cx))
4144            .await
4145            .unwrap();
4146
4147        let _project_b = Project::remote(
4148            project_id,
4149            client_b.clone(),
4150            client_b.user_store.clone(),
4151            lang_registry.clone(),
4152            fs.clone(),
4153            &mut cx_b.to_async(),
4154        )
4155        .await
4156        .unwrap();
4157
4158        client_a
4159            .user_store
4160            .condition(&cx_a, |user_store, _| {
4161                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4162            })
4163            .await;
4164        client_b
4165            .user_store
4166            .condition(&cx_b, |user_store, _| {
4167                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4168            })
4169            .await;
4170        client_c
4171            .user_store
4172            .condition(&cx_c, |user_store, _| {
4173                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4174            })
4175            .await;
4176
4177        project_a
4178            .condition(&cx_a, |project, _| {
4179                project.collaborators().contains_key(&client_b.peer_id)
4180            })
4181            .await;
4182
4183        cx_a.update(move |_| drop(project_a));
4184        client_a
4185            .user_store
4186            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4187            .await;
4188        client_b
4189            .user_store
4190            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4191            .await;
4192        client_c
4193            .user_store
4194            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4195            .await;
4196
4197        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4198            user_store
4199                .contacts()
4200                .iter()
4201                .map(|contact| {
4202                    let worktrees = contact
4203                        .projects
4204                        .iter()
4205                        .map(|p| {
4206                            (
4207                                p.worktree_root_names[0].as_str(),
4208                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4209                            )
4210                        })
4211                        .collect();
4212                    (contact.user.github_login.as_str(), worktrees)
4213                })
4214                .collect()
4215        }
4216    }
4217
4218    #[gpui::test(iterations = 10)]
4219    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4220        cx_a.foreground().forbid_parking();
4221        let fs = FakeFs::new(cx_a.background());
4222
4223        // 2 clients connect to a server.
4224        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4225        let mut client_a = server.create_client(cx_a, "user_a").await;
4226        let mut client_b = server.create_client(cx_b, "user_b").await;
4227        cx_a.update(editor::init);
4228        cx_b.update(editor::init);
4229
4230        // Client A shares a project.
4231        fs.insert_tree(
4232            "/a",
4233            json!({
4234                ".zed.toml": r#"collaborators = ["user_b"]"#,
4235                "1.txt": "one",
4236                "2.txt": "two",
4237                "3.txt": "three",
4238            }),
4239        )
4240        .await;
4241        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4242        project_a
4243            .update(cx_a, |project, cx| project.share(cx))
4244            .await
4245            .unwrap();
4246
4247        // Client B joins the project.
4248        let project_b = client_b
4249            .build_remote_project(
4250                project_a
4251                    .read_with(cx_a, |project, _| project.remote_id())
4252                    .unwrap(),
4253                cx_b,
4254            )
4255            .await;
4256
4257        // Client A opens some editors.
4258        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4259        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4260        let editor_a1 = workspace_a
4261            .update(cx_a, |workspace, cx| {
4262                workspace.open_path((worktree_id, "1.txt"), cx)
4263            })
4264            .await
4265            .unwrap()
4266            .downcast::<Editor>()
4267            .unwrap();
4268        let editor_a2 = workspace_a
4269            .update(cx_a, |workspace, cx| {
4270                workspace.open_path((worktree_id, "2.txt"), cx)
4271            })
4272            .await
4273            .unwrap()
4274            .downcast::<Editor>()
4275            .unwrap();
4276
4277        // Client B opens an editor.
4278        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4279        let editor_b1 = workspace_b
4280            .update(cx_b, |workspace, cx| {
4281                workspace.open_path((worktree_id, "1.txt"), cx)
4282            })
4283            .await
4284            .unwrap()
4285            .downcast::<Editor>()
4286            .unwrap();
4287
4288        let client_a_id = project_b.read_with(cx_b, |project, _| {
4289            project.collaborators().values().next().unwrap().peer_id
4290        });
4291        let client_b_id = project_a.read_with(cx_a, |project, _| {
4292            project.collaborators().values().next().unwrap().peer_id
4293        });
4294
4295        // When client B starts following client A, all visible view states are replicated to client B.
4296        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4297        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4298        workspace_b
4299            .update(cx_b, |workspace, cx| {
4300                workspace.toggle_follow(&client_a_id.into(), cx).unwrap()
4301            })
4302            .await
4303            .unwrap();
4304        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4305            workspace
4306                .active_item(cx)
4307                .unwrap()
4308                .downcast::<Editor>()
4309                .unwrap()
4310        });
4311        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4312        assert_eq!(
4313            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4314            Some((worktree_id, "2.txt").into())
4315        );
4316        assert_eq!(
4317            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4318            vec![2..3]
4319        );
4320        assert_eq!(
4321            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4322            vec![0..1]
4323        );
4324
4325        // When client A activates a different editor, client B does so as well.
4326        workspace_a.update(cx_a, |workspace, cx| {
4327            workspace.activate_item(&editor_a1, cx)
4328        });
4329        workspace_b
4330            .condition(cx_b, |workspace, cx| {
4331                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4332            })
4333            .await;
4334
4335        // Changes to client A's editor are reflected on client B.
4336        editor_a1.update(cx_a, |editor, cx| {
4337            editor.select_ranges([1..1, 2..2], None, cx);
4338        });
4339        editor_b1
4340            .condition(cx_b, |editor, cx| {
4341                editor.selected_ranges(cx) == vec![1..1, 2..2]
4342            })
4343            .await;
4344
4345        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4346        editor_b1
4347            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4348            .await;
4349
4350        editor_a1.update(cx_a, |editor, cx| {
4351            editor.select_ranges([3..3], None, cx);
4352            editor.set_scroll_position(vec2f(0., 100.), cx);
4353        });
4354        editor_b1
4355            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4356            .await;
4357
4358        // After unfollowing, client B stops receiving updates from client A.
4359        workspace_b.update(cx_b, |workspace, cx| {
4360            workspace.unfollow(&workspace.active_pane().clone(), cx)
4361        });
4362        workspace_a.update(cx_a, |workspace, cx| {
4363            workspace.activate_item(&editor_a2, cx)
4364        });
4365        cx_a.foreground().run_until_parked();
4366        assert_eq!(
4367            workspace_b.read_with(cx_b, |workspace, cx| workspace
4368                .active_item(cx)
4369                .unwrap()
4370                .id()),
4371            editor_b1.id()
4372        );
4373
4374        // Client A starts following client B.
4375        workspace_a
4376            .update(cx_a, |workspace, cx| {
4377                workspace.toggle_follow(&client_b_id.into(), cx).unwrap()
4378            })
4379            .await
4380            .unwrap();
4381        assert_eq!(
4382            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4383            Some(client_b_id)
4384        );
4385        assert_eq!(
4386            workspace_a.read_with(cx_a, |workspace, cx| workspace
4387                .active_item(cx)
4388                .unwrap()
4389                .id()),
4390            editor_a1.id()
4391        );
4392
4393        // Following interrupts when client B disconnects.
4394        client_b.disconnect(&cx_b.to_async()).unwrap();
4395        cx_a.foreground().run_until_parked();
4396        assert_eq!(
4397            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4398            None
4399        );
4400    }
4401
4402    #[gpui::test(iterations = 10)]
4403    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4404        cx_a.foreground().forbid_parking();
4405        let fs = FakeFs::new(cx_a.background());
4406
4407        // 2 clients connect to a server.
4408        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4409        let mut client_a = server.create_client(cx_a, "user_a").await;
4410        let mut client_b = server.create_client(cx_b, "user_b").await;
4411        cx_a.update(editor::init);
4412        cx_b.update(editor::init);
4413
4414        // Client A shares a project.
4415        fs.insert_tree(
4416            "/a",
4417            json!({
4418                ".zed.toml": r#"collaborators = ["user_b"]"#,
4419                "1.txt": "one",
4420                "2.txt": "two",
4421                "3.txt": "three",
4422                "4.txt": "four",
4423            }),
4424        )
4425        .await;
4426        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4427        project_a
4428            .update(cx_a, |project, cx| project.share(cx))
4429            .await
4430            .unwrap();
4431
4432        // Client B joins the project.
4433        let project_b = client_b
4434            .build_remote_project(
4435                project_a
4436                    .read_with(cx_a, |project, _| project.remote_id())
4437                    .unwrap(),
4438                cx_b,
4439            )
4440            .await;
4441
4442        // Client A opens some editors.
4443        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4444        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4445        let _editor_a1 = workspace_a
4446            .update(cx_a, |workspace, cx| {
4447                workspace.open_path((worktree_id, "1.txt"), cx)
4448            })
4449            .await
4450            .unwrap()
4451            .downcast::<Editor>()
4452            .unwrap();
4453
4454        // Client B opens an editor.
4455        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4456        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4457        let _editor_b1 = workspace_b
4458            .update(cx_b, |workspace, cx| {
4459                workspace.open_path((worktree_id, "2.txt"), cx)
4460            })
4461            .await
4462            .unwrap()
4463            .downcast::<Editor>()
4464            .unwrap();
4465
4466        // Clients A and B follow each other in split panes
4467        workspace_a
4468            .update(cx_a, |workspace, cx| {
4469                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4470                assert_ne!(*workspace.active_pane(), pane_a1);
4471                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4472                workspace
4473                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4474                    .unwrap()
4475            })
4476            .await
4477            .unwrap();
4478        workspace_b
4479            .update(cx_b, |workspace, cx| {
4480                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4481                assert_ne!(*workspace.active_pane(), pane_b1);
4482                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4483                workspace
4484                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4485                    .unwrap()
4486            })
4487            .await
4488            .unwrap();
4489
4490        workspace_a
4491            .update(cx_a, |workspace, cx| {
4492                workspace.activate_next_pane(cx);
4493                assert_eq!(*workspace.active_pane(), pane_a1);
4494                workspace.open_path((worktree_id, "3.txt"), cx)
4495            })
4496            .await
4497            .unwrap();
4498        workspace_b
4499            .update(cx_b, |workspace, cx| {
4500                workspace.activate_next_pane(cx);
4501                assert_eq!(*workspace.active_pane(), pane_b1);
4502                workspace.open_path((worktree_id, "4.txt"), cx)
4503            })
4504            .await
4505            .unwrap();
4506        cx_a.foreground().run_until_parked();
4507
4508        // Ensure leader updates don't change the active pane of followers
4509        workspace_a.read_with(cx_a, |workspace, _| {
4510            assert_eq!(*workspace.active_pane(), pane_a1);
4511        });
4512        workspace_b.read_with(cx_b, |workspace, _| {
4513            assert_eq!(*workspace.active_pane(), pane_b1);
4514        });
4515
4516        // Ensure peers following each other doesn't cause an infinite loop.
4517        assert_eq!(
4518            workspace_a.read_with(cx_a, |workspace, cx| workspace
4519                .active_item(cx)
4520                .unwrap()
4521                .project_path(cx)),
4522            Some((worktree_id, "3.txt").into())
4523        );
4524        workspace_a.update(cx_a, |workspace, cx| {
4525            assert_eq!(
4526                workspace.active_item(cx).unwrap().project_path(cx),
4527                Some((worktree_id, "3.txt").into())
4528            );
4529            workspace.activate_next_pane(cx);
4530            assert_eq!(
4531                workspace.active_item(cx).unwrap().project_path(cx),
4532                Some((worktree_id, "4.txt").into())
4533            );
4534        });
4535        workspace_b.update(cx_b, |workspace, cx| {
4536            assert_eq!(
4537                workspace.active_item(cx).unwrap().project_path(cx),
4538                Some((worktree_id, "4.txt").into())
4539            );
4540            workspace.activate_next_pane(cx);
4541            assert_eq!(
4542                workspace.active_item(cx).unwrap().project_path(cx),
4543                Some((worktree_id, "3.txt").into())
4544            );
4545        });
4546    }
4547
4548    #[gpui::test(iterations = 10)]
4549    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4550        cx_a.foreground().forbid_parking();
4551        let fs = FakeFs::new(cx_a.background());
4552
4553        // 2 clients connect to a server.
4554        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4555        let mut client_a = server.create_client(cx_a, "user_a").await;
4556        let mut client_b = server.create_client(cx_b, "user_b").await;
4557        cx_a.update(editor::init);
4558        cx_b.update(editor::init);
4559
4560        // Client A shares a project.
4561        fs.insert_tree(
4562            "/a",
4563            json!({
4564                ".zed.toml": r#"collaborators = ["user_b"]"#,
4565                "1.txt": "one",
4566                "2.txt": "two",
4567                "3.txt": "three",
4568            }),
4569        )
4570        .await;
4571        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4572        project_a
4573            .update(cx_a, |project, cx| project.share(cx))
4574            .await
4575            .unwrap();
4576
4577        // Client B joins the project.
4578        let project_b = client_b
4579            .build_remote_project(
4580                project_a
4581                    .read_with(cx_a, |project, _| project.remote_id())
4582                    .unwrap(),
4583                cx_b,
4584            )
4585            .await;
4586
4587        // Client A opens some editors.
4588        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4589        let _editor_a1 = workspace_a
4590            .update(cx_a, |workspace, cx| {
4591                workspace.open_path((worktree_id, "1.txt"), cx)
4592            })
4593            .await
4594            .unwrap()
4595            .downcast::<Editor>()
4596            .unwrap();
4597
4598        // Client B starts following client A.
4599        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4600        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4601        let leader_id = project_b.read_with(cx_b, |project, _| {
4602            project.collaborators().values().next().unwrap().peer_id
4603        });
4604        workspace_b
4605            .update(cx_b, |workspace, cx| {
4606                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4607            })
4608            .await
4609            .unwrap();
4610        assert_eq!(
4611            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4612            Some(leader_id)
4613        );
4614        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4615            workspace
4616                .active_item(cx)
4617                .unwrap()
4618                .downcast::<Editor>()
4619                .unwrap()
4620        });
4621
4622        // When client B moves, it automatically stops following client A.
4623        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
4624        assert_eq!(
4625            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4626            None
4627        );
4628
4629        workspace_b
4630            .update(cx_b, |workspace, cx| {
4631                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4632            })
4633            .await
4634            .unwrap();
4635        assert_eq!(
4636            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4637            Some(leader_id)
4638        );
4639
4640        // When client B edits, it automatically stops following client A.
4641        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
4642        assert_eq!(
4643            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4644            None
4645        );
4646
4647        workspace_b
4648            .update(cx_b, |workspace, cx| {
4649                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4650            })
4651            .await
4652            .unwrap();
4653        assert_eq!(
4654            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4655            Some(leader_id)
4656        );
4657
4658        // When client B scrolls, it automatically stops following client A.
4659        editor_b2.update(cx_b, |editor, cx| {
4660            editor.set_scroll_position(vec2f(0., 3.), cx)
4661        });
4662        assert_eq!(
4663            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4664            None
4665        );
4666
4667        workspace_b
4668            .update(cx_b, |workspace, cx| {
4669                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4670            })
4671            .await
4672            .unwrap();
4673        assert_eq!(
4674            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4675            Some(leader_id)
4676        );
4677
4678        // When client B activates a different pane, it continues following client A in the original pane.
4679        workspace_b.update(cx_b, |workspace, cx| {
4680            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
4681        });
4682        assert_eq!(
4683            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4684            Some(leader_id)
4685        );
4686
4687        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
4688        assert_eq!(
4689            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4690            Some(leader_id)
4691        );
4692
4693        // When client B activates a different item in the original pane, it automatically stops following client A.
4694        workspace_b
4695            .update(cx_b, |workspace, cx| {
4696                workspace.open_path((worktree_id, "2.txt"), cx)
4697            })
4698            .await
4699            .unwrap();
4700        assert_eq!(
4701            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4702            None
4703        );
4704    }
4705
4706    #[gpui::test(iterations = 100)]
4707    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4708        cx.foreground().forbid_parking();
4709        let max_peers = env::var("MAX_PEERS")
4710            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4711            .unwrap_or(5);
4712        let max_operations = env::var("OPERATIONS")
4713            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4714            .unwrap_or(10);
4715
4716        let rng = Arc::new(Mutex::new(rng));
4717
4718        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4719        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4720
4721        let fs = FakeFs::new(cx.background());
4722        fs.insert_tree(
4723            "/_collab",
4724            json!({
4725                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4726            }),
4727        )
4728        .await;
4729
4730        let operations = Rc::new(Cell::new(0));
4731        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4732        let mut clients = Vec::new();
4733
4734        let mut next_entity_id = 100000;
4735        let mut host_cx = TestAppContext::new(
4736            cx.foreground_platform(),
4737            cx.platform(),
4738            cx.foreground(),
4739            cx.background(),
4740            cx.font_cache(),
4741            cx.leak_detector(),
4742            next_entity_id,
4743        );
4744        let host = server.create_client(&mut host_cx, "host").await;
4745        let host_project = host_cx.update(|cx| {
4746            Project::local(
4747                host.client.clone(),
4748                host.user_store.clone(),
4749                Arc::new(LanguageRegistry::test()),
4750                fs.clone(),
4751                cx,
4752            )
4753        });
4754        let host_project_id = host_project
4755            .update(&mut host_cx, |p, _| p.next_remote_id())
4756            .await;
4757
4758        let (collab_worktree, _) = host_project
4759            .update(&mut host_cx, |project, cx| {
4760                project.find_or_create_local_worktree("/_collab", true, cx)
4761            })
4762            .await
4763            .unwrap();
4764        collab_worktree
4765            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4766            .await;
4767        host_project
4768            .update(&mut host_cx, |project, cx| project.share(cx))
4769            .await
4770            .unwrap();
4771
4772        clients.push(cx.foreground().spawn(host.simulate_host(
4773            host_project,
4774            language_server_config,
4775            operations.clone(),
4776            max_operations,
4777            rng.clone(),
4778            host_cx,
4779        )));
4780
4781        while operations.get() < max_operations {
4782            cx.background().simulate_random_delay().await;
4783            if clients.len() >= max_peers {
4784                break;
4785            } else if rng.lock().gen_bool(0.05) {
4786                operations.set(operations.get() + 1);
4787
4788                let guest_id = clients.len();
4789                log::info!("Adding guest {}", guest_id);
4790                next_entity_id += 100000;
4791                let mut guest_cx = TestAppContext::new(
4792                    cx.foreground_platform(),
4793                    cx.platform(),
4794                    cx.foreground(),
4795                    cx.background(),
4796                    cx.font_cache(),
4797                    cx.leak_detector(),
4798                    next_entity_id,
4799                );
4800                let guest = server
4801                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4802                    .await;
4803                let guest_project = Project::remote(
4804                    host_project_id,
4805                    guest.client.clone(),
4806                    guest.user_store.clone(),
4807                    guest_lang_registry.clone(),
4808                    FakeFs::new(cx.background()),
4809                    &mut guest_cx.to_async(),
4810                )
4811                .await
4812                .unwrap();
4813                clients.push(cx.foreground().spawn(guest.simulate_guest(
4814                    guest_id,
4815                    guest_project,
4816                    operations.clone(),
4817                    max_operations,
4818                    rng.clone(),
4819                    guest_cx,
4820                )));
4821
4822                log::info!("Guest {} added", guest_id);
4823            }
4824        }
4825
4826        let mut clients = futures::future::join_all(clients).await;
4827        cx.foreground().run_until_parked();
4828
4829        let (host_client, mut host_cx) = clients.remove(0);
4830        let host_project = host_client.project.as_ref().unwrap();
4831        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4832            project
4833                .worktrees(cx)
4834                .map(|worktree| {
4835                    let snapshot = worktree.read(cx).snapshot();
4836                    (snapshot.id(), snapshot)
4837                })
4838                .collect::<BTreeMap<_, _>>()
4839        });
4840
4841        host_client
4842            .project
4843            .as_ref()
4844            .unwrap()
4845            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4846
4847        for (guest_client, mut guest_cx) in clients.into_iter() {
4848            let guest_id = guest_client.client.id();
4849            let worktree_snapshots =
4850                guest_client
4851                    .project
4852                    .as_ref()
4853                    .unwrap()
4854                    .read_with(&guest_cx, |project, cx| {
4855                        project
4856                            .worktrees(cx)
4857                            .map(|worktree| {
4858                                let worktree = worktree.read(cx);
4859                                (worktree.id(), worktree.snapshot())
4860                            })
4861                            .collect::<BTreeMap<_, _>>()
4862                    });
4863
4864            assert_eq!(
4865                worktree_snapshots.keys().collect::<Vec<_>>(),
4866                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4867                "guest {} has different worktrees than the host",
4868                guest_id
4869            );
4870            for (id, host_snapshot) in &host_worktree_snapshots {
4871                let guest_snapshot = &worktree_snapshots[id];
4872                assert_eq!(
4873                    guest_snapshot.root_name(),
4874                    host_snapshot.root_name(),
4875                    "guest {} has different root name than the host for worktree {}",
4876                    guest_id,
4877                    id
4878                );
4879                assert_eq!(
4880                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4881                    host_snapshot.entries(false).collect::<Vec<_>>(),
4882                    "guest {} has different snapshot than the host for worktree {}",
4883                    guest_id,
4884                    id
4885                );
4886            }
4887
4888            guest_client
4889                .project
4890                .as_ref()
4891                .unwrap()
4892                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4893
4894            for guest_buffer in &guest_client.buffers {
4895                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4896                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4897                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4898                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4899                        guest_id, guest_client.peer_id, buffer_id
4900                    ))
4901                });
4902                let path = host_buffer
4903                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4904
4905                assert_eq!(
4906                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4907                    0,
4908                    "guest {}, buffer {}, path {:?} has deferred operations",
4909                    guest_id,
4910                    buffer_id,
4911                    path,
4912                );
4913                assert_eq!(
4914                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4915                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4916                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4917                    guest_id,
4918                    buffer_id,
4919                    path
4920                );
4921            }
4922
4923            guest_cx.update(|_| drop(guest_client));
4924        }
4925
4926        host_cx.update(|_| drop(host_client));
4927    }
4928
    /// In-process server fixture used by the tests in this module.
    struct TestServer {
        /// Peer handed to `Server::new`; `reset()` is called on it when the
        /// fixture is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to bracket its waits.
        foreground: Rc<executor::Foreground>,
        /// Receiving end of the channel whose sender is given to `Server::new`;
        /// `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Per-user `kill_conn` handle returned by `Connection::in_memory`;
        /// `disconnect_client` removes (and thereby drops) the entry.
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// While `true`, new connection attempts made by test clients fail.
        forbid_connections: Arc<AtomicBool>,
        /// Held for the lifetime of the fixture (presumably to keep the test
        /// database alive -- confirm against `TestDb`).
        _test_db: TestDb,
    }
4939
4940    impl TestServer {
4941        async fn start(
4942            foreground: Rc<executor::Foreground>,
4943            background: Arc<executor::Background>,
4944        ) -> Self {
4945            let test_db = TestDb::fake(background);
4946            let app_state = Self::build_app_state(&test_db).await;
4947            let peer = Peer::new();
4948            let notifications = mpsc::unbounded();
4949            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4950            Self {
4951                peer,
4952                app_state,
4953                server,
4954                foreground,
4955                notifications: notifications.1,
4956                connection_killers: Default::default(),
4957                forbid_connections: Default::default(),
4958                _test_db: test_db,
4959            }
4960        }
4961
4962        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4963            cx.update(|cx| {
4964                let settings = Settings::test(cx);
4965                cx.set_global(settings);
4966            });
4967
4968            let http = FakeHttpClient::with_404_response();
4969            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4970            let client_name = name.to_string();
4971            let mut client = Client::new(http.clone());
4972            let server = self.server.clone();
4973            let connection_killers = self.connection_killers.clone();
4974            let forbid_connections = self.forbid_connections.clone();
4975            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4976
4977            Arc::get_mut(&mut client)
4978                .unwrap()
4979                .override_authenticate(move |cx| {
4980                    cx.spawn(|_| async move {
4981                        let access_token = "the-token".to_string();
4982                        Ok(Credentials {
4983                            user_id: user_id.0 as u64,
4984                            access_token,
4985                        })
4986                    })
4987                })
4988                .override_establish_connection(move |credentials, cx| {
4989                    assert_eq!(credentials.user_id, user_id.0 as u64);
4990                    assert_eq!(credentials.access_token, "the-token");
4991
4992                    let server = server.clone();
4993                    let connection_killers = connection_killers.clone();
4994                    let forbid_connections = forbid_connections.clone();
4995                    let client_name = client_name.clone();
4996                    let connection_id_tx = connection_id_tx.clone();
4997                    cx.spawn(move |cx| async move {
4998                        if forbid_connections.load(SeqCst) {
4999                            Err(EstablishConnectionError::other(anyhow!(
5000                                "server is forbidding connections"
5001                            )))
5002                        } else {
5003                            let (client_conn, server_conn, kill_conn) =
5004                                Connection::in_memory(cx.background());
5005                            connection_killers.lock().insert(user_id, kill_conn);
5006                            cx.background()
5007                                .spawn(server.handle_connection(
5008                                    server_conn,
5009                                    client_name,
5010                                    user_id,
5011                                    Some(connection_id_tx),
5012                                    cx.background(),
5013                                ))
5014                                .detach();
5015                            Ok(client_conn)
5016                        }
5017                    })
5018                });
5019
5020            client
5021                .authenticate_and_connect(false, &cx.to_async())
5022                .await
5023                .unwrap();
5024
5025            Channel::init(&client);
5026            Project::init(&client);
5027            cx.update(|cx| {
5028                workspace::init(&client, cx);
5029            });
5030
5031            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5032            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5033
5034            let client = TestClient {
5035                client,
5036                peer_id,
5037                user_store,
5038                language_registry: Arc::new(LanguageRegistry::test()),
5039                project: Default::default(),
5040                buffers: Default::default(),
5041            };
5042            client.wait_for_current_user(cx).await;
5043            client
5044        }
5045
5046        fn disconnect_client(&self, user_id: UserId) {
5047            self.connection_killers.lock().remove(&user_id);
5048        }
5049
5050        fn forbid_connections(&self) {
5051            self.forbid_connections.store(true, SeqCst);
5052        }
5053
5054        fn allow_connections(&self) {
5055            self.forbid_connections.store(false, SeqCst);
5056        }
5057
5058        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5059            let mut config = Config::default();
5060            config.session_secret = "a".repeat(32);
5061            config.database_url = test_db.url.clone();
5062            let github_client = github::AppClient::test();
5063            Arc::new(AppState {
5064                db: test_db.db().clone(),
5065                handlebars: Default::default(),
5066                auth_client: auth::build_client("", ""),
5067                repo_client: github::RepoClient::test(&github_client),
5068                github_client,
5069                config,
5070            })
5071        }
5072
5073        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5074            self.server.store.read()
5075        }
5076
5077        async fn condition<F>(&mut self, mut predicate: F)
5078        where
5079            F: FnMut(&Store) -> bool,
5080        {
5081            async_std::future::timeout(Duration::from_millis(500), async {
5082                while !(predicate)(&*self.server.store.read()) {
5083                    self.foreground.start_waiting();
5084                    self.notifications.next().await;
5085                    self.foreground.finish_waiting();
5086                }
5087            })
5088            .await
5089            .expect("condition timed out");
5090        }
5091    }
5092
5093    impl Drop for TestServer {
5094        fn drop(&mut self) {
5095            self.peer.reset();
5096        }
5097    }
5098
    /// A connected client together with the per-client state the tests track.
    struct TestClient {
        /// The underlying client; `TestClient` derefs to it.
        client: Arc<Client>,
        /// Peer id built from the connection id reported when the client
        /// connected.
        pub peer_id: PeerId,
        /// User store created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Language registry passed to the projects and workspaces this client
        /// builds.
        language_registry: Arc<LanguageRegistry>,
        /// The most recent project built through `build_local_project` or
        /// `build_remote_project`, if any.
        project: Option<ModelHandle<Project>>,
        /// Buffers this client currently holds open.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
5107
5108    impl Deref for TestClient {
5109        type Target = Arc<Client>;
5110
5111        fn deref(&self) -> &Self::Target {
5112            &self.client
5113        }
5114    }
5115
5116    impl TestClient {
5117        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5118            UserId::from_proto(
5119                self.user_store
5120                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5121            )
5122        }
5123
5124        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5125            let mut authed_user = self
5126                .user_store
5127                .read_with(cx, |user_store, _| user_store.watch_current_user());
5128            while authed_user.next().await.unwrap().is_none() {}
5129        }
5130
5131        async fn build_local_project(
5132            &mut self,
5133            fs: Arc<FakeFs>,
5134            root_path: impl AsRef<Path>,
5135            cx: &mut TestAppContext,
5136        ) -> (ModelHandle<Project>, WorktreeId) {
5137            let project = cx.update(|cx| {
5138                Project::local(
5139                    self.client.clone(),
5140                    self.user_store.clone(),
5141                    self.language_registry.clone(),
5142                    fs,
5143                    cx,
5144                )
5145            });
5146            self.project = Some(project.clone());
5147            let (worktree, _) = project
5148                .update(cx, |p, cx| {
5149                    p.find_or_create_local_worktree(root_path, true, cx)
5150                })
5151                .await
5152                .unwrap();
5153            worktree
5154                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5155                .await;
5156            project
5157                .update(cx, |project, _| project.next_remote_id())
5158                .await;
5159            (project, worktree.read_with(cx, |tree, _| tree.id()))
5160        }
5161
5162        async fn build_remote_project(
5163            &mut self,
5164            project_id: u64,
5165            cx: &mut TestAppContext,
5166        ) -> ModelHandle<Project> {
5167            let project = Project::remote(
5168                project_id,
5169                self.client.clone(),
5170                self.user_store.clone(),
5171                self.language_registry.clone(),
5172                FakeFs::new(cx.background()),
5173                &mut cx.to_async(),
5174            )
5175            .await
5176            .unwrap();
5177            self.project = Some(project.clone());
5178            project
5179        }
5180
5181        fn build_workspace(
5182            &self,
5183            project: &ModelHandle<Project>,
5184            cx: &mut TestAppContext,
5185        ) -> ViewHandle<Workspace> {
5186            let (window_id, _) = cx.add_window(|_| EmptyView);
5187            cx.add_view(window_id, |cx| {
5188                let fs = project.read(cx).fs().clone();
5189                Workspace::new(
5190                    &WorkspaceParams {
5191                        fs,
5192                        project: project.clone(),
5193                        user_store: self.user_store.clone(),
5194                        languages: self.language_registry.clone(),
5195                        channel_list: cx.add_model(|cx| {
5196                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5197                        }),
5198                        client: self.client.clone(),
5199                    },
5200                    cx,
5201                )
5202            })
5203        }
5204
5205        fn simulate_host(
5206            mut self,
5207            project: ModelHandle<Project>,
5208            mut language_server_config: LanguageServerConfig,
5209            operations: Rc<Cell<usize>>,
5210            max_operations: usize,
5211            rng: Arc<Mutex<StdRng>>,
5212            mut cx: TestAppContext,
5213        ) -> impl Future<Output = (Self, TestAppContext)> {
5214            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
5215
5216            // Set up a fake language server.
5217            language_server_config.set_fake_initializer({
5218                let rng = rng.clone();
5219                let files = files.clone();
5220                let project = project.downgrade();
5221                move |fake_server| {
5222                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
5223                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
5224                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5225                                range: lsp::Range::new(
5226                                    lsp::Position::new(0, 0),
5227                                    lsp::Position::new(0, 0),
5228                                ),
5229                                new_text: "the-new-text".to_string(),
5230                            })),
5231                            ..Default::default()
5232                        }]))
5233                    });
5234
5235                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
5236                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
5237                            lsp::CodeAction {
5238                                title: "the-code-action".to_string(),
5239                                ..Default::default()
5240                            },
5241                        )])
5242                    });
5243
5244                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
5245                        |params, _| {
5246                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5247                                params.position,
5248                                params.position,
5249                            )))
5250                        },
5251                    );
5252
5253                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
5254                        let files = files.clone();
5255                        let rng = rng.clone();
5256                        move |_, _| {
5257                            let files = files.lock();
5258                            let mut rng = rng.lock();
5259                            let count = rng.gen_range::<usize, _>(1..3);
5260                            let files = (0..count)
5261                                .map(|_| files.choose(&mut *rng).unwrap())
5262                                .collect::<Vec<_>>();
5263                            log::info!("LSP: Returning definitions in files {:?}", &files);
5264                            Some(lsp::GotoDefinitionResponse::Array(
5265                                files
5266                                    .into_iter()
5267                                    .map(|file| lsp::Location {
5268                                        uri: lsp::Url::from_file_path(file).unwrap(),
5269                                        range: Default::default(),
5270                                    })
5271                                    .collect(),
5272                            ))
5273                        }
5274                    });
5275
5276                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
5277                        let rng = rng.clone();
5278                        let project = project.clone();
5279                        move |params, mut cx| {
5280                            if let Some(project) = project.upgrade(&cx) {
5281                                project.update(&mut cx, |project, cx| {
5282                                    let path = params
5283                                        .text_document_position_params
5284                                        .text_document
5285                                        .uri
5286                                        .to_file_path()
5287                                        .unwrap();
5288                                    let (worktree, relative_path) =
5289                                        project.find_local_worktree(&path, cx)?;
5290                                    let project_path =
5291                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5292                                    let buffer =
5293                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5294
5295                                    let mut highlights = Vec::new();
5296                                    let highlight_count = rng.lock().gen_range(1..=5);
5297                                    let mut prev_end = 0;
5298                                    for _ in 0..highlight_count {
5299                                        let range =
5300                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5301                                        let start = buffer
5302                                            .offset_to_point_utf16(range.start)
5303                                            .to_lsp_position();
5304                                        let end = buffer
5305                                            .offset_to_point_utf16(range.end)
5306                                            .to_lsp_position();
5307                                        highlights.push(lsp::DocumentHighlight {
5308                                            range: lsp::Range::new(start, end),
5309                                            kind: Some(lsp::DocumentHighlightKind::READ),
5310                                        });
5311                                        prev_end = range.end;
5312                                    }
5313                                    Some(highlights)
5314                                })
5315                            } else {
5316                                None
5317                            }
5318                        }
5319                    });
5320                }
5321            });
5322
5323            project.update(&mut cx, |project, _| {
5324                project.languages().add(Arc::new(Language::new(
5325                    LanguageConfig {
5326                        name: "Rust".into(),
5327                        path_suffixes: vec!["rs".to_string()],
5328                        language_server: Some(language_server_config),
5329                        ..Default::default()
5330                    },
5331                    None,
5332                )));
5333            });
5334
5335            async move {
5336                let fs = project.read_with(&cx, |project, _| project.fs().clone());
5337                while operations.get() < max_operations {
5338                    operations.set(operations.get() + 1);
5339
5340                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5341                    match distribution {
5342                        0..=20 if !files.lock().is_empty() => {
5343                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5344                            let mut path = path.as_path();
5345                            while let Some(parent_path) = path.parent() {
5346                                path = parent_path;
5347                                if rng.lock().gen() {
5348                                    break;
5349                                }
5350                            }
5351
5352                            log::info!("Host: find/create local worktree {:?}", path);
5353                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
5354                                project.find_or_create_local_worktree(path, true, cx)
5355                            });
5356                            let find_or_create_worktree = async move {
5357                                find_or_create_worktree.await.unwrap();
5358                            };
5359                            if rng.lock().gen() {
5360                                cx.background().spawn(find_or_create_worktree).detach();
5361                            } else {
5362                                find_or_create_worktree.await;
5363                            }
5364                        }
5365                        10..=80 if !files.lock().is_empty() => {
5366                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5367                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5368                                let (worktree, path) = project
5369                                    .update(&mut cx, |project, cx| {
5370                                        project.find_or_create_local_worktree(
5371                                            file.clone(),
5372                                            true,
5373                                            cx,
5374                                        )
5375                                    })
5376                                    .await
5377                                    .unwrap();
5378                                let project_path =
5379                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
5380                                log::info!(
5381                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5382                                    file,
5383                                    project_path.0,
5384                                    project_path.1
5385                                );
5386                                let buffer = project
5387                                    .update(&mut cx, |project, cx| {
5388                                        project.open_buffer(project_path, cx)
5389                                    })
5390                                    .await
5391                                    .unwrap();
5392                                self.buffers.insert(buffer.clone());
5393                                buffer
5394                            } else {
5395                                self.buffers
5396                                    .iter()
5397                                    .choose(&mut *rng.lock())
5398                                    .unwrap()
5399                                    .clone()
5400                            };
5401
5402                            if rng.lock().gen_bool(0.1) {
5403                                cx.update(|cx| {
5404                                    log::info!(
5405                                        "Host: dropping buffer {:?}",
5406                                        buffer.read(cx).file().unwrap().full_path(cx)
5407                                    );
5408                                    self.buffers.remove(&buffer);
5409                                    drop(buffer);
5410                                });
5411                            } else {
5412                                buffer.update(&mut cx, |buffer, cx| {
5413                                    log::info!(
5414                                        "Host: updating buffer {:?} ({})",
5415                                        buffer.file().unwrap().full_path(cx),
5416                                        buffer.remote_id()
5417                                    );
5418                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5419                                });
5420                            }
5421                        }
5422                        _ => loop {
5423                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
5424                            let mut path = PathBuf::new();
5425                            path.push("/");
5426                            for _ in 0..path_component_count {
5427                                let letter = rng.lock().gen_range(b'a'..=b'z');
5428                                path.push(std::str::from_utf8(&[letter]).unwrap());
5429                            }
5430                            path.set_extension("rs");
5431                            let parent_path = path.parent().unwrap();
5432
5433                            log::info!("Host: creating file {:?}", path,);
5434
5435                            if fs.create_dir(&parent_path).await.is_ok()
5436                                && fs.create_file(&path, Default::default()).await.is_ok()
5437                            {
5438                                files.lock().push(path);
5439                                break;
5440                            } else {
5441                                log::info!("Host: cannot create file");
5442                            }
5443                        },
5444                    }
5445
5446                    cx.background().simulate_random_delay().await;
5447                }
5448
5449                log::info!("Host done");
5450
5451                self.project = Some(project);
5452                (self, cx)
5453            }
5454        }
5455
        /// Drives one simulated guest against the shared `project` until the shared
        /// `operations` counter reaches `max_operations`.
        ///
        /// Every loop iteration first selects a buffer: either a freshly opened random
        /// file from a visible worktree that contains at least one file, or a buffer
        /// already held in `self.buffers`. It then rolls `0..100` and performs one
        /// action on that buffer:
        ///
        /// * `0..=9`   – drop the buffer
        /// * `10..=19` – request completions at a random offset
        /// * `20..=29` – request code actions for a random byte range
        /// * `30..=39` – save the buffer (only when it is dirty)
        /// * `40..=44` – prepare a rename at a random offset
        /// * `45..=49` – request definitions at a random offset
        /// * `50..=54` – request document highlights at a random offset
        /// * `55..=59` – run a project-wide search for a random letter
        /// * otherwise – apply random edits to the buffer
        ///
        /// Each request is spawned on the background executor and is then either
        /// detached (probability 0.3) or awaited. A random delay is simulated at the end
        /// of every iteration.
        ///
        /// `guest_id` is only used to label log output. All randomness is drawn from the
        /// shared `rng`, so the order of `rng.lock()` calls in this function is part of
        /// the random sequence the simulation consumes.
        ///
        /// Returns the client, with `self.project` set to `project`, together with the
        /// test context so the caller can keep using both.
        ///
        /// # Panics
        ///
        /// Panics (via `unwrap`/`expect`/`assert!`) if opening a buffer fails, if any
        /// request fails, or if a save reports a version that has not observed the
        /// version captured when the save was requested.
        pub async fn simulate_guest(
            mut self,
            guest_id: usize,
            project: ModelHandle<Project>,
            operations: Rc<Cell<usize>>,
            max_operations: usize,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext) {
            while operations.get() < max_operations {
                // Open a new buffer when none is held yet, or on a random coin flip;
                // otherwise reuse one of the buffers this guest already holds.
                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                    // Pick a random visible worktree that has at least one file entry.
                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                        worktree
                    } else {
                        // Nothing to open yet: wait a bit and retry. This path does not
                        // count as an operation.
                        cx.background().simulate_random_delay().await;
                        continue;
                    };

                    operations.set(operations.get() + 1);
                    let (worktree_root_name, project_path) =
                        worktree.read_with(&cx, |worktree, _| {
                            // The worktree was selected because it had a file entry, so
                            // `choose` is expected to yield `Some` here.
                            let entry = worktree
                                .entries(false)
                                .filter(|e| e.is_file())
                                .choose(&mut *rng.lock())
                                .unwrap();
                            (
                                worktree.root_name().to_string(),
                                (worktree.id(), entry.path.clone()),
                            )
                        });
                    log::info!(
                        "Guest {}: opening path {:?} in worktree {} ({})",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                    );
                    let buffer = project
                        .update(&mut cx, |project, cx| {
                            project.open_buffer(project_path.clone(), cx)
                        })
                        .await
                        .unwrap();
                    log::info!(
                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
                    );
                    // Remember the buffer so later iterations can pick it again.
                    self.buffers.insert(buffer.clone());
                    buffer
                } else {
                    operations.set(operations.get() + 1);

                    // `self.buffers` is non-empty on this branch, so `choose` yields `Some`.
                    self.buffers
                        .iter()
                        .choose(&mut *rng.lock())
                        .unwrap()
                        .clone()
                };

                // Decide which action to perform on the selected buffer.
                let choice = rng.lock().gen_range(0..100);
                match choice {
                    0..=9 => {
                        // Forget the buffer and release this iteration's handle to it.
                        cx.update(|cx| {
                            log::info!(
                                "Guest {}: dropping buffer {:?}",
                                guest_id,
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    }
                    10..=19 => {
                        let completions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting completions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.completions(&buffer, offset, cx)
                        });
                        // Run the request in the background; a failure panics the task.
                        let completions = cx.background().spawn(async move {
                            completions.await.expect("completions request failed");
                        });
                        // Randomly leave the request in flight instead of waiting for it.
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching completions request", guest_id);
                            completions.detach();
                        } else {
                            completions.await;
                        }
                    }
                    20..=29 => {
                        let code_actions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting code actions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                            project.code_actions(&buffer, range, cx)
                        });
                        let code_actions = cx.background().spawn(async move {
                            code_actions.await.expect("code actions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching code actions request", guest_id);
                            code_actions.detach();
                        } else {
                            code_actions.await;
                        }
                    }
                    // Only save buffers that are dirty. When the guard fails, a roll in
                    // this range falls through to the catch-all edit arm below.
                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                        // Capture the buffer version at the moment the save is requested.
                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: saving buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            (buffer.version(), buffer.save(cx))
                        });
                        let save = cx.background().spawn(async move {
                            let (saved_version, _) = save.await.expect("save request failed");
                            // The saved version must have observed the version that was
                            // current when the save was requested.
                            assert!(saved_version.observed_all(&requested_version));
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching save request", guest_id);
                            save.detach();
                        } else {
                            save.await;
                        }
                    }
                    40..=44 => {
                        let prepare_rename = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: preparing rename for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            // Unlike the other requests, this call takes the buffer
                            // handle by value.
                            project.prepare_rename(buffer, offset, cx)
                        });
                        let prepare_rename = cx.background().spawn(async move {
                            prepare_rename.await.expect("prepare rename request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching prepare rename request", guest_id);
                            prepare_rename.detach();
                        } else {
                            prepare_rename.await;
                        }
                    }
                    45..=49 => {
                        let definitions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting definitions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.definition(&buffer, offset, cx)
                        });
                        let definitions = cx.background().spawn(async move {
                            definitions.await.expect("definitions request failed")
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching definitions request", guest_id);
                            definitions.detach();
                        } else {
                            // Keep the buffers the returned locations refer to, so later
                            // iterations can operate on them too.
                            self.buffers
                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                        }
                    }
                    50..=54 => {
                        let highlights = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting highlights for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.document_highlights(&buffer, offset, cx)
                        });
                        let highlights = cx.background().spawn(async move {
                            highlights.await.expect("highlights request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching highlights request", guest_id);
                            highlights.detach();
                        } else {
                            highlights.await;
                        }
                    }
                    55..=59 => {
                        let search = project.update(&mut cx, |project, cx| {
                            // Search for a single random lowercase letter.
                            let query = rng.lock().gen_range('a'..='z');
                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                            project.search(SearchQuery::text(query, false, false), cx)
                        });
                        let search = cx
                            .background()
                            .spawn(async move { search.await.expect("search request failed") });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching search request", guest_id);
                            search.detach();
                        } else {
                            // The keys of the search result are added to the held buffers.
                            self.buffers.extend(search.await.into_keys());
                        }
                    }
                    _ => {
                        // Default action: mutate the buffer with random edits.
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: updating buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                cx.background().simulate_random_delay().await;
            }

            log::info!("Guest {} done", guest_id);

            // Hand the project back to the caller through the client.
            self.project = Some(project);
            (self, cx)
        }
5705    }
5706
    /// Tears down the wrapped client whenever a `TestClient` goes out of scope, so
    /// tests do not have to remember to call `tear_down` themselves.
    impl Drop for TestClient {
        fn drop(&mut self) {
            self.client.tear_down();
        }
    }
5712
5713    impl Executor for Arc<gpui::executor::Background> {
5714        type Timer = gpui::executor::Timer;
5715
5716        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5717            self.spawn(future).detach();
5718        }
5719
5720        fn timer(&self, duration: Duration) -> Self::Timer {
5721            self.as_ref().timer(duration)
5722        }
5723    }
5724
5725    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5726        channel
5727            .messages()
5728            .cursor::<()>()
5729            .map(|m| {
5730                (
5731                    m.sender.github_login.clone(),
5732                    m.body.clone(),
5733                    m.is_pending(),
5734                )
5735            })
5736            .collect()
5737    }
5738
    /// A stateless placeholder view for tests.
    struct EmptyView;

    /// `EmptyView` uses the unit type as its event type.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
5744
5745    impl gpui::View for EmptyView {
5746        fn ui_name() -> &'static str {
5747            "empty view"
5748        }
5749
5750        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5751            gpui::Element::boxed(gpui::elements::Empty)
5752        }
5753    }
5754}