rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
  96            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
  97            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  98            .add_request_handler(
  99                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 100            )
 101            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 102            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 103            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 105            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 106            .add_request_handler(Server::update_buffer)
 107            .add_message_handler(Server::update_buffer_file)
 108            .add_message_handler(Server::buffer_reloaded)
 109            .add_message_handler(Server::buffer_saved)
 110            .add_request_handler(Server::save_buffer)
 111            .add_request_handler(Server::get_channels)
 112            .add_request_handler(Server::get_users)
 113            .add_request_handler(Server::join_channel)
 114            .add_message_handler(Server::leave_channel)
 115            .add_request_handler(Server::send_channel_message)
 116            .add_request_handler(Server::follow)
 117            .add_message_handler(Server::unfollow)
 118            .add_message_handler(Server::update_followers)
 119            .add_request_handler(Server::get_channel_messages);
 120
 121        Arc::new(server)
 122    }
 123
 124    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 128        M: EnvelopedMessage,
 129    {
 130        let prev_handler = self.handlers.insert(
 131            TypeId::of::<M>(),
 132            Box::new(move |server, envelope| {
 133                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 134                (handler)(server, *envelope).boxed()
 135            }),
 136        );
 137        if prev_handler.is_some() {
 138            panic!("registered a handler for the same message twice");
 139        }
 140        self
 141    }
 142
 143    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 144    where
 145        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 146        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 147        M: RequestMessage,
 148    {
 149        self.add_message_handler(move |server, envelope| {
 150            let receipt = envelope.receipt();
 151            let response = (handler)(server.clone(), envelope);
 152            async move {
 153                match response.await {
 154                    Ok(response) => {
 155                        server.peer.respond(receipt, response)?;
 156                        Ok(())
 157                    }
 158                    Err(error) => {
 159                        server.peer.respond_with_error(
 160                            receipt,
 161                            proto::Error {
 162                                message: error.to_string(),
 163                            },
 164                        )?;
 165                        Err(error)
 166                    }
 167                }
 168            }
 169        })
 170    }
 171
 172    pub fn handle_connection<E: Executor>(
 173        self: &Arc<Self>,
 174        connection: Connection,
 175        addr: String,
 176        user_id: UserId,
 177        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 178        executor: E,
 179    ) -> impl Future<Output = ()> {
 180        let mut this = self.clone();
 181        async move {
 182            let (connection_id, handle_io, mut incoming_rx) = this
 183                .peer
 184                .add_connection(connection, {
 185                    let executor = executor.clone();
 186                    move |duration| {
 187                        let timer = executor.timer(duration);
 188                        async move {
 189                            timer.await;
 190                        }
 191                    }
 192                })
 193                .await;
 194
 195            if let Some(send_connection_id) = send_connection_id.as_mut() {
 196                let _ = send_connection_id.send(connection_id).await;
 197            }
 198
 199            this.state_mut().add_connection(connection_id, user_id);
 200            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 201                log::error!("error updating contacts for {:?}: {}", user_id, err);
 202            }
 203
 204            let handle_io = handle_io.fuse();
 205            futures::pin_mut!(handle_io);
 206            loop {
 207                let next_message = incoming_rx.next().fuse();
 208                futures::pin_mut!(next_message);
 209                futures::select_biased! {
 210                    result = handle_io => {
 211                        if let Err(err) = result {
 212                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 213                        }
 214                        break;
 215                    }
 216                    message = next_message => {
 217                        if let Some(message) = message {
 218                            let start_time = Instant::now();
 219                            let type_name = message.payload_type_name();
 220                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 221                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 222                                let notifications = this.notifications.clone();
 223                                let is_background = message.is_background();
 224                                let handle_message = (handler)(this.clone(), message);
 225                                let handle_message = async move {
 226                                    if let Err(err) = handle_message.await {
 227                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 228                                    } else {
 229                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 230                                    }
 231                                    if let Some(mut notifications) = notifications {
 232                                        let _ = notifications.send(()).await;
 233                                    }
 234                                };
 235                                if is_background {
 236                                    executor.spawn_detached(handle_message);
 237                                } else {
 238                                    handle_message.await;
 239                                }
 240                            } else {
 241                                log::warn!("unhandled message: {}", type_name);
 242                            }
 243                        } else {
 244                            log::info!("rpc connection closed {:?}", addr);
 245                            break;
 246                        }
 247                    }
 248                }
 249            }
 250
 251            if let Err(err) = this.sign_out(connection_id).await {
 252                log::error!("error signing out connection {:?} - {:?}", addr, err);
 253            }
 254        }
 255    }
 256
 257    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 258        self.peer.disconnect(connection_id);
 259        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 260
 261        for (project_id, project) in removed_connection.hosted_projects {
 262            if let Some(share) = project.share {
 263                broadcast(
 264                    connection_id,
 265                    share.guests.keys().copied().collect(),
 266                    |conn_id| {
 267                        self.peer
 268                            .send(conn_id, proto::UnshareProject { project_id })
 269                    },
 270                )?;
 271            }
 272        }
 273
 274        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 275            broadcast(connection_id, peer_ids, |conn_id| {
 276                self.peer.send(
 277                    conn_id,
 278                    proto::RemoveProjectCollaborator {
 279                        project_id,
 280                        peer_id: connection_id.0,
 281                    },
 282                )
 283            })?;
 284        }
 285
 286        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 287        Ok(())
 288    }
 289
 290    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 291        Ok(proto::Ack {})
 292    }
 293
 294    async fn register_project(
 295        mut self: Arc<Server>,
 296        request: TypedEnvelope<proto::RegisterProject>,
 297    ) -> tide::Result<proto::RegisterProjectResponse> {
 298        let project_id = {
 299            let mut state = self.state_mut();
 300            let user_id = state.user_id_for_connection(request.sender_id)?;
 301            state.register_project(request.sender_id, user_id)
 302        };
 303        Ok(proto::RegisterProjectResponse { project_id })
 304    }
 305
 306    async fn unregister_project(
 307        mut self: Arc<Server>,
 308        request: TypedEnvelope<proto::UnregisterProject>,
 309    ) -> tide::Result<()> {
 310        let project = self
 311            .state_mut()
 312            .unregister_project(request.payload.project_id, request.sender_id)?;
 313        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 314        Ok(())
 315    }
 316
 317    async fn share_project(
 318        mut self: Arc<Server>,
 319        request: TypedEnvelope<proto::ShareProject>,
 320    ) -> tide::Result<proto::Ack> {
 321        self.state_mut()
 322            .share_project(request.payload.project_id, request.sender_id);
 323        Ok(proto::Ack {})
 324    }
 325
 326    async fn unshare_project(
 327        mut self: Arc<Server>,
 328        request: TypedEnvelope<proto::UnshareProject>,
 329    ) -> tide::Result<()> {
 330        let project_id = request.payload.project_id;
 331        let project = self
 332            .state_mut()
 333            .unshare_project(project_id, request.sender_id)?;
 334
 335        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 336            self.peer
 337                .send(conn_id, proto::UnshareProject { project_id })
 338        })?;
 339        self.update_contacts_for_users(&project.authorized_user_ids)?;
 340        Ok(())
 341    }
 342
 343    async fn join_project(
 344        mut self: Arc<Server>,
 345        request: TypedEnvelope<proto::JoinProject>,
 346    ) -> tide::Result<proto::JoinProjectResponse> {
 347        let project_id = request.payload.project_id;
 348
 349        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 350        let (response, connection_ids, contact_user_ids) = self
 351            .state_mut()
 352            .join_project(request.sender_id, user_id, project_id)
 353            .and_then(|joined| {
 354                let share = joined.project.share()?;
 355                let peer_count = share.guests.len();
 356                let mut collaborators = Vec::with_capacity(peer_count);
 357                collaborators.push(proto::Collaborator {
 358                    peer_id: joined.project.host_connection_id.0,
 359                    replica_id: 0,
 360                    user_id: joined.project.host_user_id.to_proto(),
 361                });
 362                let worktrees = share
 363                    .worktrees
 364                    .iter()
 365                    .filter_map(|(id, shared_worktree)| {
 366                        let worktree = joined.project.worktrees.get(&id)?;
 367                        Some(proto::Worktree {
 368                            id: *id,
 369                            root_name: worktree.root_name.clone(),
 370                            entries: shared_worktree.entries.values().cloned().collect(),
 371                            diagnostic_summaries: shared_worktree
 372                                .diagnostic_summaries
 373                                .values()
 374                                .cloned()
 375                                .collect(),
 376                            visible: worktree.visible,
 377                        })
 378                    })
 379                    .collect();
 380                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 381                    if *peer_conn_id != request.sender_id {
 382                        collaborators.push(proto::Collaborator {
 383                            peer_id: peer_conn_id.0,
 384                            replica_id: *peer_replica_id as u32,
 385                            user_id: peer_user_id.to_proto(),
 386                        });
 387                    }
 388                }
 389                let response = proto::JoinProjectResponse {
 390                    worktrees,
 391                    replica_id: joined.replica_id as u32,
 392                    collaborators,
 393                    language_servers: joined.project.language_servers.clone(),
 394                };
 395                let connection_ids = joined.project.connection_ids();
 396                let contact_user_ids = joined.project.authorized_user_ids();
 397                Ok((response, connection_ids, contact_user_ids))
 398            })?;
 399
 400        broadcast(request.sender_id, connection_ids, |conn_id| {
 401            self.peer.send(
 402                conn_id,
 403                proto::AddProjectCollaborator {
 404                    project_id,
 405                    collaborator: Some(proto::Collaborator {
 406                        peer_id: request.sender_id.0,
 407                        replica_id: response.replica_id,
 408                        user_id: user_id.to_proto(),
 409                    }),
 410                },
 411            )
 412        })?;
 413        self.update_contacts_for_users(&contact_user_ids)?;
 414        Ok(response)
 415    }
 416
 417    async fn leave_project(
 418        mut self: Arc<Server>,
 419        request: TypedEnvelope<proto::LeaveProject>,
 420    ) -> tide::Result<()> {
 421        let sender_id = request.sender_id;
 422        let project_id = request.payload.project_id;
 423        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 424
 425        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 426            self.peer.send(
 427                conn_id,
 428                proto::RemoveProjectCollaborator {
 429                    project_id,
 430                    peer_id: sender_id.0,
 431                },
 432            )
 433        })?;
 434        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 435
 436        Ok(())
 437    }
 438
 439    async fn register_worktree(
 440        mut self: Arc<Server>,
 441        request: TypedEnvelope<proto::RegisterWorktree>,
 442    ) -> tide::Result<proto::Ack> {
 443        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 444
 445        let mut contact_user_ids = HashSet::default();
 446        contact_user_ids.insert(host_user_id);
 447        for github_login in &request.payload.authorized_logins {
 448            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 449            contact_user_ids.insert(contact_user_id);
 450        }
 451
 452        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 453        let guest_connection_ids;
 454        {
 455            let mut state = self.state_mut();
 456            guest_connection_ids = state
 457                .read_project(request.payload.project_id, request.sender_id)?
 458                .guest_connection_ids();
 459            state.register_worktree(
 460                request.payload.project_id,
 461                request.payload.worktree_id,
 462                request.sender_id,
 463                Worktree {
 464                    authorized_user_ids: contact_user_ids.clone(),
 465                    root_name: request.payload.root_name.clone(),
 466                    visible: request.payload.visible,
 467                },
 468            )?;
 469        }
 470        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 471            self.peer
 472                .forward_send(request.sender_id, connection_id, request.payload.clone())
 473        })?;
 474        self.update_contacts_for_users(&contact_user_ids)?;
 475        Ok(proto::Ack {})
 476    }
 477
 478    async fn unregister_worktree(
 479        mut self: Arc<Server>,
 480        request: TypedEnvelope<proto::UnregisterWorktree>,
 481    ) -> tide::Result<()> {
 482        let project_id = request.payload.project_id;
 483        let worktree_id = request.payload.worktree_id;
 484        let (worktree, guest_connection_ids) =
 485            self.state_mut()
 486                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 487        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 488            self.peer.send(
 489                conn_id,
 490                proto::UnregisterWorktree {
 491                    project_id,
 492                    worktree_id,
 493                },
 494            )
 495        })?;
 496        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 497        Ok(())
 498    }
 499
 500    async fn update_worktree(
 501        mut self: Arc<Server>,
 502        request: TypedEnvelope<proto::UpdateWorktree>,
 503    ) -> tide::Result<proto::Ack> {
 504        let connection_ids = self.state_mut().update_worktree(
 505            request.sender_id,
 506            request.payload.project_id,
 507            request.payload.worktree_id,
 508            &request.payload.removed_entries,
 509            &request.payload.updated_entries,
 510        )?;
 511
 512        broadcast(request.sender_id, connection_ids, |connection_id| {
 513            self.peer
 514                .forward_send(request.sender_id, connection_id, request.payload.clone())
 515        })?;
 516
 517        Ok(proto::Ack {})
 518    }
 519
 520    async fn update_diagnostic_summary(
 521        mut self: Arc<Server>,
 522        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 523    ) -> tide::Result<()> {
 524        let summary = request
 525            .payload
 526            .summary
 527            .clone()
 528            .ok_or_else(|| anyhow!("invalid summary"))?;
 529        let receiver_ids = self.state_mut().update_diagnostic_summary(
 530            request.payload.project_id,
 531            request.payload.worktree_id,
 532            request.sender_id,
 533            summary,
 534        )?;
 535
 536        broadcast(request.sender_id, receiver_ids, |connection_id| {
 537            self.peer
 538                .forward_send(request.sender_id, connection_id, request.payload.clone())
 539        })?;
 540        Ok(())
 541    }
 542
 543    async fn start_language_server(
 544        mut self: Arc<Server>,
 545        request: TypedEnvelope<proto::StartLanguageServer>,
 546    ) -> tide::Result<()> {
 547        let receiver_ids = self.state_mut().start_language_server(
 548            request.payload.project_id,
 549            request.sender_id,
 550            request
 551                .payload
 552                .server
 553                .clone()
 554                .ok_or_else(|| anyhow!("invalid language server"))?,
 555        )?;
 556        broadcast(request.sender_id, receiver_ids, |connection_id| {
 557            self.peer
 558                .forward_send(request.sender_id, connection_id, request.payload.clone())
 559        })?;
 560        Ok(())
 561    }
 562
 563    async fn update_language_server(
 564        self: Arc<Server>,
 565        request: TypedEnvelope<proto::UpdateLanguageServer>,
 566    ) -> tide::Result<()> {
 567        let receiver_ids = self
 568            .state()
 569            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 570        broadcast(request.sender_id, receiver_ids, |connection_id| {
 571            self.peer
 572                .forward_send(request.sender_id, connection_id, request.payload.clone())
 573        })?;
 574        Ok(())
 575    }
 576
 577    async fn forward_project_request<T>(
 578        self: Arc<Server>,
 579        request: TypedEnvelope<T>,
 580    ) -> tide::Result<T::Response>
 581    where
 582        T: EntityMessage + RequestMessage,
 583    {
 584        let host_connection_id = self
 585            .state()
 586            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 587            .host_connection_id;
 588        Ok(self
 589            .peer
 590            .forward_request(request.sender_id, host_connection_id, request.payload)
 591            .await?)
 592    }
 593
 594    async fn save_buffer(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::SaveBuffer>,
 597    ) -> tide::Result<proto::BufferSaved> {
 598        let host;
 599        let mut guests;
 600        {
 601            let state = self.state();
 602            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 603            host = project.host_connection_id;
 604            guests = project.guest_connection_ids()
 605        }
 606
 607        let response = self
 608            .peer
 609            .forward_request(request.sender_id, host, request.payload.clone())
 610            .await?;
 611
 612        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 613        broadcast(host, guests, |conn_id| {
 614            self.peer.forward_send(host, conn_id, response.clone())
 615        })?;
 616
 617        Ok(response)
 618    }
 619
 620    async fn update_buffer(
 621        self: Arc<Server>,
 622        request: TypedEnvelope<proto::UpdateBuffer>,
 623    ) -> tide::Result<proto::Ack> {
 624        let receiver_ids = self
 625            .state()
 626            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 627        broadcast(request.sender_id, receiver_ids, |connection_id| {
 628            self.peer
 629                .forward_send(request.sender_id, connection_id, request.payload.clone())
 630        })?;
 631        Ok(proto::Ack {})
 632    }
 633
 634    async fn update_buffer_file(
 635        self: Arc<Server>,
 636        request: TypedEnvelope<proto::UpdateBufferFile>,
 637    ) -> tide::Result<()> {
 638        let receiver_ids = self
 639            .state()
 640            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 641        broadcast(request.sender_id, receiver_ids, |connection_id| {
 642            self.peer
 643                .forward_send(request.sender_id, connection_id, request.payload.clone())
 644        })?;
 645        Ok(())
 646    }
 647
 648    async fn buffer_reloaded(
 649        self: Arc<Server>,
 650        request: TypedEnvelope<proto::BufferReloaded>,
 651    ) -> tide::Result<()> {
 652        let receiver_ids = self
 653            .state()
 654            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 655        broadcast(request.sender_id, receiver_ids, |connection_id| {
 656            self.peer
 657                .forward_send(request.sender_id, connection_id, request.payload.clone())
 658        })?;
 659        Ok(())
 660    }
 661
 662    async fn buffer_saved(
 663        self: Arc<Server>,
 664        request: TypedEnvelope<proto::BufferSaved>,
 665    ) -> tide::Result<()> {
 666        let receiver_ids = self
 667            .state()
 668            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 669        broadcast(request.sender_id, receiver_ids, |connection_id| {
 670            self.peer
 671                .forward_send(request.sender_id, connection_id, request.payload.clone())
 672        })?;
 673        Ok(())
 674    }
 675
 676    async fn follow(
 677        self: Arc<Self>,
 678        request: TypedEnvelope<proto::Follow>,
 679    ) -> tide::Result<proto::FollowResponse> {
 680        let leader_id = ConnectionId(request.payload.leader_id);
 681        let follower_id = request.sender_id;
 682        if !self
 683            .state()
 684            .project_connection_ids(request.payload.project_id, follower_id)?
 685            .contains(&leader_id)
 686        {
 687            Err(anyhow!("no such peer"))?;
 688        }
 689        let mut response = self
 690            .peer
 691            .forward_request(request.sender_id, leader_id, request.payload)
 692            .await?;
 693        response
 694            .views
 695            .retain(|view| view.leader_id != Some(follower_id.0));
 696        Ok(response)
 697    }
 698
 699    async fn unfollow(
 700        self: Arc<Self>,
 701        request: TypedEnvelope<proto::Unfollow>,
 702    ) -> tide::Result<()> {
 703        let leader_id = ConnectionId(request.payload.leader_id);
 704        if !self
 705            .state()
 706            .project_connection_ids(request.payload.project_id, request.sender_id)?
 707            .contains(&leader_id)
 708        {
 709            Err(anyhow!("no such peer"))?;
 710        }
 711        self.peer
 712            .forward_send(request.sender_id, leader_id, request.payload)?;
 713        Ok(())
 714    }
 715
 716    async fn update_followers(
 717        self: Arc<Self>,
 718        request: TypedEnvelope<proto::UpdateFollowers>,
 719    ) -> tide::Result<()> {
 720        let connection_ids = self
 721            .state()
 722            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 723        let leader_id = request
 724            .payload
 725            .variant
 726            .as_ref()
 727            .and_then(|variant| match variant {
 728                proto::update_followers::Variant::CreateView(payload) => payload.leader_id,
 729                proto::update_followers::Variant::UpdateView(payload) => payload.leader_id,
 730                proto::update_followers::Variant::UpdateActiveView(payload) => payload.leader_id,
 731            });
 732        for follower_id in &request.payload.follower_ids {
 733            let follower_id = ConnectionId(*follower_id);
 734            if connection_ids.contains(&follower_id) && Some(follower_id.0) != leader_id {
 735                self.peer
 736                    .forward_send(request.sender_id, follower_id, request.payload.clone())?;
 737            }
 738        }
 739        Ok(())
 740    }
 741
 742    async fn get_channels(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetChannels>,
 745    ) -> tide::Result<proto::GetChannelsResponse> {
 746        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 747        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 748        Ok(proto::GetChannelsResponse {
 749            channels: channels
 750                .into_iter()
 751                .map(|chan| proto::Channel {
 752                    id: chan.id.to_proto(),
 753                    name: chan.name,
 754                })
 755                .collect(),
 756        })
 757    }
 758
 759    async fn get_users(
 760        self: Arc<Server>,
 761        request: TypedEnvelope<proto::GetUsers>,
 762    ) -> tide::Result<proto::GetUsersResponse> {
 763        let user_ids = request
 764            .payload
 765            .user_ids
 766            .into_iter()
 767            .map(UserId::from_proto)
 768            .collect();
 769        let users = self
 770            .app_state
 771            .db
 772            .get_users_by_ids(user_ids)
 773            .await?
 774            .into_iter()
 775            .map(|user| proto::User {
 776                id: user.id.to_proto(),
 777                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 778                github_login: user.github_login,
 779            })
 780            .collect();
 781        Ok(proto::GetUsersResponse { users })
 782    }
 783
 784    fn update_contacts_for_users<'a>(
 785        self: &Arc<Server>,
 786        user_ids: impl IntoIterator<Item = &'a UserId>,
 787    ) -> anyhow::Result<()> {
 788        let mut result = Ok(());
 789        let state = self.state();
 790        for user_id in user_ids {
 791            let contacts = state.contacts_for_user(*user_id);
 792            for connection_id in state.connection_ids_for_user(*user_id) {
 793                if let Err(error) = self.peer.send(
 794                    connection_id,
 795                    proto::UpdateContacts {
 796                        contacts: contacts.clone(),
 797                    },
 798                ) {
 799                    result = Err(error);
 800                }
 801            }
 802        }
 803        result
 804    }
 805
 806    async fn join_channel(
 807        mut self: Arc<Self>,
 808        request: TypedEnvelope<proto::JoinChannel>,
 809    ) -> tide::Result<proto::JoinChannelResponse> {
 810        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 811        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 812        if !self
 813            .app_state
 814            .db
 815            .can_user_access_channel(user_id, channel_id)
 816            .await?
 817        {
 818            Err(anyhow!("access denied"))?;
 819        }
 820
 821        self.state_mut().join_channel(request.sender_id, channel_id);
 822        let messages = self
 823            .app_state
 824            .db
 825            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 826            .await?
 827            .into_iter()
 828            .map(|msg| proto::ChannelMessage {
 829                id: msg.id.to_proto(),
 830                body: msg.body,
 831                timestamp: msg.sent_at.unix_timestamp() as u64,
 832                sender_id: msg.sender_id.to_proto(),
 833                nonce: Some(msg.nonce.as_u128().into()),
 834            })
 835            .collect::<Vec<_>>();
 836        Ok(proto::JoinChannelResponse {
 837            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 838            messages,
 839        })
 840    }
 841
 842    async fn leave_channel(
 843        mut self: Arc<Self>,
 844        request: TypedEnvelope<proto::LeaveChannel>,
 845    ) -> tide::Result<()> {
 846        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 847        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 848        if !self
 849            .app_state
 850            .db
 851            .can_user_access_channel(user_id, channel_id)
 852            .await?
 853        {
 854            Err(anyhow!("access denied"))?;
 855        }
 856
 857        self.state_mut()
 858            .leave_channel(request.sender_id, channel_id);
 859
 860        Ok(())
 861    }
 862
 863    async fn send_channel_message(
 864        self: Arc<Self>,
 865        request: TypedEnvelope<proto::SendChannelMessage>,
 866    ) -> tide::Result<proto::SendChannelMessageResponse> {
 867        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 868        let user_id;
 869        let connection_ids;
 870        {
 871            let state = self.state();
 872            user_id = state.user_id_for_connection(request.sender_id)?;
 873            connection_ids = state.channel_connection_ids(channel_id)?;
 874        }
 875
 876        // Validate the message body.
 877        let body = request.payload.body.trim().to_string();
 878        if body.len() > MAX_MESSAGE_LEN {
 879            return Err(anyhow!("message is too long"))?;
 880        }
 881        if body.is_empty() {
 882            return Err(anyhow!("message can't be blank"))?;
 883        }
 884
 885        let timestamp = OffsetDateTime::now_utc();
 886        let nonce = request
 887            .payload
 888            .nonce
 889            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 890
 891        let message_id = self
 892            .app_state
 893            .db
 894            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 895            .await?
 896            .to_proto();
 897        let message = proto::ChannelMessage {
 898            sender_id: user_id.to_proto(),
 899            id: message_id,
 900            body,
 901            timestamp: timestamp.unix_timestamp() as u64,
 902            nonce: Some(nonce),
 903        };
 904        broadcast(request.sender_id, connection_ids, |conn_id| {
 905            self.peer.send(
 906                conn_id,
 907                proto::ChannelMessageSent {
 908                    channel_id: channel_id.to_proto(),
 909                    message: Some(message.clone()),
 910                },
 911            )
 912        })?;
 913        Ok(proto::SendChannelMessageResponse {
 914            message: Some(message),
 915        })
 916    }
 917
 918    async fn get_channel_messages(
 919        self: Arc<Self>,
 920        request: TypedEnvelope<proto::GetChannelMessages>,
 921    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 922        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 923        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 924        if !self
 925            .app_state
 926            .db
 927            .can_user_access_channel(user_id, channel_id)
 928            .await?
 929        {
 930            Err(anyhow!("access denied"))?;
 931        }
 932
 933        let messages = self
 934            .app_state
 935            .db
 936            .get_channel_messages(
 937                channel_id,
 938                MESSAGE_COUNT_PER_PAGE,
 939                Some(MessageId::from_proto(request.payload.before_message_id)),
 940            )
 941            .await?
 942            .into_iter()
 943            .map(|msg| proto::ChannelMessage {
 944                id: msg.id.to_proto(),
 945                body: msg.body,
 946                timestamp: msg.sent_at.unix_timestamp() as u64,
 947                sender_id: msg.sender_id.to_proto(),
 948                nonce: Some(msg.nonce.as_u128().into()),
 949            })
 950            .collect::<Vec<_>>();
 951
 952        Ok(proto::GetChannelMessagesResponse {
 953            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 954            messages,
 955        })
 956    }
 957
 958    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 959        self.store.read()
 960    }
 961
 962    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 963        self.store.write()
 964    }
 965}
 966
 967impl Executor for RealExecutor {
 968    type Timer = Timer;
 969
 970    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 971        task::spawn(future);
 972    }
 973
 974    fn timer(&self, duration: Duration) -> Self::Timer {
 975        Timer::after(duration)
 976    }
 977}
 978
 979fn broadcast<F>(
 980    sender_id: ConnectionId,
 981    receiver_ids: Vec<ConnectionId>,
 982    mut f: F,
 983) -> anyhow::Result<()>
 984where
 985    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 986{
 987    let mut result = Ok(());
 988    for receiver_id in receiver_ids {
 989        if receiver_id != sender_id {
 990            if let Err(error) = f(receiver_id) {
 991                if result.is_ok() {
 992                    result = Err(error);
 993                }
 994            }
 995        }
 996    }
 997    result
 998}
 999
1000pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1001    let server = Server::new(app.state().clone(), rpc.clone(), None);
1002    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1003        let server = server.clone();
1004        async move {
1005            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1006
1007            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1008            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1009            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1010            let client_protocol_version: Option<u32> = request
1011                .header("X-Zed-Protocol-Version")
1012                .and_then(|v| v.as_str().parse().ok());
1013
1014            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1015                return Ok(Response::new(StatusCode::UpgradeRequired));
1016            }
1017
1018            let header = match request.header("Sec-Websocket-Key") {
1019                Some(h) => h.as_str(),
1020                None => return Err(anyhow!("expected sec-websocket-key"))?,
1021            };
1022
1023            let user_id = process_auth_header(&request).await?;
1024
1025            let mut response = Response::new(StatusCode::SwitchingProtocols);
1026            response.insert_header(UPGRADE, "websocket");
1027            response.insert_header(CONNECTION, "Upgrade");
1028            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1029            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1030            response.insert_header("Sec-Websocket-Version", "13");
1031
1032            let http_res: &mut tide::http::Response = response.as_mut();
1033            let upgrade_receiver = http_res.recv_upgrade().await;
1034            let addr = request.remote().unwrap_or("unknown").to_string();
1035            task::spawn(async move {
1036                if let Some(stream) = upgrade_receiver.await {
1037                    server
1038                        .handle_connection(
1039                            Connection::new(
1040                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1041                            ),
1042                            addr,
1043                            user_id,
1044                            None,
1045                            RealExecutor,
1046                        )
1047                        .await;
1048                }
1049            });
1050
1051            Ok(response)
1052        }
1053    });
1054}
1055
1056fn header_contains_ignore_case<T>(
1057    request: &tide::Request<T>,
1058    header_name: HeaderName,
1059    value: &str,
1060) -> bool {
1061    request
1062        .header(header_name)
1063        .map(|h| {
1064            h.as_str()
1065                .split(',')
1066                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1067        })
1068        .unwrap_or(false)
1069}
1070
1071#[cfg(test)]
1072mod tests {
1073    use super::*;
1074    use crate::{
1075        auth,
1076        db::{tests::TestDb, UserId},
1077        github, AppState, Config,
1078    };
1079    use ::rpc::Peer;
1080    use client::{
1081        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1082        EstablishConnectionError, UserStore,
1083    };
1084    use collections::BTreeMap;
1085    use editor::{
1086        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1087        ToOffset, ToggleCodeActions, Undo,
1088    };
1089    use gpui::{executor, geometry::vector::vec2f, ModelHandle, TestAppContext, ViewHandle};
1090    use language::{
1091        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1092        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1093    };
1094    use lsp;
1095    use parking_lot::Mutex;
1096    use postage::barrier;
1097    use project::{
1098        fs::{FakeFs, Fs as _},
1099        search::SearchQuery,
1100        worktree::WorktreeHandle,
1101        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1102    };
1103    use rand::prelude::*;
1104    use rpc::PeerId;
1105    use serde_json::json;
1106    use sqlx::types::time::OffsetDateTime;
1107    use std::{
1108        cell::Cell,
1109        env,
1110        ops::Deref,
1111        path::{Path, PathBuf},
1112        rc::Rc,
1113        sync::{
1114            atomic::{AtomicBool, Ordering::SeqCst},
1115            Arc,
1116        },
1117        time::Duration,
1118    };
1119    use workspace::{Item, Settings, SplitDirection, Workspace, WorkspaceParams};
1120
1121    #[cfg(test)]
1122    #[ctor::ctor]
1123    fn init_logger() {
1124        if std::env::var("RUST_LOG").is_ok() {
1125            env_logger::init();
1126        }
1127    }
1128
1129    #[gpui::test(iterations = 10)]
1130    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1131        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1132        let lang_registry = Arc::new(LanguageRegistry::test());
1133        let fs = FakeFs::new(cx_a.background());
1134        cx_a.foreground().forbid_parking();
1135
1136        // Connect to a server as 2 clients.
1137        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1138        let client_a = server.create_client(cx_a, "user_a").await;
1139        let client_b = server.create_client(cx_b, "user_b").await;
1140
1141        // Share a project as client A
1142        fs.insert_tree(
1143            "/a",
1144            json!({
1145                ".zed.toml": r#"collaborators = ["user_b"]"#,
1146                "a.txt": "a-contents",
1147                "b.txt": "b-contents",
1148            }),
1149        )
1150        .await;
1151        let project_a = cx_a.update(|cx| {
1152            Project::local(
1153                client_a.clone(),
1154                client_a.user_store.clone(),
1155                lang_registry.clone(),
1156                fs.clone(),
1157                cx,
1158            )
1159        });
1160        let (worktree_a, _) = project_a
1161            .update(cx_a, |p, cx| {
1162                p.find_or_create_local_worktree("/a", true, cx)
1163            })
1164            .await
1165            .unwrap();
1166        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1167        worktree_a
1168            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1169            .await;
1170        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1171        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1172
1173        // Join that project as client B
1174        let project_b = Project::remote(
1175            project_id,
1176            client_b.clone(),
1177            client_b.user_store.clone(),
1178            lang_registry.clone(),
1179            fs.clone(),
1180            &mut cx_b.to_async(),
1181        )
1182        .await
1183        .unwrap();
1184
1185        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1186            assert_eq!(
1187                project
1188                    .collaborators()
1189                    .get(&client_a.peer_id)
1190                    .unwrap()
1191                    .user
1192                    .github_login,
1193                "user_a"
1194            );
1195            project.replica_id()
1196        });
1197        project_a
1198            .condition(&cx_a, |tree, _| {
1199                tree.collaborators()
1200                    .get(&client_b.peer_id)
1201                    .map_or(false, |collaborator| {
1202                        collaborator.replica_id == replica_id_b
1203                            && collaborator.user.github_login == "user_b"
1204                    })
1205            })
1206            .await;
1207
1208        // Open the same file as client B and client A.
1209        let buffer_b = project_b
1210            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1211            .await
1212            .unwrap();
1213        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1214        project_a.read_with(cx_a, |project, cx| {
1215            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1216        });
1217        let buffer_a = project_a
1218            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1219            .await
1220            .unwrap();
1221
1222        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1223
1224        // TODO
1225        // // Create a selection set as client B and see that selection set as client A.
1226        // buffer_a
1227        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1228        //     .await;
1229
1230        // Edit the buffer as client B and see that edit as client A.
1231        editor_b.update(cx_b, |editor, cx| {
1232            editor.handle_input(&Input("ok, ".into()), cx)
1233        });
1234        buffer_a
1235            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1236            .await;
1237
1238        // TODO
1239        // // Remove the selection set as client B, see those selections disappear as client A.
1240        cx_b.update(move |_| drop(editor_b));
1241        // buffer_a
1242        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1243        //     .await;
1244
1245        // Dropping the client B's project removes client B from client A's collaborators.
1246        cx_b.update(move |_| drop(project_b));
1247        project_a
1248            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1249            .await;
1250    }
1251
1252    #[gpui::test(iterations = 10)]
1253    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1254        let lang_registry = Arc::new(LanguageRegistry::test());
1255        let fs = FakeFs::new(cx_a.background());
1256        cx_a.foreground().forbid_parking();
1257
1258        // Connect to a server as 2 clients.
1259        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1260        let client_a = server.create_client(cx_a, "user_a").await;
1261        let client_b = server.create_client(cx_b, "user_b").await;
1262
1263        // Share a project as client A
1264        fs.insert_tree(
1265            "/a",
1266            json!({
1267                ".zed.toml": r#"collaborators = ["user_b"]"#,
1268                "a.txt": "a-contents",
1269                "b.txt": "b-contents",
1270            }),
1271        )
1272        .await;
1273        let project_a = cx_a.update(|cx| {
1274            Project::local(
1275                client_a.clone(),
1276                client_a.user_store.clone(),
1277                lang_registry.clone(),
1278                fs.clone(),
1279                cx,
1280            )
1281        });
1282        let (worktree_a, _) = project_a
1283            .update(cx_a, |p, cx| {
1284                p.find_or_create_local_worktree("/a", true, cx)
1285            })
1286            .await
1287            .unwrap();
1288        worktree_a
1289            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1290            .await;
1291        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1292        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1293        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1294        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1295
1296        // Join that project as client B
1297        let project_b = Project::remote(
1298            project_id,
1299            client_b.clone(),
1300            client_b.user_store.clone(),
1301            lang_registry.clone(),
1302            fs.clone(),
1303            &mut cx_b.to_async(),
1304        )
1305        .await
1306        .unwrap();
1307        project_b
1308            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1309            .await
1310            .unwrap();
1311
1312        // Unshare the project as client A
1313        project_a
1314            .update(cx_a, |project, cx| project.unshare(cx))
1315            .await
1316            .unwrap();
1317        project_b
1318            .condition(cx_b, |project, _| project.is_read_only())
1319            .await;
1320        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1321        cx_b.update(|_| {
1322            drop(project_b);
1323        });
1324
1325        // Share the project again and ensure guests can still join.
1326        project_a
1327            .update(cx_a, |project, cx| project.share(cx))
1328            .await
1329            .unwrap();
1330        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1331
1332        let project_b2 = Project::remote(
1333            project_id,
1334            client_b.clone(),
1335            client_b.user_store.clone(),
1336            lang_registry.clone(),
1337            fs.clone(),
1338            &mut cx_b.to_async(),
1339        )
1340        .await
1341        .unwrap();
1342        project_b2
1343            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1344            .await
1345            .unwrap();
1346    }
1347
1348    #[gpui::test(iterations = 10)]
1349    async fn test_propagate_saves_and_fs_changes(
1350        cx_a: &mut TestAppContext,
1351        cx_b: &mut TestAppContext,
1352        cx_c: &mut TestAppContext,
1353    ) {
1354        let lang_registry = Arc::new(LanguageRegistry::test());
1355        let fs = FakeFs::new(cx_a.background());
1356        cx_a.foreground().forbid_parking();
1357
1358        // Connect to a server as 3 clients.
1359        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1360        let client_a = server.create_client(cx_a, "user_a").await;
1361        let client_b = server.create_client(cx_b, "user_b").await;
1362        let client_c = server.create_client(cx_c, "user_c").await;
1363
1364        // Share a worktree as client A.
1365        fs.insert_tree(
1366            "/a",
1367            json!({
1368                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1369                "file1": "",
1370                "file2": ""
1371            }),
1372        )
1373        .await;
1374        let project_a = cx_a.update(|cx| {
1375            Project::local(
1376                client_a.clone(),
1377                client_a.user_store.clone(),
1378                lang_registry.clone(),
1379                fs.clone(),
1380                cx,
1381            )
1382        });
1383        let (worktree_a, _) = project_a
1384            .update(cx_a, |p, cx| {
1385                p.find_or_create_local_worktree("/a", true, cx)
1386            })
1387            .await
1388            .unwrap();
1389        worktree_a
1390            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1391            .await;
1392        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1393        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1394        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1395
1396        // Join that worktree as clients B and C.
1397        let project_b = Project::remote(
1398            project_id,
1399            client_b.clone(),
1400            client_b.user_store.clone(),
1401            lang_registry.clone(),
1402            fs.clone(),
1403            &mut cx_b.to_async(),
1404        )
1405        .await
1406        .unwrap();
1407        let project_c = Project::remote(
1408            project_id,
1409            client_c.clone(),
1410            client_c.user_store.clone(),
1411            lang_registry.clone(),
1412            fs.clone(),
1413            &mut cx_c.to_async(),
1414        )
1415        .await
1416        .unwrap();
1417        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1418        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1419
1420        // Open and edit a buffer as both guests B and C.
1421        let buffer_b = project_b
1422            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1423            .await
1424            .unwrap();
1425        let buffer_c = project_c
1426            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1427            .await
1428            .unwrap();
1429        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1430        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1431
1432        // Open and edit that buffer as the host.
1433        let buffer_a = project_a
1434            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1435            .await
1436            .unwrap();
1437
1438        buffer_a
1439            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1440            .await;
1441        buffer_a.update(cx_a, |buf, cx| {
1442            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1443        });
1444
1445        // Wait for edits to propagate
1446        buffer_a
1447            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1448            .await;
1449        buffer_b
1450            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1451            .await;
1452        buffer_c
1453            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1454            .await;
1455
1456        // Edit the buffer as the host and concurrently save as guest B.
1457        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1458        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1459        save_b.await.unwrap();
1460        assert_eq!(
1461            fs.load("/a/file1".as_ref()).await.unwrap(),
1462            "hi-a, i-am-c, i-am-b, i-am-a"
1463        );
1464        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1465        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1466        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1467
1468        worktree_a.flush_fs_events(cx_a).await;
1469
1470        // Make changes on host's file system, see those changes on guest worktrees.
1471        fs.rename(
1472            "/a/file1".as_ref(),
1473            "/a/file1-renamed".as_ref(),
1474            Default::default(),
1475        )
1476        .await
1477        .unwrap();
1478
1479        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1480            .await
1481            .unwrap();
1482        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1483
1484        worktree_a
1485            .condition(&cx_a, |tree, _| {
1486                tree.paths()
1487                    .map(|p| p.to_string_lossy())
1488                    .collect::<Vec<_>>()
1489                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1490            })
1491            .await;
1492        worktree_b
1493            .condition(&cx_b, |tree, _| {
1494                tree.paths()
1495                    .map(|p| p.to_string_lossy())
1496                    .collect::<Vec<_>>()
1497                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1498            })
1499            .await;
1500        worktree_c
1501            .condition(&cx_c, |tree, _| {
1502                tree.paths()
1503                    .map(|p| p.to_string_lossy())
1504                    .collect::<Vec<_>>()
1505                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1506            })
1507            .await;
1508
1509        // Ensure buffer files are updated as well.
1510        buffer_a
1511            .condition(&cx_a, |buf, _| {
1512                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1513            })
1514            .await;
1515        buffer_b
1516            .condition(&cx_b, |buf, _| {
1517                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1518            })
1519            .await;
1520        buffer_c
1521            .condition(&cx_c, |buf, _| {
1522                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1523            })
1524            .await;
1525    }
1526
1527    #[gpui::test(iterations = 10)]
1528    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1529        cx_a.foreground().forbid_parking();
1530        let lang_registry = Arc::new(LanguageRegistry::test());
1531        let fs = FakeFs::new(cx_a.background());
1532
1533        // Connect to a server as 2 clients.
1534        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1535        let client_a = server.create_client(cx_a, "user_a").await;
1536        let client_b = server.create_client(cx_b, "user_b").await;
1537
1538        // Share a project as client A
1539        fs.insert_tree(
1540            "/dir",
1541            json!({
1542                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1543                "a.txt": "a-contents",
1544            }),
1545        )
1546        .await;
1547
1548        let project_a = cx_a.update(|cx| {
1549            Project::local(
1550                client_a.clone(),
1551                client_a.user_store.clone(),
1552                lang_registry.clone(),
1553                fs.clone(),
1554                cx,
1555            )
1556        });
1557        let (worktree_a, _) = project_a
1558            .update(cx_a, |p, cx| {
1559                p.find_or_create_local_worktree("/dir", true, cx)
1560            })
1561            .await
1562            .unwrap();
1563        worktree_a
1564            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1565            .await;
1566        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1567        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1568        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1569
1570        // Join that project as client B
1571        let project_b = Project::remote(
1572            project_id,
1573            client_b.clone(),
1574            client_b.user_store.clone(),
1575            lang_registry.clone(),
1576            fs.clone(),
1577            &mut cx_b.to_async(),
1578        )
1579        .await
1580        .unwrap();
1581
1582        // Open a buffer as client B
1583        let buffer_b = project_b
1584            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1585            .await
1586            .unwrap();
1587
1588        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1589        buffer_b.read_with(cx_b, |buf, _| {
1590            assert!(buf.is_dirty());
1591            assert!(!buf.has_conflict());
1592        });
1593
1594        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1595        buffer_b
1596            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1597            .await;
1598        buffer_b.read_with(cx_b, |buf, _| {
1599            assert!(!buf.has_conflict());
1600        });
1601
1602        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1603        buffer_b.read_with(cx_b, |buf, _| {
1604            assert!(buf.is_dirty());
1605            assert!(!buf.has_conflict());
1606        });
1607    }
1608
1609    #[gpui::test(iterations = 10)]
1610    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1611        cx_a.foreground().forbid_parking();
1612        let lang_registry = Arc::new(LanguageRegistry::test());
1613        let fs = FakeFs::new(cx_a.background());
1614
1615        // Connect to a server as 2 clients.
1616        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1617        let client_a = server.create_client(cx_a, "user_a").await;
1618        let client_b = server.create_client(cx_b, "user_b").await;
1619
1620        // Share a project as client A
1621        fs.insert_tree(
1622            "/dir",
1623            json!({
1624                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1625                "a.txt": "a-contents",
1626            }),
1627        )
1628        .await;
1629
1630        let project_a = cx_a.update(|cx| {
1631            Project::local(
1632                client_a.clone(),
1633                client_a.user_store.clone(),
1634                lang_registry.clone(),
1635                fs.clone(),
1636                cx,
1637            )
1638        });
1639        let (worktree_a, _) = project_a
1640            .update(cx_a, |p, cx| {
1641                p.find_or_create_local_worktree("/dir", true, cx)
1642            })
1643            .await
1644            .unwrap();
1645        worktree_a
1646            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1647            .await;
1648        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1649        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1650        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1651
1652        // Join that project as client B
1653        let project_b = Project::remote(
1654            project_id,
1655            client_b.clone(),
1656            client_b.user_store.clone(),
1657            lang_registry.clone(),
1658            fs.clone(),
1659            &mut cx_b.to_async(),
1660        )
1661        .await
1662        .unwrap();
1663        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1664
1665        // Open a buffer as client B
1666        let buffer_b = project_b
1667            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1668            .await
1669            .unwrap();
1670        buffer_b.read_with(cx_b, |buf, _| {
1671            assert!(!buf.is_dirty());
1672            assert!(!buf.has_conflict());
1673        });
1674
1675        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1676            .await
1677            .unwrap();
1678        buffer_b
1679            .condition(&cx_b, |buf, _| {
1680                buf.text() == "new contents" && !buf.is_dirty()
1681            })
1682            .await;
1683        buffer_b.read_with(cx_b, |buf, _| {
1684            assert!(!buf.has_conflict());
1685        });
1686    }
1687
1688    #[gpui::test(iterations = 10)]
1689    async fn test_editing_while_guest_opens_buffer(
1690        cx_a: &mut TestAppContext,
1691        cx_b: &mut TestAppContext,
1692    ) {
1693        cx_a.foreground().forbid_parking();
1694        let lang_registry = Arc::new(LanguageRegistry::test());
1695        let fs = FakeFs::new(cx_a.background());
1696
1697        // Connect to a server as 2 clients.
1698        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1699        let client_a = server.create_client(cx_a, "user_a").await;
1700        let client_b = server.create_client(cx_b, "user_b").await;
1701
1702        // Share a project as client A
1703        fs.insert_tree(
1704            "/dir",
1705            json!({
1706                ".zed.toml": r#"collaborators = ["user_b"]"#,
1707                "a.txt": "a-contents",
1708            }),
1709        )
1710        .await;
1711        let project_a = cx_a.update(|cx| {
1712            Project::local(
1713                client_a.clone(),
1714                client_a.user_store.clone(),
1715                lang_registry.clone(),
1716                fs.clone(),
1717                cx,
1718            )
1719        });
1720        let (worktree_a, _) = project_a
1721            .update(cx_a, |p, cx| {
1722                p.find_or_create_local_worktree("/dir", true, cx)
1723            })
1724            .await
1725            .unwrap();
1726        worktree_a
1727            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1728            .await;
1729        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1730        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1731        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1732
1733        // Join that project as client B
1734        let project_b = Project::remote(
1735            project_id,
1736            client_b.clone(),
1737            client_b.user_store.clone(),
1738            lang_registry.clone(),
1739            fs.clone(),
1740            &mut cx_b.to_async(),
1741        )
1742        .await
1743        .unwrap();
1744
1745        // Open a buffer as client A
1746        let buffer_a = project_a
1747            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1748            .await
1749            .unwrap();
1750
1751        // Start opening the same buffer as client B
1752        let buffer_b = cx_b
1753            .background()
1754            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1755
1756        // Edit the buffer as client A while client B is still opening it.
1757        cx_b.background().simulate_random_delay().await;
1758        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1759        cx_b.background().simulate_random_delay().await;
1760        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1761
1762        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1763        let buffer_b = buffer_b.await.unwrap();
1764        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1765    }
1766
1767    #[gpui::test(iterations = 10)]
1768    async fn test_leaving_worktree_while_opening_buffer(
1769        cx_a: &mut TestAppContext,
1770        cx_b: &mut TestAppContext,
1771    ) {
1772        cx_a.foreground().forbid_parking();
1773        let lang_registry = Arc::new(LanguageRegistry::test());
1774        let fs = FakeFs::new(cx_a.background());
1775
1776        // Connect to a server as 2 clients.
1777        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1778        let client_a = server.create_client(cx_a, "user_a").await;
1779        let client_b = server.create_client(cx_b, "user_b").await;
1780
1781        // Share a project as client A
1782        fs.insert_tree(
1783            "/dir",
1784            json!({
1785                ".zed.toml": r#"collaborators = ["user_b"]"#,
1786                "a.txt": "a-contents",
1787            }),
1788        )
1789        .await;
1790        let project_a = cx_a.update(|cx| {
1791            Project::local(
1792                client_a.clone(),
1793                client_a.user_store.clone(),
1794                lang_registry.clone(),
1795                fs.clone(),
1796                cx,
1797            )
1798        });
1799        let (worktree_a, _) = project_a
1800            .update(cx_a, |p, cx| {
1801                p.find_or_create_local_worktree("/dir", true, cx)
1802            })
1803            .await
1804            .unwrap();
1805        worktree_a
1806            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1807            .await;
1808        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1809        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1810        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1811
1812        // Join that project as client B
1813        let project_b = Project::remote(
1814            project_id,
1815            client_b.clone(),
1816            client_b.user_store.clone(),
1817            lang_registry.clone(),
1818            fs.clone(),
1819            &mut cx_b.to_async(),
1820        )
1821        .await
1822        .unwrap();
1823
1824        // See that a guest has joined as client A.
1825        project_a
1826            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1827            .await;
1828
1829        // Begin opening a buffer as client B, but leave the project before the open completes.
1830        let buffer_b = cx_b
1831            .background()
1832            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1833        cx_b.update(|_| drop(project_b));
1834        drop(buffer_b);
1835
1836        // See that the guest has left.
1837        project_a
1838            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1839            .await;
1840    }
1841
1842    #[gpui::test(iterations = 10)]
1843    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1844        cx_a.foreground().forbid_parking();
1845        let lang_registry = Arc::new(LanguageRegistry::test());
1846        let fs = FakeFs::new(cx_a.background());
1847
1848        // Connect to a server as 2 clients.
1849        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1850        let client_a = server.create_client(cx_a, "user_a").await;
1851        let client_b = server.create_client(cx_b, "user_b").await;
1852
1853        // Share a project as client A
1854        fs.insert_tree(
1855            "/a",
1856            json!({
1857                ".zed.toml": r#"collaborators = ["user_b"]"#,
1858                "a.txt": "a-contents",
1859                "b.txt": "b-contents",
1860            }),
1861        )
1862        .await;
1863        let project_a = cx_a.update(|cx| {
1864            Project::local(
1865                client_a.clone(),
1866                client_a.user_store.clone(),
1867                lang_registry.clone(),
1868                fs.clone(),
1869                cx,
1870            )
1871        });
1872        let (worktree_a, _) = project_a
1873            .update(cx_a, |p, cx| {
1874                p.find_or_create_local_worktree("/a", true, cx)
1875            })
1876            .await
1877            .unwrap();
1878        worktree_a
1879            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1880            .await;
1881        let project_id = project_a
1882            .update(cx_a, |project, _| project.next_remote_id())
1883            .await;
1884        project_a
1885            .update(cx_a, |project, cx| project.share(cx))
1886            .await
1887            .unwrap();
1888
1889        // Join that project as client B
1890        let _project_b = Project::remote(
1891            project_id,
1892            client_b.clone(),
1893            client_b.user_store.clone(),
1894            lang_registry.clone(),
1895            fs.clone(),
1896            &mut cx_b.to_async(),
1897        )
1898        .await
1899        .unwrap();
1900
1901        // Client A sees that a guest has joined.
1902        project_a
1903            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1904            .await;
1905
1906        // Drop client B's connection and ensure client A observes client B leaving the project.
1907        client_b.disconnect(&cx_b.to_async()).unwrap();
1908        project_a
1909            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1910            .await;
1911
1912        // Rejoin the project as client B
1913        let _project_b = Project::remote(
1914            project_id,
1915            client_b.clone(),
1916            client_b.user_store.clone(),
1917            lang_registry.clone(),
1918            fs.clone(),
1919            &mut cx_b.to_async(),
1920        )
1921        .await
1922        .unwrap();
1923
1924        // Client A sees that a guest has re-joined.
1925        project_a
1926            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1927            .await;
1928
1929        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1930        client_b.wait_for_current_user(cx_b).await;
1931        server.disconnect_client(client_b.current_user_id(cx_b));
1932        cx_a.foreground().advance_clock(Duration::from_secs(3));
1933        project_a
1934            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1935            .await;
1936    }
1937
1938    #[gpui::test(iterations = 10)]
1939    async fn test_collaborating_with_diagnostics(
1940        cx_a: &mut TestAppContext,
1941        cx_b: &mut TestAppContext,
1942    ) {
1943        cx_a.foreground().forbid_parking();
1944        let mut lang_registry = Arc::new(LanguageRegistry::test());
1945        let fs = FakeFs::new(cx_a.background());
1946
1947        // Set up a fake language server.
1948        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1949        Arc::get_mut(&mut lang_registry)
1950            .unwrap()
1951            .add(Arc::new(Language::new(
1952                LanguageConfig {
1953                    name: "Rust".into(),
1954                    path_suffixes: vec!["rs".to_string()],
1955                    language_server: Some(language_server_config),
1956                    ..Default::default()
1957                },
1958                Some(tree_sitter_rust::language()),
1959            )));
1960
1961        // Connect to a server as 2 clients.
1962        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1963        let client_a = server.create_client(cx_a, "user_a").await;
1964        let client_b = server.create_client(cx_b, "user_b").await;
1965
1966        // Share a project as client A
1967        fs.insert_tree(
1968            "/a",
1969            json!({
1970                ".zed.toml": r#"collaborators = ["user_b"]"#,
1971                "a.rs": "let one = two",
1972                "other.rs": "",
1973            }),
1974        )
1975        .await;
1976        let project_a = cx_a.update(|cx| {
1977            Project::local(
1978                client_a.clone(),
1979                client_a.user_store.clone(),
1980                lang_registry.clone(),
1981                fs.clone(),
1982                cx,
1983            )
1984        });
1985        let (worktree_a, _) = project_a
1986            .update(cx_a, |p, cx| {
1987                p.find_or_create_local_worktree("/a", true, cx)
1988            })
1989            .await
1990            .unwrap();
1991        worktree_a
1992            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1993            .await;
1994        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1995        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1996        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1997
1998        // Cause the language server to start.
1999        let _ = cx_a
2000            .background()
2001            .spawn(project_a.update(cx_a, |project, cx| {
2002                project.open_buffer(
2003                    ProjectPath {
2004                        worktree_id,
2005                        path: Path::new("other.rs").into(),
2006                    },
2007                    cx,
2008                )
2009            }))
2010            .await
2011            .unwrap();
2012
2013        // Simulate a language server reporting errors for a file.
2014        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2015        fake_language_server
2016            .receive_notification::<lsp::notification::DidOpenTextDocument>()
2017            .await;
2018        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2019            lsp::PublishDiagnosticsParams {
2020                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2021                version: None,
2022                diagnostics: vec![lsp::Diagnostic {
2023                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2024                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2025                    message: "message 1".to_string(),
2026                    ..Default::default()
2027                }],
2028            },
2029        );
2030
2031        // Wait for server to see the diagnostics update.
2032        server
2033            .condition(|store| {
2034                let worktree = store
2035                    .project(project_id)
2036                    .unwrap()
2037                    .share
2038                    .as_ref()
2039                    .unwrap()
2040                    .worktrees
2041                    .get(&worktree_id.to_proto())
2042                    .unwrap();
2043
2044                !worktree.diagnostic_summaries.is_empty()
2045            })
2046            .await;
2047
2048        // Join the worktree as client B.
2049        let project_b = Project::remote(
2050            project_id,
2051            client_b.clone(),
2052            client_b.user_store.clone(),
2053            lang_registry.clone(),
2054            fs.clone(),
2055            &mut cx_b.to_async(),
2056        )
2057        .await
2058        .unwrap();
2059
2060        project_b.read_with(cx_b, |project, cx| {
2061            assert_eq!(
2062                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2063                &[(
2064                    ProjectPath {
2065                        worktree_id,
2066                        path: Arc::from(Path::new("a.rs")),
2067                    },
2068                    DiagnosticSummary {
2069                        error_count: 1,
2070                        warning_count: 0,
2071                        ..Default::default()
2072                    },
2073                )]
2074            )
2075        });
2076
2077        // Simulate a language server reporting more errors for a file.
2078        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2079            lsp::PublishDiagnosticsParams {
2080                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2081                version: None,
2082                diagnostics: vec![
2083                    lsp::Diagnostic {
2084                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2085                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2086                        message: "message 1".to_string(),
2087                        ..Default::default()
2088                    },
2089                    lsp::Diagnostic {
2090                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2091                        range: lsp::Range::new(
2092                            lsp::Position::new(0, 10),
2093                            lsp::Position::new(0, 13),
2094                        ),
2095                        message: "message 2".to_string(),
2096                        ..Default::default()
2097                    },
2098                ],
2099            },
2100        );
2101
2102        // Client b gets the updated summaries
2103        project_b
2104            .condition(&cx_b, |project, cx| {
2105                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2106                    == &[(
2107                        ProjectPath {
2108                            worktree_id,
2109                            path: Arc::from(Path::new("a.rs")),
2110                        },
2111                        DiagnosticSummary {
2112                            error_count: 1,
2113                            warning_count: 1,
2114                            ..Default::default()
2115                        },
2116                    )]
2117            })
2118            .await;
2119
2120        // Open the file with the errors on client B. They should be present.
2121        let buffer_b = cx_b
2122            .background()
2123            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2124            .await
2125            .unwrap();
2126
2127        buffer_b.read_with(cx_b, |buffer, _| {
2128            assert_eq!(
2129                buffer
2130                    .snapshot()
2131                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2132                    .map(|entry| entry)
2133                    .collect::<Vec<_>>(),
2134                &[
2135                    DiagnosticEntry {
2136                        range: Point::new(0, 4)..Point::new(0, 7),
2137                        diagnostic: Diagnostic {
2138                            group_id: 0,
2139                            message: "message 1".to_string(),
2140                            severity: lsp::DiagnosticSeverity::ERROR,
2141                            is_primary: true,
2142                            ..Default::default()
2143                        }
2144                    },
2145                    DiagnosticEntry {
2146                        range: Point::new(0, 10)..Point::new(0, 13),
2147                        diagnostic: Diagnostic {
2148                            group_id: 1,
2149                            severity: lsp::DiagnosticSeverity::WARNING,
2150                            message: "message 2".to_string(),
2151                            is_primary: true,
2152                            ..Default::default()
2153                        }
2154                    }
2155                ]
2156            );
2157        });
2158    }
2159
2160    #[gpui::test(iterations = 10)]
2161    async fn test_collaborating_with_completion(
2162        cx_a: &mut TestAppContext,
2163        cx_b: &mut TestAppContext,
2164    ) {
2165        cx_a.foreground().forbid_parking();
2166        let mut lang_registry = Arc::new(LanguageRegistry::test());
2167        let fs = FakeFs::new(cx_a.background());
2168
2169        // Set up a fake language server.
2170        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2171        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2172            completion_provider: Some(lsp::CompletionOptions {
2173                trigger_characters: Some(vec![".".to_string()]),
2174                ..Default::default()
2175            }),
2176            ..Default::default()
2177        });
2178        Arc::get_mut(&mut lang_registry)
2179            .unwrap()
2180            .add(Arc::new(Language::new(
2181                LanguageConfig {
2182                    name: "Rust".into(),
2183                    path_suffixes: vec!["rs".to_string()],
2184                    language_server: Some(language_server_config),
2185                    ..Default::default()
2186                },
2187                Some(tree_sitter_rust::language()),
2188            )));
2189
2190        // Connect to a server as 2 clients.
2191        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2192        let client_a = server.create_client(cx_a, "user_a").await;
2193        let client_b = server.create_client(cx_b, "user_b").await;
2194
2195        // Share a project as client A
2196        fs.insert_tree(
2197            "/a",
2198            json!({
2199                ".zed.toml": r#"collaborators = ["user_b"]"#,
2200                "main.rs": "fn main() { a }",
2201                "other.rs": "",
2202            }),
2203        )
2204        .await;
2205        let project_a = cx_a.update(|cx| {
2206            Project::local(
2207                client_a.clone(),
2208                client_a.user_store.clone(),
2209                lang_registry.clone(),
2210                fs.clone(),
2211                cx,
2212            )
2213        });
2214        let (worktree_a, _) = project_a
2215            .update(cx_a, |p, cx| {
2216                p.find_or_create_local_worktree("/a", true, cx)
2217            })
2218            .await
2219            .unwrap();
2220        worktree_a
2221            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2222            .await;
2223        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2224        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2225        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2226
2227        // Join the worktree as client B.
2228        let project_b = Project::remote(
2229            project_id,
2230            client_b.clone(),
2231            client_b.user_store.clone(),
2232            lang_registry.clone(),
2233            fs.clone(),
2234            &mut cx_b.to_async(),
2235        )
2236        .await
2237        .unwrap();
2238
2239        // Open a file in an editor as the guest.
2240        let buffer_b = project_b
2241            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2242            .await
2243            .unwrap();
2244        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2245        let editor_b = cx_b.add_view(window_b, |cx| {
2246            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2247        });
2248
2249        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2250        buffer_b
2251            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2252            .await;
2253
2254        // Type a completion trigger character as the guest.
2255        editor_b.update(cx_b, |editor, cx| {
2256            editor.select_ranges([13..13], None, cx);
2257            editor.handle_input(&Input(".".into()), cx);
2258            cx.focus(&editor_b);
2259        });
2260
2261        // Receive a completion request as the host's language server.
2262        // Return some completions from the host's language server.
2263        cx_a.foreground().start_waiting();
2264        fake_language_server
2265            .handle_request::<lsp::request::Completion, _>(|params, _| {
2266                assert_eq!(
2267                    params.text_document_position.text_document.uri,
2268                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2269                );
2270                assert_eq!(
2271                    params.text_document_position.position,
2272                    lsp::Position::new(0, 14),
2273                );
2274
2275                Some(lsp::CompletionResponse::Array(vec![
2276                    lsp::CompletionItem {
2277                        label: "first_method(…)".into(),
2278                        detail: Some("fn(&mut self, B) -> C".into()),
2279                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2280                            new_text: "first_method($1)".to_string(),
2281                            range: lsp::Range::new(
2282                                lsp::Position::new(0, 14),
2283                                lsp::Position::new(0, 14),
2284                            ),
2285                        })),
2286                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2287                        ..Default::default()
2288                    },
2289                    lsp::CompletionItem {
2290                        label: "second_method(…)".into(),
2291                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2292                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2293                            new_text: "second_method()".to_string(),
2294                            range: lsp::Range::new(
2295                                lsp::Position::new(0, 14),
2296                                lsp::Position::new(0, 14),
2297                            ),
2298                        })),
2299                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2300                        ..Default::default()
2301                    },
2302                ]))
2303            })
2304            .next()
2305            .await
2306            .unwrap();
2307        cx_a.foreground().finish_waiting();
2308
2309        // Open the buffer on the host.
2310        let buffer_a = project_a
2311            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2312            .await
2313            .unwrap();
2314        buffer_a
2315            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2316            .await;
2317
2318        // Confirm a completion on the guest.
2319        editor_b
2320            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2321            .await;
2322        editor_b.update(cx_b, |editor, cx| {
2323            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2324            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2325        });
2326
2327        // Return a resolved completion from the host's language server.
2328        // The resolved completion has an additional text edit.
2329        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2330            |params, _| {
2331                assert_eq!(params.label, "first_method(…)");
2332                lsp::CompletionItem {
2333                    label: "first_method(…)".into(),
2334                    detail: Some("fn(&mut self, B) -> C".into()),
2335                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2336                        new_text: "first_method($1)".to_string(),
2337                        range: lsp::Range::new(
2338                            lsp::Position::new(0, 14),
2339                            lsp::Position::new(0, 14),
2340                        ),
2341                    })),
2342                    additional_text_edits: Some(vec![lsp::TextEdit {
2343                        new_text: "use d::SomeTrait;\n".to_string(),
2344                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2345                    }]),
2346                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2347                    ..Default::default()
2348                }
2349            },
2350        );
2351
2352        // The additional edit is applied.
2353        buffer_a
2354            .condition(&cx_a, |buffer, _| {
2355                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2356            })
2357            .await;
2358        buffer_b
2359            .condition(&cx_b, |buffer, _| {
2360                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2361            })
2362            .await;
2363    }
2364
2365    #[gpui::test(iterations = 10)]
2366    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2367        cx_a.foreground().forbid_parking();
2368        let mut lang_registry = Arc::new(LanguageRegistry::test());
2369        let fs = FakeFs::new(cx_a.background());
2370
2371        // Set up a fake language server.
2372        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2373        Arc::get_mut(&mut lang_registry)
2374            .unwrap()
2375            .add(Arc::new(Language::new(
2376                LanguageConfig {
2377                    name: "Rust".into(),
2378                    path_suffixes: vec!["rs".to_string()],
2379                    language_server: Some(language_server_config),
2380                    ..Default::default()
2381                },
2382                Some(tree_sitter_rust::language()),
2383            )));
2384
2385        // Connect to a server as 2 clients.
2386        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2387        let client_a = server.create_client(cx_a, "user_a").await;
2388        let client_b = server.create_client(cx_b, "user_b").await;
2389
2390        // Share a project as client A
2391        fs.insert_tree(
2392            "/a",
2393            json!({
2394                ".zed.toml": r#"collaborators = ["user_b"]"#,
2395                "a.rs": "let one = two",
2396            }),
2397        )
2398        .await;
2399        let project_a = cx_a.update(|cx| {
2400            Project::local(
2401                client_a.clone(),
2402                client_a.user_store.clone(),
2403                lang_registry.clone(),
2404                fs.clone(),
2405                cx,
2406            )
2407        });
2408        let (worktree_a, _) = project_a
2409            .update(cx_a, |p, cx| {
2410                p.find_or_create_local_worktree("/a", true, cx)
2411            })
2412            .await
2413            .unwrap();
2414        worktree_a
2415            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2416            .await;
2417        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2418        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2419        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2420
2421        // Join the worktree as client B.
2422        let project_b = Project::remote(
2423            project_id,
2424            client_b.clone(),
2425            client_b.user_store.clone(),
2426            lang_registry.clone(),
2427            fs.clone(),
2428            &mut cx_b.to_async(),
2429        )
2430        .await
2431        .unwrap();
2432
2433        let buffer_b = cx_b
2434            .background()
2435            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2436            .await
2437            .unwrap();
2438
2439        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2440        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2441            Some(vec![
2442                lsp::TextEdit {
2443                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2444                    new_text: "h".to_string(),
2445                },
2446                lsp::TextEdit {
2447                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2448                    new_text: "y".to_string(),
2449                },
2450            ])
2451        });
2452
2453        project_b
2454            .update(cx_b, |project, cx| {
2455                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2456            })
2457            .await
2458            .unwrap();
2459        assert_eq!(
2460            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2461            "let honey = two"
2462        );
2463    }
2464
2465    #[gpui::test(iterations = 10)]
2466    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2467        cx_a.foreground().forbid_parking();
2468        let mut lang_registry = Arc::new(LanguageRegistry::test());
2469        let fs = FakeFs::new(cx_a.background());
2470        fs.insert_tree(
2471            "/root-1",
2472            json!({
2473                ".zed.toml": r#"collaborators = ["user_b"]"#,
2474                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2475            }),
2476        )
2477        .await;
2478        fs.insert_tree(
2479            "/root-2",
2480            json!({
2481                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2482            }),
2483        )
2484        .await;
2485
2486        // Set up a fake language server.
2487        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2488        Arc::get_mut(&mut lang_registry)
2489            .unwrap()
2490            .add(Arc::new(Language::new(
2491                LanguageConfig {
2492                    name: "Rust".into(),
2493                    path_suffixes: vec!["rs".to_string()],
2494                    language_server: Some(language_server_config),
2495                    ..Default::default()
2496                },
2497                Some(tree_sitter_rust::language()),
2498            )));
2499
2500        // Connect to a server as 2 clients.
2501        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2502        let client_a = server.create_client(cx_a, "user_a").await;
2503        let client_b = server.create_client(cx_b, "user_b").await;
2504
2505        // Share a project as client A
2506        let project_a = cx_a.update(|cx| {
2507            Project::local(
2508                client_a.clone(),
2509                client_a.user_store.clone(),
2510                lang_registry.clone(),
2511                fs.clone(),
2512                cx,
2513            )
2514        });
2515        let (worktree_a, _) = project_a
2516            .update(cx_a, |p, cx| {
2517                p.find_or_create_local_worktree("/root-1", true, cx)
2518            })
2519            .await
2520            .unwrap();
2521        worktree_a
2522            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2523            .await;
2524        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2525        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2526        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2527
2528        // Join the worktree as client B.
2529        let project_b = Project::remote(
2530            project_id,
2531            client_b.clone(),
2532            client_b.user_store.clone(),
2533            lang_registry.clone(),
2534            fs.clone(),
2535            &mut cx_b.to_async(),
2536        )
2537        .await
2538        .unwrap();
2539
2540        // Open the file on client B.
2541        let buffer_b = cx_b
2542            .background()
2543            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2544            .await
2545            .unwrap();
2546
2547        // Request the definition of a symbol as the guest.
2548        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2549        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2550            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2551                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2552                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2553            )))
2554        });
2555
2556        let definitions_1 = project_b
2557            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2558            .await
2559            .unwrap();
2560        cx_b.read(|cx| {
2561            assert_eq!(definitions_1.len(), 1);
2562            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2563            let target_buffer = definitions_1[0].buffer.read(cx);
2564            assert_eq!(
2565                target_buffer.text(),
2566                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2567            );
2568            assert_eq!(
2569                definitions_1[0].range.to_point(target_buffer),
2570                Point::new(0, 6)..Point::new(0, 9)
2571            );
2572        });
2573
2574        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2575        // the previous call to `definition`.
2576        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2577            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2578                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2579                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2580            )))
2581        });
2582
2583        let definitions_2 = project_b
2584            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2585            .await
2586            .unwrap();
2587        cx_b.read(|cx| {
2588            assert_eq!(definitions_2.len(), 1);
2589            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2590            let target_buffer = definitions_2[0].buffer.read(cx);
2591            assert_eq!(
2592                target_buffer.text(),
2593                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2594            );
2595            assert_eq!(
2596                definitions_2[0].range.to_point(target_buffer),
2597                Point::new(1, 6)..Point::new(1, 11)
2598            );
2599        });
2600        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2601    }
2602
2603    #[gpui::test(iterations = 10)]
2604    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2605        cx_a.foreground().forbid_parking();
2606        let mut lang_registry = Arc::new(LanguageRegistry::test());
2607        let fs = FakeFs::new(cx_a.background());
2608        fs.insert_tree(
2609            "/root-1",
2610            json!({
2611                ".zed.toml": r#"collaborators = ["user_b"]"#,
2612                "one.rs": "const ONE: usize = 1;",
2613                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2614            }),
2615        )
2616        .await;
2617        fs.insert_tree(
2618            "/root-2",
2619            json!({
2620                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2621            }),
2622        )
2623        .await;
2624
2625        // Set up a fake language server.
2626        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2627        Arc::get_mut(&mut lang_registry)
2628            .unwrap()
2629            .add(Arc::new(Language::new(
2630                LanguageConfig {
2631                    name: "Rust".into(),
2632                    path_suffixes: vec!["rs".to_string()],
2633                    language_server: Some(language_server_config),
2634                    ..Default::default()
2635                },
2636                Some(tree_sitter_rust::language()),
2637            )));
2638
2639        // Connect to a server as 2 clients.
2640        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2641        let client_a = server.create_client(cx_a, "user_a").await;
2642        let client_b = server.create_client(cx_b, "user_b").await;
2643
2644        // Share a project as client A
2645        let project_a = cx_a.update(|cx| {
2646            Project::local(
2647                client_a.clone(),
2648                client_a.user_store.clone(),
2649                lang_registry.clone(),
2650                fs.clone(),
2651                cx,
2652            )
2653        });
2654        let (worktree_a, _) = project_a
2655            .update(cx_a, |p, cx| {
2656                p.find_or_create_local_worktree("/root-1", true, cx)
2657            })
2658            .await
2659            .unwrap();
2660        worktree_a
2661            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2662            .await;
2663        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2664        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2665        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2666
2667        // Join the worktree as client B.
2668        let project_b = Project::remote(
2669            project_id,
2670            client_b.clone(),
2671            client_b.user_store.clone(),
2672            lang_registry.clone(),
2673            fs.clone(),
2674            &mut cx_b.to_async(),
2675        )
2676        .await
2677        .unwrap();
2678
2679        // Open the file on client B.
2680        let buffer_b = cx_b
2681            .background()
2682            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2683            .await
2684            .unwrap();
2685
2686        // Request references to a symbol as the guest.
2687        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2688        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2689            assert_eq!(
2690                params.text_document_position.text_document.uri.as_str(),
2691                "file:///root-1/one.rs"
2692            );
2693            Some(vec![
2694                lsp::Location {
2695                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2696                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2697                },
2698                lsp::Location {
2699                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2700                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2701                },
2702                lsp::Location {
2703                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2704                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2705                },
2706            ])
2707        });
2708
2709        let references = project_b
2710            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2711            .await
2712            .unwrap();
2713        cx_b.read(|cx| {
2714            assert_eq!(references.len(), 3);
2715            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2716
2717            let two_buffer = references[0].buffer.read(cx);
2718            let three_buffer = references[2].buffer.read(cx);
2719            assert_eq!(
2720                two_buffer.file().unwrap().path().as_ref(),
2721                Path::new("two.rs")
2722            );
2723            assert_eq!(references[1].buffer, references[0].buffer);
2724            assert_eq!(
2725                three_buffer.file().unwrap().full_path(cx),
2726                Path::new("three.rs")
2727            );
2728
2729            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2730            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2731            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2732        });
2733    }
2734
2735    #[gpui::test(iterations = 10)]
2736    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2737        cx_a.foreground().forbid_parking();
2738        let lang_registry = Arc::new(LanguageRegistry::test());
2739        let fs = FakeFs::new(cx_a.background());
2740        fs.insert_tree(
2741            "/root-1",
2742            json!({
2743                ".zed.toml": r#"collaborators = ["user_b"]"#,
2744                "a": "hello world",
2745                "b": "goodnight moon",
2746                "c": "a world of goo",
2747                "d": "world champion of clown world",
2748            }),
2749        )
2750        .await;
2751        fs.insert_tree(
2752            "/root-2",
2753            json!({
2754                "e": "disney world is fun",
2755            }),
2756        )
2757        .await;
2758
2759        // Connect to a server as 2 clients.
2760        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2761        let client_a = server.create_client(cx_a, "user_a").await;
2762        let client_b = server.create_client(cx_b, "user_b").await;
2763
2764        // Share a project as client A
2765        let project_a = cx_a.update(|cx| {
2766            Project::local(
2767                client_a.clone(),
2768                client_a.user_store.clone(),
2769                lang_registry.clone(),
2770                fs.clone(),
2771                cx,
2772            )
2773        });
2774        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2775
2776        let (worktree_1, _) = project_a
2777            .update(cx_a, |p, cx| {
2778                p.find_or_create_local_worktree("/root-1", true, cx)
2779            })
2780            .await
2781            .unwrap();
2782        worktree_1
2783            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2784            .await;
2785        let (worktree_2, _) = project_a
2786            .update(cx_a, |p, cx| {
2787                p.find_or_create_local_worktree("/root-2", true, cx)
2788            })
2789            .await
2790            .unwrap();
2791        worktree_2
2792            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2793            .await;
2794
2795        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2796
2797        // Join the worktree as client B.
2798        let project_b = Project::remote(
2799            project_id,
2800            client_b.clone(),
2801            client_b.user_store.clone(),
2802            lang_registry.clone(),
2803            fs.clone(),
2804            &mut cx_b.to_async(),
2805        )
2806        .await
2807        .unwrap();
2808
2809        let results = project_b
2810            .update(cx_b, |project, cx| {
2811                project.search(SearchQuery::text("world", false, false), cx)
2812            })
2813            .await
2814            .unwrap();
2815
2816        let mut ranges_by_path = results
2817            .into_iter()
2818            .map(|(buffer, ranges)| {
2819                buffer.read_with(cx_b, |buffer, cx| {
2820                    let path = buffer.file().unwrap().full_path(cx);
2821                    let offset_ranges = ranges
2822                        .into_iter()
2823                        .map(|range| range.to_offset(buffer))
2824                        .collect::<Vec<_>>();
2825                    (path, offset_ranges)
2826                })
2827            })
2828            .collect::<Vec<_>>();
2829        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2830
2831        assert_eq!(
2832            ranges_by_path,
2833            &[
2834                (PathBuf::from("root-1/a"), vec![6..11]),
2835                (PathBuf::from("root-1/c"), vec![2..7]),
2836                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2837                (PathBuf::from("root-2/e"), vec![7..12]),
2838            ]
2839        );
2840    }
2841
2842    #[gpui::test(iterations = 10)]
2843    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2844        cx_a.foreground().forbid_parking();
2845        let lang_registry = Arc::new(LanguageRegistry::test());
2846        let fs = FakeFs::new(cx_a.background());
2847        fs.insert_tree(
2848            "/root-1",
2849            json!({
2850                ".zed.toml": r#"collaborators = ["user_b"]"#,
2851                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2852            }),
2853        )
2854        .await;
2855
2856        // Set up a fake language server.
2857        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2858        lang_registry.add(Arc::new(Language::new(
2859            LanguageConfig {
2860                name: "Rust".into(),
2861                path_suffixes: vec!["rs".to_string()],
2862                language_server: Some(language_server_config),
2863                ..Default::default()
2864            },
2865            Some(tree_sitter_rust::language()),
2866        )));
2867
2868        // Connect to a server as 2 clients.
2869        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2870        let client_a = server.create_client(cx_a, "user_a").await;
2871        let client_b = server.create_client(cx_b, "user_b").await;
2872
2873        // Share a project as client A
2874        let project_a = cx_a.update(|cx| {
2875            Project::local(
2876                client_a.clone(),
2877                client_a.user_store.clone(),
2878                lang_registry.clone(),
2879                fs.clone(),
2880                cx,
2881            )
2882        });
2883        let (worktree_a, _) = project_a
2884            .update(cx_a, |p, cx| {
2885                p.find_or_create_local_worktree("/root-1", true, cx)
2886            })
2887            .await
2888            .unwrap();
2889        worktree_a
2890            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2891            .await;
2892        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2893        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2894        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2895
2896        // Join the worktree as client B.
2897        let project_b = Project::remote(
2898            project_id,
2899            client_b.clone(),
2900            client_b.user_store.clone(),
2901            lang_registry.clone(),
2902            fs.clone(),
2903            &mut cx_b.to_async(),
2904        )
2905        .await
2906        .unwrap();
2907
2908        // Open the file on client B.
2909        let buffer_b = cx_b
2910            .background()
2911            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2912            .await
2913            .unwrap();
2914
2915        // Request document highlights as the guest.
2916        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2917        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2918            |params, _| {
2919                assert_eq!(
2920                    params
2921                        .text_document_position_params
2922                        .text_document
2923                        .uri
2924                        .as_str(),
2925                    "file:///root-1/main.rs"
2926                );
2927                assert_eq!(
2928                    params.text_document_position_params.position,
2929                    lsp::Position::new(0, 34)
2930                );
2931                Some(vec![
2932                    lsp::DocumentHighlight {
2933                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2934                        range: lsp::Range::new(
2935                            lsp::Position::new(0, 10),
2936                            lsp::Position::new(0, 16),
2937                        ),
2938                    },
2939                    lsp::DocumentHighlight {
2940                        kind: Some(lsp::DocumentHighlightKind::READ),
2941                        range: lsp::Range::new(
2942                            lsp::Position::new(0, 32),
2943                            lsp::Position::new(0, 38),
2944                        ),
2945                    },
2946                    lsp::DocumentHighlight {
2947                        kind: Some(lsp::DocumentHighlightKind::READ),
2948                        range: lsp::Range::new(
2949                            lsp::Position::new(0, 41),
2950                            lsp::Position::new(0, 47),
2951                        ),
2952                    },
2953                ])
2954            },
2955        );
2956
2957        let highlights = project_b
2958            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2959            .await
2960            .unwrap();
2961        buffer_b.read_with(cx_b, |buffer, _| {
2962            let snapshot = buffer.snapshot();
2963
2964            let highlights = highlights
2965                .into_iter()
2966                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2967                .collect::<Vec<_>>();
2968            assert_eq!(
2969                highlights,
2970                &[
2971                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2972                    (lsp::DocumentHighlightKind::READ, 32..38),
2973                    (lsp::DocumentHighlightKind::READ, 41..47)
2974                ]
2975            )
2976        });
2977    }
2978
2979    #[gpui::test(iterations = 10)]
2980    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2981        cx_a.foreground().forbid_parking();
2982        let mut lang_registry = Arc::new(LanguageRegistry::test());
2983        let fs = FakeFs::new(cx_a.background());
2984        fs.insert_tree(
2985            "/code",
2986            json!({
2987                "crate-1": {
2988                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2989                    "one.rs": "const ONE: usize = 1;",
2990                },
2991                "crate-2": {
2992                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2993                },
2994                "private": {
2995                    "passwords.txt": "the-password",
2996                }
2997            }),
2998        )
2999        .await;
3000
3001        // Set up a fake language server.
3002        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3003        Arc::get_mut(&mut lang_registry)
3004            .unwrap()
3005            .add(Arc::new(Language::new(
3006                LanguageConfig {
3007                    name: "Rust".into(),
3008                    path_suffixes: vec!["rs".to_string()],
3009                    language_server: Some(language_server_config),
3010                    ..Default::default()
3011                },
3012                Some(tree_sitter_rust::language()),
3013            )));
3014
3015        // Connect to a server as 2 clients.
3016        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3017        let client_a = server.create_client(cx_a, "user_a").await;
3018        let client_b = server.create_client(cx_b, "user_b").await;
3019
3020        // Share a project as client A
3021        let project_a = cx_a.update(|cx| {
3022            Project::local(
3023                client_a.clone(),
3024                client_a.user_store.clone(),
3025                lang_registry.clone(),
3026                fs.clone(),
3027                cx,
3028            )
3029        });
3030        let (worktree_a, _) = project_a
3031            .update(cx_a, |p, cx| {
3032                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3033            })
3034            .await
3035            .unwrap();
3036        worktree_a
3037            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3038            .await;
3039        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3040        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3041        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3042
3043        // Join the worktree as client B.
3044        let project_b = Project::remote(
3045            project_id,
3046            client_b.clone(),
3047            client_b.user_store.clone(),
3048            lang_registry.clone(),
3049            fs.clone(),
3050            &mut cx_b.to_async(),
3051        )
3052        .await
3053        .unwrap();
3054
3055        // Cause the language server to start.
3056        let _buffer = cx_b
3057            .background()
3058            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3059            .await
3060            .unwrap();
3061
3062        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3063        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3064            #[allow(deprecated)]
3065            Some(vec![lsp::SymbolInformation {
3066                name: "TWO".into(),
3067                location: lsp::Location {
3068                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3069                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3070                },
3071                kind: lsp::SymbolKind::CONSTANT,
3072                tags: None,
3073                container_name: None,
3074                deprecated: None,
3075            }])
3076        });
3077
3078        // Request the definition of a symbol as the guest.
3079        let symbols = project_b
3080            .update(cx_b, |p, cx| p.symbols("two", cx))
3081            .await
3082            .unwrap();
3083        assert_eq!(symbols.len(), 1);
3084        assert_eq!(symbols[0].name, "TWO");
3085
3086        // Open one of the returned symbols.
3087        let buffer_b_2 = project_b
3088            .update(cx_b, |project, cx| {
3089                project.open_buffer_for_symbol(&symbols[0], cx)
3090            })
3091            .await
3092            .unwrap();
3093        buffer_b_2.read_with(cx_b, |buffer, _| {
3094            assert_eq!(
3095                buffer.file().unwrap().path().as_ref(),
3096                Path::new("../crate-2/two.rs")
3097            );
3098        });
3099
3100        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3101        let mut fake_symbol = symbols[0].clone();
3102        fake_symbol.path = Path::new("/code/secrets").into();
3103        let error = project_b
3104            .update(cx_b, |project, cx| {
3105                project.open_buffer_for_symbol(&fake_symbol, cx)
3106            })
3107            .await
3108            .unwrap_err();
3109        assert!(error.to_string().contains("invalid symbol signature"));
3110    }
3111
3112    #[gpui::test(iterations = 10)]
3113    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3114        cx_a: &mut TestAppContext,
3115        cx_b: &mut TestAppContext,
3116        mut rng: StdRng,
3117    ) {
3118        cx_a.foreground().forbid_parking();
3119        let mut lang_registry = Arc::new(LanguageRegistry::test());
3120        let fs = FakeFs::new(cx_a.background());
3121        fs.insert_tree(
3122            "/root",
3123            json!({
3124                ".zed.toml": r#"collaborators = ["user_b"]"#,
3125                "a.rs": "const ONE: usize = b::TWO;",
3126                "b.rs": "const TWO: usize = 2",
3127            }),
3128        )
3129        .await;
3130
3131        // Set up a fake language server.
3132        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3133
3134        Arc::get_mut(&mut lang_registry)
3135            .unwrap()
3136            .add(Arc::new(Language::new(
3137                LanguageConfig {
3138                    name: "Rust".into(),
3139                    path_suffixes: vec!["rs".to_string()],
3140                    language_server: Some(language_server_config),
3141                    ..Default::default()
3142                },
3143                Some(tree_sitter_rust::language()),
3144            )));
3145
3146        // Connect to a server as 2 clients.
3147        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3148        let client_a = server.create_client(cx_a, "user_a").await;
3149        let client_b = server.create_client(cx_b, "user_b").await;
3150
3151        // Share a project as client A
3152        let project_a = cx_a.update(|cx| {
3153            Project::local(
3154                client_a.clone(),
3155                client_a.user_store.clone(),
3156                lang_registry.clone(),
3157                fs.clone(),
3158                cx,
3159            )
3160        });
3161
3162        let (worktree_a, _) = project_a
3163            .update(cx_a, |p, cx| {
3164                p.find_or_create_local_worktree("/root", true, cx)
3165            })
3166            .await
3167            .unwrap();
3168        worktree_a
3169            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3170            .await;
3171        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3172        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3173        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3174
3175        // Join the worktree as client B.
3176        let project_b = Project::remote(
3177            project_id,
3178            client_b.clone(),
3179            client_b.user_store.clone(),
3180            lang_registry.clone(),
3181            fs.clone(),
3182            &mut cx_b.to_async(),
3183        )
3184        .await
3185        .unwrap();
3186
3187        let buffer_b1 = cx_b
3188            .background()
3189            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3190            .await
3191            .unwrap();
3192
3193        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3194        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3195            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3196                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3197                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3198            )))
3199        });
3200
3201        let definitions;
3202        let buffer_b2;
3203        if rng.gen() {
3204            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3205            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3206        } else {
3207            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3208            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3209        }
3210
3211        let buffer_b2 = buffer_b2.await.unwrap();
3212        let definitions = definitions.await.unwrap();
3213        assert_eq!(definitions.len(), 1);
3214        assert_eq!(definitions[0].buffer, buffer_b2);
3215    }
3216
3217    #[gpui::test(iterations = 10)]
3218    async fn test_collaborating_with_code_actions(
3219        cx_a: &mut TestAppContext,
3220        cx_b: &mut TestAppContext,
3221    ) {
3222        cx_a.foreground().forbid_parking();
3223        let mut lang_registry = Arc::new(LanguageRegistry::test());
3224        let fs = FakeFs::new(cx_a.background());
3225        cx_b.update(|cx| editor::init(cx));
3226
3227        // Set up a fake language server.
3228        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3229        Arc::get_mut(&mut lang_registry)
3230            .unwrap()
3231            .add(Arc::new(Language::new(
3232                LanguageConfig {
3233                    name: "Rust".into(),
3234                    path_suffixes: vec!["rs".to_string()],
3235                    language_server: Some(language_server_config),
3236                    ..Default::default()
3237                },
3238                Some(tree_sitter_rust::language()),
3239            )));
3240
3241        // Connect to a server as 2 clients.
3242        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3243        let client_a = server.create_client(cx_a, "user_a").await;
3244        let client_b = server.create_client(cx_b, "user_b").await;
3245
3246        // Share a project as client A
3247        fs.insert_tree(
3248            "/a",
3249            json!({
3250                ".zed.toml": r#"collaborators = ["user_b"]"#,
3251                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3252                "other.rs": "pub fn foo() -> usize { 4 }",
3253            }),
3254        )
3255        .await;
3256        let project_a = cx_a.update(|cx| {
3257            Project::local(
3258                client_a.clone(),
3259                client_a.user_store.clone(),
3260                lang_registry.clone(),
3261                fs.clone(),
3262                cx,
3263            )
3264        });
3265        let (worktree_a, _) = project_a
3266            .update(cx_a, |p, cx| {
3267                p.find_or_create_local_worktree("/a", true, cx)
3268            })
3269            .await
3270            .unwrap();
3271        worktree_a
3272            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3273            .await;
3274        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3275        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3276        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3277
3278        // Join the worktree as client B.
3279        let project_b = Project::remote(
3280            project_id,
3281            client_b.clone(),
3282            client_b.user_store.clone(),
3283            lang_registry.clone(),
3284            fs.clone(),
3285            &mut cx_b.to_async(),
3286        )
3287        .await
3288        .unwrap();
3289        let mut params = cx_b.update(WorkspaceParams::test);
3290        params.languages = lang_registry.clone();
3291        params.client = client_b.client.clone();
3292        params.user_store = client_b.user_store.clone();
3293        params.project = project_b;
3294
3295        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3296        let editor_b = workspace_b
3297            .update(cx_b, |workspace, cx| {
3298                workspace.open_path((worktree_id, "main.rs"), cx)
3299            })
3300            .await
3301            .unwrap()
3302            .downcast::<Editor>()
3303            .unwrap();
3304
3305        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3306        fake_language_server
3307            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3308                assert_eq!(
3309                    params.text_document.uri,
3310                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3311                );
3312                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3313                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3314                None
3315            })
3316            .next()
3317            .await;
3318
3319        // Move cursor to a location that contains code actions.
3320        editor_b.update(cx_b, |editor, cx| {
3321            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3322            cx.focus(&editor_b);
3323        });
3324
3325        fake_language_server
3326            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3327                assert_eq!(
3328                    params.text_document.uri,
3329                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3330                );
3331                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3332                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3333
3334                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3335                    lsp::CodeAction {
3336                        title: "Inline into all callers".to_string(),
3337                        edit: Some(lsp::WorkspaceEdit {
3338                            changes: Some(
3339                                [
3340                                    (
3341                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3342                                        vec![lsp::TextEdit::new(
3343                                            lsp::Range::new(
3344                                                lsp::Position::new(1, 22),
3345                                                lsp::Position::new(1, 34),
3346                                            ),
3347                                            "4".to_string(),
3348                                        )],
3349                                    ),
3350                                    (
3351                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3352                                        vec![lsp::TextEdit::new(
3353                                            lsp::Range::new(
3354                                                lsp::Position::new(0, 0),
3355                                                lsp::Position::new(0, 27),
3356                                            ),
3357                                            "".to_string(),
3358                                        )],
3359                                    ),
3360                                ]
3361                                .into_iter()
3362                                .collect(),
3363                            ),
3364                            ..Default::default()
3365                        }),
3366                        data: Some(json!({
3367                            "codeActionParams": {
3368                                "range": {
3369                                    "start": {"line": 1, "column": 31},
3370                                    "end": {"line": 1, "column": 31},
3371                                }
3372                            }
3373                        })),
3374                        ..Default::default()
3375                    },
3376                )])
3377            })
3378            .next()
3379            .await;
3380
3381        // Toggle code actions and wait for them to display.
3382        editor_b.update(cx_b, |editor, cx| {
3383            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3384        });
3385        editor_b
3386            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3387            .await;
3388
3389        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3390
3391        // Confirming the code action will trigger a resolve request.
3392        let confirm_action = workspace_b
3393            .update(cx_b, |workspace, cx| {
3394                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3395            })
3396            .unwrap();
3397        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3398            lsp::CodeAction {
3399                title: "Inline into all callers".to_string(),
3400                edit: Some(lsp::WorkspaceEdit {
3401                    changes: Some(
3402                        [
3403                            (
3404                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3405                                vec![lsp::TextEdit::new(
3406                                    lsp::Range::new(
3407                                        lsp::Position::new(1, 22),
3408                                        lsp::Position::new(1, 34),
3409                                    ),
3410                                    "4".to_string(),
3411                                )],
3412                            ),
3413                            (
3414                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3415                                vec![lsp::TextEdit::new(
3416                                    lsp::Range::new(
3417                                        lsp::Position::new(0, 0),
3418                                        lsp::Position::new(0, 27),
3419                                    ),
3420                                    "".to_string(),
3421                                )],
3422                            ),
3423                        ]
3424                        .into_iter()
3425                        .collect(),
3426                    ),
3427                    ..Default::default()
3428                }),
3429                ..Default::default()
3430            }
3431        });
3432
3433        // After the action is confirmed, an editor containing both modified files is opened.
3434        confirm_action.await.unwrap();
3435        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3436            workspace
3437                .active_item(cx)
3438                .unwrap()
3439                .downcast::<Editor>()
3440                .unwrap()
3441        });
3442        code_action_editor.update(cx_b, |editor, cx| {
3443            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3444            editor.undo(&Undo, cx);
3445            assert_eq!(
3446                editor.text(cx),
3447                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3448            );
3449            editor.redo(&Redo, cx);
3450            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3451        });
3452    }
3453
3454    #[gpui::test(iterations = 10)]
3455    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3456        cx_a.foreground().forbid_parking();
3457        let mut lang_registry = Arc::new(LanguageRegistry::test());
3458        let fs = FakeFs::new(cx_a.background());
3459        cx_b.update(|cx| editor::init(cx));
3460
3461        // Set up a fake language server.
3462        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3463        Arc::get_mut(&mut lang_registry)
3464            .unwrap()
3465            .add(Arc::new(Language::new(
3466                LanguageConfig {
3467                    name: "Rust".into(),
3468                    path_suffixes: vec!["rs".to_string()],
3469                    language_server: Some(language_server_config),
3470                    ..Default::default()
3471                },
3472                Some(tree_sitter_rust::language()),
3473            )));
3474
3475        // Connect to a server as 2 clients.
3476        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3477        let client_a = server.create_client(cx_a, "user_a").await;
3478        let client_b = server.create_client(cx_b, "user_b").await;
3479
3480        // Share a project as client A
3481        fs.insert_tree(
3482            "/dir",
3483            json!({
3484                ".zed.toml": r#"collaborators = ["user_b"]"#,
3485                "one.rs": "const ONE: usize = 1;",
3486                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3487            }),
3488        )
3489        .await;
3490        let project_a = cx_a.update(|cx| {
3491            Project::local(
3492                client_a.clone(),
3493                client_a.user_store.clone(),
3494                lang_registry.clone(),
3495                fs.clone(),
3496                cx,
3497            )
3498        });
3499        let (worktree_a, _) = project_a
3500            .update(cx_a, |p, cx| {
3501                p.find_or_create_local_worktree("/dir", true, cx)
3502            })
3503            .await
3504            .unwrap();
3505        worktree_a
3506            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3507            .await;
3508        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3509        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3510        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3511
3512        // Join the worktree as client B.
3513        let project_b = Project::remote(
3514            project_id,
3515            client_b.clone(),
3516            client_b.user_store.clone(),
3517            lang_registry.clone(),
3518            fs.clone(),
3519            &mut cx_b.to_async(),
3520        )
3521        .await
3522        .unwrap();
3523        let mut params = cx_b.update(WorkspaceParams::test);
3524        params.languages = lang_registry.clone();
3525        params.client = client_b.client.clone();
3526        params.user_store = client_b.user_store.clone();
3527        params.project = project_b;
3528
3529        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3530        let editor_b = workspace_b
3531            .update(cx_b, |workspace, cx| {
3532                workspace.open_path((worktree_id, "one.rs"), cx)
3533            })
3534            .await
3535            .unwrap()
3536            .downcast::<Editor>()
3537            .unwrap();
3538        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3539
3540        // Move cursor to a location that can be renamed.
3541        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3542            editor.select_ranges([7..7], None, cx);
3543            editor.rename(&Rename, cx).unwrap()
3544        });
3545
3546        fake_language_server
3547            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3548                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3549                assert_eq!(params.position, lsp::Position::new(0, 7));
3550                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3551                    lsp::Position::new(0, 6),
3552                    lsp::Position::new(0, 9),
3553                )))
3554            })
3555            .next()
3556            .await
3557            .unwrap();
3558        prepare_rename.await.unwrap();
3559        editor_b.update(cx_b, |editor, cx| {
3560            let rename = editor.pending_rename().unwrap();
3561            let buffer = editor.buffer().read(cx).snapshot(cx);
3562            assert_eq!(
3563                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3564                6..9
3565            );
3566            rename.editor.update(cx, |rename_editor, cx| {
3567                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3568                    rename_buffer.edit([0..3], "THREE", cx);
3569                });
3570            });
3571        });
3572
3573        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3574            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3575        });
3576        fake_language_server
3577            .handle_request::<lsp::request::Rename, _>(|params, _| {
3578                assert_eq!(
3579                    params.text_document_position.text_document.uri.as_str(),
3580                    "file:///dir/one.rs"
3581                );
3582                assert_eq!(
3583                    params.text_document_position.position,
3584                    lsp::Position::new(0, 6)
3585                );
3586                assert_eq!(params.new_name, "THREE");
3587                Some(lsp::WorkspaceEdit {
3588                    changes: Some(
3589                        [
3590                            (
3591                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3592                                vec![lsp::TextEdit::new(
3593                                    lsp::Range::new(
3594                                        lsp::Position::new(0, 6),
3595                                        lsp::Position::new(0, 9),
3596                                    ),
3597                                    "THREE".to_string(),
3598                                )],
3599                            ),
3600                            (
3601                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3602                                vec![
3603                                    lsp::TextEdit::new(
3604                                        lsp::Range::new(
3605                                            lsp::Position::new(0, 24),
3606                                            lsp::Position::new(0, 27),
3607                                        ),
3608                                        "THREE".to_string(),
3609                                    ),
3610                                    lsp::TextEdit::new(
3611                                        lsp::Range::new(
3612                                            lsp::Position::new(0, 35),
3613                                            lsp::Position::new(0, 38),
3614                                        ),
3615                                        "THREE".to_string(),
3616                                    ),
3617                                ],
3618                            ),
3619                        ]
3620                        .into_iter()
3621                        .collect(),
3622                    ),
3623                    ..Default::default()
3624                })
3625            })
3626            .next()
3627            .await
3628            .unwrap();
3629        confirm_rename.await.unwrap();
3630
3631        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3632            workspace
3633                .active_item(cx)
3634                .unwrap()
3635                .downcast::<Editor>()
3636                .unwrap()
3637        });
3638        rename_editor.update(cx_b, |editor, cx| {
3639            assert_eq!(
3640                editor.text(cx),
3641                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3642            );
3643            editor.undo(&Undo, cx);
3644            assert_eq!(
3645                editor.text(cx),
3646                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3647            );
3648            editor.redo(&Redo, cx);
3649            assert_eq!(
3650                editor.text(cx),
3651                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3652            );
3653        });
3654
3655        // Ensure temporary rename edits cannot be undone/redone.
3656        editor_b.update(cx_b, |editor, cx| {
3657            editor.undo(&Undo, cx);
3658            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3659            editor.undo(&Undo, cx);
3660            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3661            editor.redo(&Redo, cx);
3662            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3663        })
3664    }
3665
3666    #[gpui::test(iterations = 10)]
3667    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3668        cx_a.foreground().forbid_parking();
3669
3670        // Connect to a server as 2 clients.
3671        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3672        let client_a = server.create_client(cx_a, "user_a").await;
3673        let client_b = server.create_client(cx_b, "user_b").await;
3674
3675        // Create an org that includes these 2 users.
3676        let db = &server.app_state.db;
3677        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3678        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3679            .await
3680            .unwrap();
3681        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3682            .await
3683            .unwrap();
3684
3685        // Create a channel that includes all the users.
3686        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3687        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3688            .await
3689            .unwrap();
3690        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3691            .await
3692            .unwrap();
3693        db.create_channel_message(
3694            channel_id,
3695            client_b.current_user_id(&cx_b),
3696            "hello A, it's B.",
3697            OffsetDateTime::now_utc(),
3698            1,
3699        )
3700        .await
3701        .unwrap();
3702
3703        let channels_a = cx_a
3704            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3705        channels_a
3706            .condition(cx_a, |list, _| list.available_channels().is_some())
3707            .await;
3708        channels_a.read_with(cx_a, |list, _| {
3709            assert_eq!(
3710                list.available_channels().unwrap(),
3711                &[ChannelDetails {
3712                    id: channel_id.to_proto(),
3713                    name: "test-channel".to_string()
3714                }]
3715            )
3716        });
3717        let channel_a = channels_a.update(cx_a, |this, cx| {
3718            this.get_channel(channel_id.to_proto(), cx).unwrap()
3719        });
3720        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3721        channel_a
3722            .condition(&cx_a, |channel, _| {
3723                channel_messages(channel)
3724                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3725            })
3726            .await;
3727
3728        let channels_b = cx_b
3729            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3730        channels_b
3731            .condition(cx_b, |list, _| list.available_channels().is_some())
3732            .await;
3733        channels_b.read_with(cx_b, |list, _| {
3734            assert_eq!(
3735                list.available_channels().unwrap(),
3736                &[ChannelDetails {
3737                    id: channel_id.to_proto(),
3738                    name: "test-channel".to_string()
3739                }]
3740            )
3741        });
3742
3743        let channel_b = channels_b.update(cx_b, |this, cx| {
3744            this.get_channel(channel_id.to_proto(), cx).unwrap()
3745        });
3746        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3747        channel_b
3748            .condition(&cx_b, |channel, _| {
3749                channel_messages(channel)
3750                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3751            })
3752            .await;
3753
3754        channel_a
3755            .update(cx_a, |channel, cx| {
3756                channel
3757                    .send_message("oh, hi B.".to_string(), cx)
3758                    .unwrap()
3759                    .detach();
3760                let task = channel.send_message("sup".to_string(), cx).unwrap();
3761                assert_eq!(
3762                    channel_messages(channel),
3763                    &[
3764                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3765                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3766                        ("user_a".to_string(), "sup".to_string(), true)
3767                    ]
3768                );
3769                task
3770            })
3771            .await
3772            .unwrap();
3773
3774        channel_b
3775            .condition(&cx_b, |channel, _| {
3776                channel_messages(channel)
3777                    == [
3778                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3779                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3780                        ("user_a".to_string(), "sup".to_string(), false),
3781                    ]
3782            })
3783            .await;
3784
3785        assert_eq!(
3786            server
3787                .state()
3788                .await
3789                .channel(channel_id)
3790                .unwrap()
3791                .connection_ids
3792                .len(),
3793            2
3794        );
3795        cx_b.update(|_| drop(channel_b));
3796        server
3797            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3798            .await;
3799
3800        cx_a.update(|_| drop(channel_a));
3801        server
3802            .condition(|state| state.channel(channel_id).is_none())
3803            .await;
3804    }
3805
3806    #[gpui::test(iterations = 10)]
3807    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3808        cx_a.foreground().forbid_parking();
3809
3810        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3811        let client_a = server.create_client(cx_a, "user_a").await;
3812
3813        let db = &server.app_state.db;
3814        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3815        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3816        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3817            .await
3818            .unwrap();
3819        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3820            .await
3821            .unwrap();
3822
3823        let channels_a = cx_a
3824            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3825        channels_a
3826            .condition(cx_a, |list, _| list.available_channels().is_some())
3827            .await;
3828        let channel_a = channels_a.update(cx_a, |this, cx| {
3829            this.get_channel(channel_id.to_proto(), cx).unwrap()
3830        });
3831
3832        // Messages aren't allowed to be too long.
3833        channel_a
3834            .update(cx_a, |channel, cx| {
3835                let long_body = "this is long.\n".repeat(1024);
3836                channel.send_message(long_body, cx).unwrap()
3837            })
3838            .await
3839            .unwrap_err();
3840
3841        // Messages aren't allowed to be blank.
3842        channel_a.update(cx_a, |channel, cx| {
3843            channel.send_message(String::new(), cx).unwrap_err()
3844        });
3845
3846        // Leading and trailing whitespace are trimmed.
3847        channel_a
3848            .update(cx_a, |channel, cx| {
3849                channel
3850                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3851                    .unwrap()
3852            })
3853            .await
3854            .unwrap();
3855        assert_eq!(
3856            db.get_channel_messages(channel_id, 10, None)
3857                .await
3858                .unwrap()
3859                .iter()
3860                .map(|m| &m.body)
3861                .collect::<Vec<_>>(),
3862            &["surrounded by whitespace"]
3863        );
3864    }
3865
3866    #[gpui::test(iterations = 10)]
3867    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3868        cx_a.foreground().forbid_parking();
3869
3870        // Connect to a server as 2 clients.
3871        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3872        let client_a = server.create_client(cx_a, "user_a").await;
3873        let client_b = server.create_client(cx_b, "user_b").await;
3874        let mut status_b = client_b.status();
3875
3876        // Create an org that includes these 2 users.
3877        let db = &server.app_state.db;
3878        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3879        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3880            .await
3881            .unwrap();
3882        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3883            .await
3884            .unwrap();
3885
3886        // Create a channel that includes all the users.
3887        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3888        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3889            .await
3890            .unwrap();
3891        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3892            .await
3893            .unwrap();
3894        db.create_channel_message(
3895            channel_id,
3896            client_b.current_user_id(&cx_b),
3897            "hello A, it's B.",
3898            OffsetDateTime::now_utc(),
3899            2,
3900        )
3901        .await
3902        .unwrap();
3903
3904        let channels_a = cx_a
3905            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3906        channels_a
3907            .condition(cx_a, |list, _| list.available_channels().is_some())
3908            .await;
3909
3910        channels_a.read_with(cx_a, |list, _| {
3911            assert_eq!(
3912                list.available_channels().unwrap(),
3913                &[ChannelDetails {
3914                    id: channel_id.to_proto(),
3915                    name: "test-channel".to_string()
3916                }]
3917            )
3918        });
3919        let channel_a = channels_a.update(cx_a, |this, cx| {
3920            this.get_channel(channel_id.to_proto(), cx).unwrap()
3921        });
3922        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3923        channel_a
3924            .condition(&cx_a, |channel, _| {
3925                channel_messages(channel)
3926                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3927            })
3928            .await;
3929
3930        let channels_b = cx_b
3931            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3932        channels_b
3933            .condition(cx_b, |list, _| list.available_channels().is_some())
3934            .await;
3935        channels_b.read_with(cx_b, |list, _| {
3936            assert_eq!(
3937                list.available_channels().unwrap(),
3938                &[ChannelDetails {
3939                    id: channel_id.to_proto(),
3940                    name: "test-channel".to_string()
3941                }]
3942            )
3943        });
3944
3945        let channel_b = channels_b.update(cx_b, |this, cx| {
3946            this.get_channel(channel_id.to_proto(), cx).unwrap()
3947        });
3948        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3949        channel_b
3950            .condition(&cx_b, |channel, _| {
3951                channel_messages(channel)
3952                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3953            })
3954            .await;
3955
3956        // Disconnect client B, ensuring we can still access its cached channel data.
3957        server.forbid_connections();
3958        server.disconnect_client(client_b.current_user_id(&cx_b));
3959        cx_b.foreground().advance_clock(Duration::from_secs(3));
3960        while !matches!(
3961            status_b.next().await,
3962            Some(client::Status::ReconnectionError { .. })
3963        ) {}
3964
3965        channels_b.read_with(cx_b, |channels, _| {
3966            assert_eq!(
3967                channels.available_channels().unwrap(),
3968                [ChannelDetails {
3969                    id: channel_id.to_proto(),
3970                    name: "test-channel".to_string()
3971                }]
3972            )
3973        });
3974        channel_b.read_with(cx_b, |channel, _| {
3975            assert_eq!(
3976                channel_messages(channel),
3977                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3978            )
3979        });
3980
3981        // Send a message from client B while it is disconnected.
3982        channel_b
3983            .update(cx_b, |channel, cx| {
3984                let task = channel
3985                    .send_message("can you see this?".to_string(), cx)
3986                    .unwrap();
3987                assert_eq!(
3988                    channel_messages(channel),
3989                    &[
3990                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3991                        ("user_b".to_string(), "can you see this?".to_string(), true)
3992                    ]
3993                );
3994                task
3995            })
3996            .await
3997            .unwrap_err();
3998
3999        // Send a message from client A while B is disconnected.
4000        channel_a
4001            .update(cx_a, |channel, cx| {
4002                channel
4003                    .send_message("oh, hi B.".to_string(), cx)
4004                    .unwrap()
4005                    .detach();
4006                let task = channel.send_message("sup".to_string(), cx).unwrap();
4007                assert_eq!(
4008                    channel_messages(channel),
4009                    &[
4010                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4011                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
4012                        ("user_a".to_string(), "sup".to_string(), true)
4013                    ]
4014                );
4015                task
4016            })
4017            .await
4018            .unwrap();
4019
4020        // Give client B a chance to reconnect.
4021        server.allow_connections();
4022        cx_b.foreground().advance_clock(Duration::from_secs(10));
4023
4024        // Verify that B sees the new messages upon reconnection, as well as the message client B
4025        // sent while offline.
4026        channel_b
4027            .condition(&cx_b, |channel, _| {
4028                channel_messages(channel)
4029                    == [
4030                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4031                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4032                        ("user_a".to_string(), "sup".to_string(), false),
4033                        ("user_b".to_string(), "can you see this?".to_string(), false),
4034                    ]
4035            })
4036            .await;
4037
4038        // Ensure client A and B can communicate normally after reconnection.
4039        channel_a
4040            .update(cx_a, |channel, cx| {
4041                channel.send_message("you online?".to_string(), cx).unwrap()
4042            })
4043            .await
4044            .unwrap();
4045        channel_b
4046            .condition(&cx_b, |channel, _| {
4047                channel_messages(channel)
4048                    == [
4049                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4050                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4051                        ("user_a".to_string(), "sup".to_string(), false),
4052                        ("user_b".to_string(), "can you see this?".to_string(), false),
4053                        ("user_a".to_string(), "you online?".to_string(), false),
4054                    ]
4055            })
4056            .await;
4057
4058        channel_b
4059            .update(cx_b, |channel, cx| {
4060                channel.send_message("yep".to_string(), cx).unwrap()
4061            })
4062            .await
4063            .unwrap();
4064        channel_a
4065            .condition(&cx_a, |channel, _| {
4066                channel_messages(channel)
4067                    == [
4068                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4069                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4070                        ("user_a".to_string(), "sup".to_string(), false),
4071                        ("user_b".to_string(), "can you see this?".to_string(), false),
4072                        ("user_a".to_string(), "you online?".to_string(), false),
4073                        ("user_b".to_string(), "yep".to_string(), false),
4074                    ]
4075            })
4076            .await;
4077    }
4078
4079    #[gpui::test(iterations = 10)]
4080    async fn test_contacts(
4081        cx_a: &mut TestAppContext,
4082        cx_b: &mut TestAppContext,
4083        cx_c: &mut TestAppContext,
4084    ) {
4085        cx_a.foreground().forbid_parking();
4086        let lang_registry = Arc::new(LanguageRegistry::test());
4087        let fs = FakeFs::new(cx_a.background());
4088
4089        // Connect to a server as 3 clients.
4090        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4091        let client_a = server.create_client(cx_a, "user_a").await;
4092        let client_b = server.create_client(cx_b, "user_b").await;
4093        let client_c = server.create_client(cx_c, "user_c").await;
4094
4095        // Share a worktree as client A.
4096        fs.insert_tree(
4097            "/a",
4098            json!({
4099                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4100            }),
4101        )
4102        .await;
4103
4104        let project_a = cx_a.update(|cx| {
4105            Project::local(
4106                client_a.clone(),
4107                client_a.user_store.clone(),
4108                lang_registry.clone(),
4109                fs.clone(),
4110                cx,
4111            )
4112        });
4113        let (worktree_a, _) = project_a
4114            .update(cx_a, |p, cx| {
4115                p.find_or_create_local_worktree("/a", true, cx)
4116            })
4117            .await
4118            .unwrap();
4119        worktree_a
4120            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4121            .await;
4122
4123        client_a
4124            .user_store
4125            .condition(&cx_a, |user_store, _| {
4126                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4127            })
4128            .await;
4129        client_b
4130            .user_store
4131            .condition(&cx_b, |user_store, _| {
4132                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4133            })
4134            .await;
4135        client_c
4136            .user_store
4137            .condition(&cx_c, |user_store, _| {
4138                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4139            })
4140            .await;
4141
4142        let project_id = project_a
4143            .update(cx_a, |project, _| project.next_remote_id())
4144            .await;
4145        project_a
4146            .update(cx_a, |project, cx| project.share(cx))
4147            .await
4148            .unwrap();
4149
4150        let _project_b = Project::remote(
4151            project_id,
4152            client_b.clone(),
4153            client_b.user_store.clone(),
4154            lang_registry.clone(),
4155            fs.clone(),
4156            &mut cx_b.to_async(),
4157        )
4158        .await
4159        .unwrap();
4160
4161        client_a
4162            .user_store
4163            .condition(&cx_a, |user_store, _| {
4164                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4165            })
4166            .await;
4167        client_b
4168            .user_store
4169            .condition(&cx_b, |user_store, _| {
4170                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4171            })
4172            .await;
4173        client_c
4174            .user_store
4175            .condition(&cx_c, |user_store, _| {
4176                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4177            })
4178            .await;
4179
4180        project_a
4181            .condition(&cx_a, |project, _| {
4182                project.collaborators().contains_key(&client_b.peer_id)
4183            })
4184            .await;
4185
4186        cx_a.update(move |_| drop(project_a));
4187        client_a
4188            .user_store
4189            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4190            .await;
4191        client_b
4192            .user_store
4193            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4194            .await;
4195        client_c
4196            .user_store
4197            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4198            .await;
4199
4200        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4201            user_store
4202                .contacts()
4203                .iter()
4204                .map(|contact| {
4205                    let worktrees = contact
4206                        .projects
4207                        .iter()
4208                        .map(|p| {
4209                            (
4210                                p.worktree_root_names[0].as_str(),
4211                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4212                            )
4213                        })
4214                        .collect();
4215                    (contact.user.github_login.as_str(), worktrees)
4216                })
4217                .collect()
4218        }
4219    }
4220
4221    #[gpui::test(iterations = 10)]
4222    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4223        cx_a.foreground().forbid_parking();
4224        let fs = FakeFs::new(cx_a.background());
4225
4226        // 2 clients connect to a server.
4227        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4228        let mut client_a = server.create_client(cx_a, "user_a").await;
4229        let mut client_b = server.create_client(cx_b, "user_b").await;
4230        cx_a.update(editor::init);
4231        cx_b.update(editor::init);
4232
4233        // Client A shares a project.
4234        fs.insert_tree(
4235            "/a",
4236            json!({
4237                ".zed.toml": r#"collaborators = ["user_b"]"#,
4238                "1.txt": "one",
4239                "2.txt": "two",
4240                "3.txt": "three",
4241            }),
4242        )
4243        .await;
4244        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4245        project_a
4246            .update(cx_a, |project, cx| project.share(cx))
4247            .await
4248            .unwrap();
4249
4250        // Client B joins the project.
4251        let project_b = client_b
4252            .build_remote_project(
4253                project_a
4254                    .read_with(cx_a, |project, _| project.remote_id())
4255                    .unwrap(),
4256                cx_b,
4257            )
4258            .await;
4259
4260        // Client A opens some editors.
4261        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4262        let pane_a = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4263        let editor_a1 = workspace_a
4264            .update(cx_a, |workspace, cx| {
4265                workspace.open_path((worktree_id, "1.txt"), cx)
4266            })
4267            .await
4268            .unwrap()
4269            .downcast::<Editor>()
4270            .unwrap();
4271        let editor_a2 = workspace_a
4272            .update(cx_a, |workspace, cx| {
4273                workspace.open_path((worktree_id, "2.txt"), cx)
4274            })
4275            .await
4276            .unwrap()
4277            .downcast::<Editor>()
4278            .unwrap();
4279
4280        // Client B opens an editor.
4281        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4282        let editor_b1 = workspace_b
4283            .update(cx_b, |workspace, cx| {
4284                workspace.open_path((worktree_id, "1.txt"), cx)
4285            })
4286            .await
4287            .unwrap()
4288            .downcast::<Editor>()
4289            .unwrap();
4290
4291        let client_a_id = project_b.read_with(cx_b, |project, _| {
4292            project.collaborators().values().next().unwrap().peer_id
4293        });
4294        let client_b_id = project_a.read_with(cx_a, |project, _| {
4295            project.collaborators().values().next().unwrap().peer_id
4296        });
4297
4298        // When client B starts following client A, all visible view states are replicated to client B.
4299        editor_a1.update(cx_a, |editor, cx| editor.select_ranges([0..1], None, cx));
4300        editor_a2.update(cx_a, |editor, cx| editor.select_ranges([2..3], None, cx));
4301        workspace_b
4302            .update(cx_b, |workspace, cx| {
4303                workspace.toggle_follow(&client_a_id.into(), cx).unwrap()
4304            })
4305            .await
4306            .unwrap();
4307        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4308            workspace
4309                .active_item(cx)
4310                .unwrap()
4311                .downcast::<Editor>()
4312                .unwrap()
4313        });
4314        assert!(cx_b.read(|cx| editor_b2.is_focused(cx)));
4315        assert_eq!(
4316            editor_b2.read_with(cx_b, |editor, cx| editor.project_path(cx)),
4317            Some((worktree_id, "2.txt").into())
4318        );
4319        assert_eq!(
4320            editor_b2.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4321            vec![2..3]
4322        );
4323        assert_eq!(
4324            editor_b1.read_with(cx_b, |editor, cx| editor.selected_ranges(cx)),
4325            vec![0..1]
4326        );
4327
4328        // When client A activates a different editor, client B does so as well.
4329        workspace_a.update(cx_a, |workspace, cx| {
4330            workspace.activate_item(&editor_a1, cx)
4331        });
4332        workspace_b
4333            .condition(cx_b, |workspace, cx| {
4334                workspace.active_item(cx).unwrap().id() == editor_b1.id()
4335            })
4336            .await;
4337
4338        // Changes to client A's editor are reflected on client B.
4339        editor_a1.update(cx_a, |editor, cx| {
4340            editor.select_ranges([1..1, 2..2], None, cx);
4341        });
4342        editor_b1
4343            .condition(cx_b, |editor, cx| {
4344                editor.selected_ranges(cx) == vec![1..1, 2..2]
4345            })
4346            .await;
4347
4348        editor_a1.update(cx_a, |editor, cx| editor.set_text("TWO", cx));
4349        editor_b1
4350            .condition(cx_b, |editor, cx| editor.text(cx) == "TWO")
4351            .await;
4352
4353        editor_a1.update(cx_a, |editor, cx| {
4354            editor.select_ranges([3..3], None, cx);
4355            editor.set_scroll_position(vec2f(0., 100.), cx);
4356        });
4357        editor_b1
4358            .condition(cx_b, |editor, cx| editor.selected_ranges(cx) == vec![3..3])
4359            .await;
4360
4361        // After unfollowing, client B stops receiving updates from client A.
4362        workspace_b.update(cx_b, |workspace, cx| {
4363            workspace.unfollow(&workspace.active_pane().clone(), cx)
4364        });
4365        workspace_a.update(cx_a, |workspace, cx| {
4366            workspace.activate_item(&editor_a2, cx)
4367        });
4368        cx_a.foreground().run_until_parked();
4369        assert_eq!(
4370            workspace_b.read_with(cx_b, |workspace, cx| workspace
4371                .active_item(cx)
4372                .unwrap()
4373                .id()),
4374            editor_b1.id()
4375        );
4376
4377        // Client A starts following client B.
4378        workspace_a
4379            .update(cx_a, |workspace, cx| {
4380                workspace.toggle_follow(&client_b_id.into(), cx).unwrap()
4381            })
4382            .await
4383            .unwrap();
4384        assert_eq!(
4385            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4386            Some(client_b_id)
4387        );
4388        assert_eq!(
4389            workspace_a.read_with(cx_a, |workspace, cx| workspace
4390                .active_item(cx)
4391                .unwrap()
4392                .id()),
4393            editor_a1.id()
4394        );
4395
4396        // Following interrupts when client B disconnects.
4397        client_b.disconnect(&cx_b.to_async()).unwrap();
4398        cx_a.foreground().run_until_parked();
4399        assert_eq!(
4400            workspace_a.read_with(cx_a, |workspace, _| workspace.leader_for_pane(&pane_a)),
4401            None
4402        );
4403    }
4404
4405    #[gpui::test(iterations = 10)]
4406    async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4407        cx_a.foreground().forbid_parking();
4408        let fs = FakeFs::new(cx_a.background());
4409
4410        // 2 clients connect to a server.
4411        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4412        let mut client_a = server.create_client(cx_a, "user_a").await;
4413        let mut client_b = server.create_client(cx_b, "user_b").await;
4414        cx_a.update(editor::init);
4415        cx_b.update(editor::init);
4416
4417        // Client A shares a project.
4418        fs.insert_tree(
4419            "/a",
4420            json!({
4421                ".zed.toml": r#"collaborators = ["user_b"]"#,
4422                "1.txt": "one",
4423                "2.txt": "two",
4424                "3.txt": "three",
4425                "4.txt": "four",
4426            }),
4427        )
4428        .await;
4429        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4430        project_a
4431            .update(cx_a, |project, cx| project.share(cx))
4432            .await
4433            .unwrap();
4434
4435        // Client B joins the project.
4436        let project_b = client_b
4437            .build_remote_project(
4438                project_a
4439                    .read_with(cx_a, |project, _| project.remote_id())
4440                    .unwrap(),
4441                cx_b,
4442            )
4443            .await;
4444
4445        // Client A opens some editors.
4446        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4447        let pane_a1 = workspace_a.read_with(cx_a, |workspace, _| workspace.active_pane().clone());
4448        let _editor_a1 = workspace_a
4449            .update(cx_a, |workspace, cx| {
4450                workspace.open_path((worktree_id, "1.txt"), cx)
4451            })
4452            .await
4453            .unwrap()
4454            .downcast::<Editor>()
4455            .unwrap();
4456
4457        // Client B opens an editor.
4458        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4459        let pane_b1 = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4460        let _editor_b1 = workspace_b
4461            .update(cx_b, |workspace, cx| {
4462                workspace.open_path((worktree_id, "2.txt"), cx)
4463            })
4464            .await
4465            .unwrap()
4466            .downcast::<Editor>()
4467            .unwrap();
4468
4469        // Clients A and B follow each other in split panes
4470        workspace_a
4471            .update(cx_a, |workspace, cx| {
4472                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4473                assert_ne!(*workspace.active_pane(), pane_a1);
4474                let leader_id = *project_a.read(cx).collaborators().keys().next().unwrap();
4475                workspace
4476                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4477                    .unwrap()
4478            })
4479            .await
4480            .unwrap();
4481        workspace_b
4482            .update(cx_b, |workspace, cx| {
4483                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4484                assert_ne!(*workspace.active_pane(), pane_b1);
4485                let leader_id = *project_b.read(cx).collaborators().keys().next().unwrap();
4486                workspace
4487                    .toggle_follow(&workspace::ToggleFollow(leader_id), cx)
4488                    .unwrap()
4489            })
4490            .await
4491            .unwrap();
4492
4493        workspace_a
4494            .update(cx_a, |workspace, cx| {
4495                workspace.activate_next_pane(cx);
4496                assert_eq!(*workspace.active_pane(), pane_a1);
4497                workspace.open_path((worktree_id, "3.txt"), cx)
4498            })
4499            .await
4500            .unwrap();
4501        workspace_b
4502            .update(cx_b, |workspace, cx| {
4503                workspace.activate_next_pane(cx);
4504                assert_eq!(*workspace.active_pane(), pane_b1);
4505                workspace.open_path((worktree_id, "4.txt"), cx)
4506            })
4507            .await
4508            .unwrap();
4509        cx_a.foreground().run_until_parked();
4510
4511        // Ensure leader updates don't change the active pane of followers
4512        workspace_a.read_with(cx_a, |workspace, _| {
4513            assert_eq!(*workspace.active_pane(), pane_a1);
4514        });
4515        workspace_b.read_with(cx_b, |workspace, _| {
4516            assert_eq!(*workspace.active_pane(), pane_b1);
4517        });
4518
4519        // Ensure peers following each other doesn't cause an infinite loop.
4520        assert_eq!(
4521            workspace_a.read_with(cx_a, |workspace, cx| workspace
4522                .active_item(cx)
4523                .unwrap()
4524                .project_path(cx)),
4525            Some((worktree_id, "3.txt").into())
4526        );
4527        workspace_a.update(cx_a, |workspace, cx| {
4528            assert_eq!(
4529                workspace.active_item(cx).unwrap().project_path(cx),
4530                Some((worktree_id, "3.txt").into())
4531            );
4532            workspace.activate_next_pane(cx);
4533            assert_eq!(
4534                workspace.active_item(cx).unwrap().project_path(cx),
4535                Some((worktree_id, "4.txt").into())
4536            );
4537        });
4538        workspace_b.update(cx_b, |workspace, cx| {
4539            assert_eq!(
4540                workspace.active_item(cx).unwrap().project_path(cx),
4541                Some((worktree_id, "4.txt").into())
4542            );
4543            workspace.activate_next_pane(cx);
4544            assert_eq!(
4545                workspace.active_item(cx).unwrap().project_path(cx),
4546                Some((worktree_id, "3.txt").into())
4547            );
4548        });
4549    }
4550
4551    #[gpui::test(iterations = 10)]
4552    async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4553        cx_a.foreground().forbid_parking();
4554        let fs = FakeFs::new(cx_a.background());
4555
4556        // 2 clients connect to a server.
4557        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4558        let mut client_a = server.create_client(cx_a, "user_a").await;
4559        let mut client_b = server.create_client(cx_b, "user_b").await;
4560        cx_a.update(editor::init);
4561        cx_b.update(editor::init);
4562
4563        // Client A shares a project.
4564        fs.insert_tree(
4565            "/a",
4566            json!({
4567                ".zed.toml": r#"collaborators = ["user_b"]"#,
4568                "1.txt": "one",
4569                "2.txt": "two",
4570                "3.txt": "three",
4571            }),
4572        )
4573        .await;
4574        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4575        project_a
4576            .update(cx_a, |project, cx| project.share(cx))
4577            .await
4578            .unwrap();
4579
4580        // Client B joins the project.
4581        let project_b = client_b
4582            .build_remote_project(
4583                project_a
4584                    .read_with(cx_a, |project, _| project.remote_id())
4585                    .unwrap(),
4586                cx_b,
4587            )
4588            .await;
4589
4590        // Client A opens some editors.
4591        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4592        let _editor_a1 = workspace_a
4593            .update(cx_a, |workspace, cx| {
4594                workspace.open_path((worktree_id, "1.txt"), cx)
4595            })
4596            .await
4597            .unwrap()
4598            .downcast::<Editor>()
4599            .unwrap();
4600
4601        // Client B starts following client A.
4602        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4603        let pane_b = workspace_b.read_with(cx_b, |workspace, _| workspace.active_pane().clone());
4604        let leader_id = project_b.read_with(cx_b, |project, _| {
4605            project.collaborators().values().next().unwrap().peer_id
4606        });
4607        workspace_b
4608            .update(cx_b, |workspace, cx| {
4609                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4610            })
4611            .await
4612            .unwrap();
4613        assert_eq!(
4614            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4615            Some(leader_id)
4616        );
4617        let editor_b2 = workspace_b.read_with(cx_b, |workspace, cx| {
4618            workspace
4619                .active_item(cx)
4620                .unwrap()
4621                .downcast::<Editor>()
4622                .unwrap()
4623        });
4624
4625        // When client B moves, it automatically stops following client A.
4626        editor_b2.update(cx_b, |editor, cx| editor.move_right(&editor::MoveRight, cx));
4627        assert_eq!(
4628            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4629            None
4630        );
4631
4632        workspace_b
4633            .update(cx_b, |workspace, cx| {
4634                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4635            })
4636            .await
4637            .unwrap();
4638        assert_eq!(
4639            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4640            Some(leader_id)
4641        );
4642
4643        // When client B edits, it automatically stops following client A.
4644        editor_b2.update(cx_b, |editor, cx| editor.insert("X", cx));
4645        assert_eq!(
4646            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4647            None
4648        );
4649
4650        workspace_b
4651            .update(cx_b, |workspace, cx| {
4652                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4653            })
4654            .await
4655            .unwrap();
4656        assert_eq!(
4657            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4658            Some(leader_id)
4659        );
4660
4661        // When client B scrolls, it automatically stops following client A.
4662        editor_b2.update(cx_b, |editor, cx| {
4663            editor.set_scroll_position(vec2f(0., 3.), cx)
4664        });
4665        assert_eq!(
4666            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4667            None
4668        );
4669
4670        workspace_b
4671            .update(cx_b, |workspace, cx| {
4672                workspace.toggle_follow(&leader_id.into(), cx).unwrap()
4673            })
4674            .await
4675            .unwrap();
4676        assert_eq!(
4677            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4678            Some(leader_id)
4679        );
4680
4681        // When client B activates a different pane, it continues following client A in the original pane.
4682        workspace_b.update(cx_b, |workspace, cx| {
4683            workspace.split_pane(pane_b.clone(), SplitDirection::Right, cx)
4684        });
4685        assert_eq!(
4686            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4687            Some(leader_id)
4688        );
4689
4690        workspace_b.update(cx_b, |workspace, cx| workspace.activate_next_pane(cx));
4691        assert_eq!(
4692            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4693            Some(leader_id)
4694        );
4695
4696        // When client B activates a different item in the original pane, it automatically stops following client A.
4697        workspace_b
4698            .update(cx_b, |workspace, cx| {
4699                workspace.open_path((worktree_id, "2.txt"), cx)
4700            })
4701            .await
4702            .unwrap();
4703        assert_eq!(
4704            workspace_b.read_with(cx_b, |workspace, _| workspace.leader_for_pane(&pane_b)),
4705            None
4706        );
4707    }
4708
4709    #[gpui::test(iterations = 100)]
4710    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4711        cx.foreground().forbid_parking();
4712        let max_peers = env::var("MAX_PEERS")
4713            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4714            .unwrap_or(5);
4715        let max_operations = env::var("OPERATIONS")
4716            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4717            .unwrap_or(10);
4718
4719        let rng = Arc::new(Mutex::new(rng));
4720
4721        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4722        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4723
4724        let fs = FakeFs::new(cx.background());
4725        fs.insert_tree(
4726            "/_collab",
4727            json!({
4728                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4729            }),
4730        )
4731        .await;
4732
4733        let operations = Rc::new(Cell::new(0));
4734        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4735        let mut clients = Vec::new();
4736
4737        let mut next_entity_id = 100000;
4738        let mut host_cx = TestAppContext::new(
4739            cx.foreground_platform(),
4740            cx.platform(),
4741            cx.foreground(),
4742            cx.background(),
4743            cx.font_cache(),
4744            cx.leak_detector(),
4745            next_entity_id,
4746        );
4747        let host = server.create_client(&mut host_cx, "host").await;
4748        let host_project = host_cx.update(|cx| {
4749            Project::local(
4750                host.client.clone(),
4751                host.user_store.clone(),
4752                Arc::new(LanguageRegistry::test()),
4753                fs.clone(),
4754                cx,
4755            )
4756        });
4757        let host_project_id = host_project
4758            .update(&mut host_cx, |p, _| p.next_remote_id())
4759            .await;
4760
4761        let (collab_worktree, _) = host_project
4762            .update(&mut host_cx, |project, cx| {
4763                project.find_or_create_local_worktree("/_collab", true, cx)
4764            })
4765            .await
4766            .unwrap();
4767        collab_worktree
4768            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4769            .await;
4770        host_project
4771            .update(&mut host_cx, |project, cx| project.share(cx))
4772            .await
4773            .unwrap();
4774
4775        clients.push(cx.foreground().spawn(host.simulate_host(
4776            host_project,
4777            language_server_config,
4778            operations.clone(),
4779            max_operations,
4780            rng.clone(),
4781            host_cx,
4782        )));
4783
4784        while operations.get() < max_operations {
4785            cx.background().simulate_random_delay().await;
4786            if clients.len() >= max_peers {
4787                break;
4788            } else if rng.lock().gen_bool(0.05) {
4789                operations.set(operations.get() + 1);
4790
4791                let guest_id = clients.len();
4792                log::info!("Adding guest {}", guest_id);
4793                next_entity_id += 100000;
4794                let mut guest_cx = TestAppContext::new(
4795                    cx.foreground_platform(),
4796                    cx.platform(),
4797                    cx.foreground(),
4798                    cx.background(),
4799                    cx.font_cache(),
4800                    cx.leak_detector(),
4801                    next_entity_id,
4802                );
4803                let guest = server
4804                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4805                    .await;
4806                let guest_project = Project::remote(
4807                    host_project_id,
4808                    guest.client.clone(),
4809                    guest.user_store.clone(),
4810                    guest_lang_registry.clone(),
4811                    FakeFs::new(cx.background()),
4812                    &mut guest_cx.to_async(),
4813                )
4814                .await
4815                .unwrap();
4816                clients.push(cx.foreground().spawn(guest.simulate_guest(
4817                    guest_id,
4818                    guest_project,
4819                    operations.clone(),
4820                    max_operations,
4821                    rng.clone(),
4822                    guest_cx,
4823                )));
4824
4825                log::info!("Guest {} added", guest_id);
4826            }
4827        }
4828
4829        let mut clients = futures::future::join_all(clients).await;
4830        cx.foreground().run_until_parked();
4831
4832        let (host_client, mut host_cx) = clients.remove(0);
4833        let host_project = host_client.project.as_ref().unwrap();
4834        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4835            project
4836                .worktrees(cx)
4837                .map(|worktree| {
4838                    let snapshot = worktree.read(cx).snapshot();
4839                    (snapshot.id(), snapshot)
4840                })
4841                .collect::<BTreeMap<_, _>>()
4842        });
4843
4844        host_client
4845            .project
4846            .as_ref()
4847            .unwrap()
4848            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4849
4850        for (guest_client, mut guest_cx) in clients.into_iter() {
4851            let guest_id = guest_client.client.id();
4852            let worktree_snapshots =
4853                guest_client
4854                    .project
4855                    .as_ref()
4856                    .unwrap()
4857                    .read_with(&guest_cx, |project, cx| {
4858                        project
4859                            .worktrees(cx)
4860                            .map(|worktree| {
4861                                let worktree = worktree.read(cx);
4862                                (worktree.id(), worktree.snapshot())
4863                            })
4864                            .collect::<BTreeMap<_, _>>()
4865                    });
4866
4867            assert_eq!(
4868                worktree_snapshots.keys().collect::<Vec<_>>(),
4869                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4870                "guest {} has different worktrees than the host",
4871                guest_id
4872            );
4873            for (id, host_snapshot) in &host_worktree_snapshots {
4874                let guest_snapshot = &worktree_snapshots[id];
4875                assert_eq!(
4876                    guest_snapshot.root_name(),
4877                    host_snapshot.root_name(),
4878                    "guest {} has different root name than the host for worktree {}",
4879                    guest_id,
4880                    id
4881                );
4882                assert_eq!(
4883                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4884                    host_snapshot.entries(false).collect::<Vec<_>>(),
4885                    "guest {} has different snapshot than the host for worktree {}",
4886                    guest_id,
4887                    id
4888                );
4889            }
4890
4891            guest_client
4892                .project
4893                .as_ref()
4894                .unwrap()
4895                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4896
4897            for guest_buffer in &guest_client.buffers {
4898                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4899                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4900                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4901                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4902                        guest_id, guest_client.peer_id, buffer_id
4903                    ))
4904                });
4905                let path = host_buffer
4906                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4907
4908                assert_eq!(
4909                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4910                    0,
4911                    "guest {}, buffer {}, path {:?} has deferred operations",
4912                    guest_id,
4913                    buffer_id,
4914                    path,
4915                );
4916                assert_eq!(
4917                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4918                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4919                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4920                    guest_id,
4921                    buffer_id,
4922                    path
4923                );
4924            }
4925
4926            guest_cx.update(|_| drop(guest_client));
4927        }
4928
4929        host_cx.update(|_| drop(host_client));
4930    }
4931
4932    struct TestServer {
4933        peer: Arc<Peer>,
4934        app_state: Arc<AppState>,
4935        server: Arc<Server>,
4936        foreground: Rc<executor::Foreground>,
4937        notifications: mpsc::UnboundedReceiver<()>,
4938        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4939        forbid_connections: Arc<AtomicBool>,
4940        _test_db: TestDb,
4941    }
4942
4943    impl TestServer {
4944        async fn start(
4945            foreground: Rc<executor::Foreground>,
4946            background: Arc<executor::Background>,
4947        ) -> Self {
4948            let test_db = TestDb::fake(background);
4949            let app_state = Self::build_app_state(&test_db).await;
4950            let peer = Peer::new();
4951            let notifications = mpsc::unbounded();
4952            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4953            Self {
4954                peer,
4955                app_state,
4956                server,
4957                foreground,
4958                notifications: notifications.1,
4959                connection_killers: Default::default(),
4960                forbid_connections: Default::default(),
4961                _test_db: test_db,
4962            }
4963        }
4964
4965        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4966            cx.update(|cx| {
4967                let settings = Settings::test(cx);
4968                cx.set_global(settings);
4969            });
4970
4971            let http = FakeHttpClient::with_404_response();
4972            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4973            let client_name = name.to_string();
4974            let mut client = Client::new(http.clone());
4975            let server = self.server.clone();
4976            let connection_killers = self.connection_killers.clone();
4977            let forbid_connections = self.forbid_connections.clone();
4978            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4979
4980            Arc::get_mut(&mut client)
4981                .unwrap()
4982                .override_authenticate(move |cx| {
4983                    cx.spawn(|_| async move {
4984                        let access_token = "the-token".to_string();
4985                        Ok(Credentials {
4986                            user_id: user_id.0 as u64,
4987                            access_token,
4988                        })
4989                    })
4990                })
4991                .override_establish_connection(move |credentials, cx| {
4992                    assert_eq!(credentials.user_id, user_id.0 as u64);
4993                    assert_eq!(credentials.access_token, "the-token");
4994
4995                    let server = server.clone();
4996                    let connection_killers = connection_killers.clone();
4997                    let forbid_connections = forbid_connections.clone();
4998                    let client_name = client_name.clone();
4999                    let connection_id_tx = connection_id_tx.clone();
5000                    cx.spawn(move |cx| async move {
5001                        if forbid_connections.load(SeqCst) {
5002                            Err(EstablishConnectionError::other(anyhow!(
5003                                "server is forbidding connections"
5004                            )))
5005                        } else {
5006                            let (client_conn, server_conn, kill_conn) =
5007                                Connection::in_memory(cx.background());
5008                            connection_killers.lock().insert(user_id, kill_conn);
5009                            cx.background()
5010                                .spawn(server.handle_connection(
5011                                    server_conn,
5012                                    client_name,
5013                                    user_id,
5014                                    Some(connection_id_tx),
5015                                    cx.background(),
5016                                ))
5017                                .detach();
5018                            Ok(client_conn)
5019                        }
5020                    })
5021                });
5022
5023            client
5024                .authenticate_and_connect(&cx.to_async())
5025                .await
5026                .unwrap();
5027
5028            Channel::init(&client);
5029            Project::init(&client);
5030            cx.update(|cx| {
5031                workspace::init(&client, cx);
5032            });
5033
5034            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
5035            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
5036
5037            let client = TestClient {
5038                client,
5039                peer_id,
5040                user_store,
5041                language_registry: Arc::new(LanguageRegistry::test()),
5042                project: Default::default(),
5043                buffers: Default::default(),
5044            };
5045            client.wait_for_current_user(cx).await;
5046            client
5047        }
5048
5049        fn disconnect_client(&self, user_id: UserId) {
5050            self.connection_killers.lock().remove(&user_id);
5051        }
5052
5053        fn forbid_connections(&self) {
5054            self.forbid_connections.store(true, SeqCst);
5055        }
5056
5057        fn allow_connections(&self) {
5058            self.forbid_connections.store(false, SeqCst);
5059        }
5060
5061        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
5062            let mut config = Config::default();
5063            config.session_secret = "a".repeat(32);
5064            config.database_url = test_db.url.clone();
5065            let github_client = github::AppClient::test();
5066            Arc::new(AppState {
5067                db: test_db.db().clone(),
5068                handlebars: Default::default(),
5069                auth_client: auth::build_client("", ""),
5070                repo_client: github::RepoClient::test(&github_client),
5071                github_client,
5072                config,
5073            })
5074        }
5075
5076        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
5077            self.server.store.read()
5078        }
5079
5080        async fn condition<F>(&mut self, mut predicate: F)
5081        where
5082            F: FnMut(&Store) -> bool,
5083        {
5084            async_std::future::timeout(Duration::from_millis(500), async {
5085                while !(predicate)(&*self.server.store.read()) {
5086                    self.foreground.start_waiting();
5087                    self.notifications.next().await;
5088                    self.foreground.finish_waiting();
5089                }
5090            })
5091            .await
5092            .expect("condition timed out");
5093        }
5094    }
5095
5096    impl Drop for TestServer {
5097        fn drop(&mut self) {
5098            self.peer.reset();
5099        }
5100    }
5101
5102    struct TestClient {
5103        client: Arc<Client>,
5104        pub peer_id: PeerId,
5105        pub user_store: ModelHandle<UserStore>,
5106        language_registry: Arc<LanguageRegistry>,
5107        project: Option<ModelHandle<Project>>,
5108        buffers: HashSet<ModelHandle<language::Buffer>>,
5109    }
5110
5111    impl Deref for TestClient {
5112        type Target = Arc<Client>;
5113
5114        fn deref(&self) -> &Self::Target {
5115            &self.client
5116        }
5117    }
5118
5119    impl TestClient {
5120        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
5121            UserId::from_proto(
5122                self.user_store
5123                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
5124            )
5125        }
5126
5127        async fn wait_for_current_user(&self, cx: &TestAppContext) {
5128            let mut authed_user = self
5129                .user_store
5130                .read_with(cx, |user_store, _| user_store.watch_current_user());
5131            while authed_user.next().await.unwrap().is_none() {}
5132        }
5133
5134        async fn build_local_project(
5135            &mut self,
5136            fs: Arc<FakeFs>,
5137            root_path: impl AsRef<Path>,
5138            cx: &mut TestAppContext,
5139        ) -> (ModelHandle<Project>, WorktreeId) {
5140            let project = cx.update(|cx| {
5141                Project::local(
5142                    self.client.clone(),
5143                    self.user_store.clone(),
5144                    self.language_registry.clone(),
5145                    fs,
5146                    cx,
5147                )
5148            });
5149            self.project = Some(project.clone());
5150            let (worktree, _) = project
5151                .update(cx, |p, cx| {
5152                    p.find_or_create_local_worktree(root_path, true, cx)
5153                })
5154                .await
5155                .unwrap();
5156            worktree
5157                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
5158                .await;
5159            project
5160                .update(cx, |project, _| project.next_remote_id())
5161                .await;
5162            (project, worktree.read_with(cx, |tree, _| tree.id()))
5163        }
5164
5165        async fn build_remote_project(
5166            &mut self,
5167            project_id: u64,
5168            cx: &mut TestAppContext,
5169        ) -> ModelHandle<Project> {
5170            let project = Project::remote(
5171                project_id,
5172                self.client.clone(),
5173                self.user_store.clone(),
5174                self.language_registry.clone(),
5175                FakeFs::new(cx.background()),
5176                &mut cx.to_async(),
5177            )
5178            .await
5179            .unwrap();
5180            self.project = Some(project.clone());
5181            project
5182        }
5183
5184        fn build_workspace(
5185            &self,
5186            project: &ModelHandle<Project>,
5187            cx: &mut TestAppContext,
5188        ) -> ViewHandle<Workspace> {
5189            let (window_id, _) = cx.add_window(|_| EmptyView);
5190            cx.add_view(window_id, |cx| {
5191                let fs = project.read(cx).fs().clone();
5192                Workspace::new(
5193                    &WorkspaceParams {
5194                        fs,
5195                        project: project.clone(),
5196                        user_store: self.user_store.clone(),
5197                        languages: self.language_registry.clone(),
5198                        channel_list: cx.add_model(|cx| {
5199                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
5200                        }),
5201                        client: self.client.clone(),
5202                    },
5203                    cx,
5204                )
5205            })
5206        }
5207
5208        fn simulate_host(
5209            mut self,
5210            project: ModelHandle<Project>,
5211            mut language_server_config: LanguageServerConfig,
5212            operations: Rc<Cell<usize>>,
5213            max_operations: usize,
5214            rng: Arc<Mutex<StdRng>>,
5215            mut cx: TestAppContext,
5216        ) -> impl Future<Output = (Self, TestAppContext)> {
5217            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
5218
5219            // Set up a fake language server.
5220            language_server_config.set_fake_initializer({
5221                let rng = rng.clone();
5222                let files = files.clone();
5223                let project = project.downgrade();
5224                move |fake_server| {
5225                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
5226                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
5227                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
5228                                range: lsp::Range::new(
5229                                    lsp::Position::new(0, 0),
5230                                    lsp::Position::new(0, 0),
5231                                ),
5232                                new_text: "the-new-text".to_string(),
5233                            })),
5234                            ..Default::default()
5235                        }]))
5236                    });
5237
5238                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
5239                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
5240                            lsp::CodeAction {
5241                                title: "the-code-action".to_string(),
5242                                ..Default::default()
5243                            },
5244                        )])
5245                    });
5246
5247                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
5248                        |params, _| {
5249                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
5250                                params.position,
5251                                params.position,
5252                            )))
5253                        },
5254                    );
5255
5256                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
5257                        let files = files.clone();
5258                        let rng = rng.clone();
5259                        move |_, _| {
5260                            let files = files.lock();
5261                            let mut rng = rng.lock();
5262                            let count = rng.gen_range::<usize, _>(1..3);
5263                            let files = (0..count)
5264                                .map(|_| files.choose(&mut *rng).unwrap())
5265                                .collect::<Vec<_>>();
5266                            log::info!("LSP: Returning definitions in files {:?}", &files);
5267                            Some(lsp::GotoDefinitionResponse::Array(
5268                                files
5269                                    .into_iter()
5270                                    .map(|file| lsp::Location {
5271                                        uri: lsp::Url::from_file_path(file).unwrap(),
5272                                        range: Default::default(),
5273                                    })
5274                                    .collect(),
5275                            ))
5276                        }
5277                    });
5278
5279                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
5280                        let rng = rng.clone();
5281                        let project = project.clone();
5282                        move |params, mut cx| {
5283                            if let Some(project) = project.upgrade(&cx) {
5284                                project.update(&mut cx, |project, cx| {
5285                                    let path = params
5286                                        .text_document_position_params
5287                                        .text_document
5288                                        .uri
5289                                        .to_file_path()
5290                                        .unwrap();
5291                                    let (worktree, relative_path) =
5292                                        project.find_local_worktree(&path, cx)?;
5293                                    let project_path =
5294                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
5295                                    let buffer =
5296                                        project.get_open_buffer(&project_path, cx)?.read(cx);
5297
5298                                    let mut highlights = Vec::new();
5299                                    let highlight_count = rng.lock().gen_range(1..=5);
5300                                    let mut prev_end = 0;
5301                                    for _ in 0..highlight_count {
5302                                        let range =
5303                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
5304                                        let start = buffer
5305                                            .offset_to_point_utf16(range.start)
5306                                            .to_lsp_position();
5307                                        let end = buffer
5308                                            .offset_to_point_utf16(range.end)
5309                                            .to_lsp_position();
5310                                        highlights.push(lsp::DocumentHighlight {
5311                                            range: lsp::Range::new(start, end),
5312                                            kind: Some(lsp::DocumentHighlightKind::READ),
5313                                        });
5314                                        prev_end = range.end;
5315                                    }
5316                                    Some(highlights)
5317                                })
5318                            } else {
5319                                None
5320                            }
5321                        }
5322                    });
5323                }
5324            });
5325
5326            project.update(&mut cx, |project, _| {
5327                project.languages().add(Arc::new(Language::new(
5328                    LanguageConfig {
5329                        name: "Rust".into(),
5330                        path_suffixes: vec!["rs".to_string()],
5331                        language_server: Some(language_server_config),
5332                        ..Default::default()
5333                    },
5334                    None,
5335                )));
5336            });
5337
5338            async move {
5339                let fs = project.read_with(&cx, |project, _| project.fs().clone());
5340                while operations.get() < max_operations {
5341                    operations.set(operations.get() + 1);
5342
5343                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
5344                    match distribution {
5345                        0..=20 if !files.lock().is_empty() => {
5346                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5347                            let mut path = path.as_path();
5348                            while let Some(parent_path) = path.parent() {
5349                                path = parent_path;
5350                                if rng.lock().gen() {
5351                                    break;
5352                                }
5353                            }
5354
5355                            log::info!("Host: find/create local worktree {:?}", path);
5356                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
5357                                project.find_or_create_local_worktree(path, true, cx)
5358                            });
5359                            let find_or_create_worktree = async move {
5360                                find_or_create_worktree.await.unwrap();
5361                            };
5362                            if rng.lock().gen() {
5363                                cx.background().spawn(find_or_create_worktree).detach();
5364                            } else {
5365                                find_or_create_worktree.await;
5366                            }
5367                        }
5368                        10..=80 if !files.lock().is_empty() => {
5369                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5370                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
5371                                let (worktree, path) = project
5372                                    .update(&mut cx, |project, cx| {
5373                                        project.find_or_create_local_worktree(
5374                                            file.clone(),
5375                                            true,
5376                                            cx,
5377                                        )
5378                                    })
5379                                    .await
5380                                    .unwrap();
5381                                let project_path =
5382                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
5383                                log::info!(
5384                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
5385                                    file,
5386                                    project_path.0,
5387                                    project_path.1
5388                                );
5389                                let buffer = project
5390                                    .update(&mut cx, |project, cx| {
5391                                        project.open_buffer(project_path, cx)
5392                                    })
5393                                    .await
5394                                    .unwrap();
5395                                self.buffers.insert(buffer.clone());
5396                                buffer
5397                            } else {
5398                                self.buffers
5399                                    .iter()
5400                                    .choose(&mut *rng.lock())
5401                                    .unwrap()
5402                                    .clone()
5403                            };
5404
5405                            if rng.lock().gen_bool(0.1) {
5406                                cx.update(|cx| {
5407                                    log::info!(
5408                                        "Host: dropping buffer {:?}",
5409                                        buffer.read(cx).file().unwrap().full_path(cx)
5410                                    );
5411                                    self.buffers.remove(&buffer);
5412                                    drop(buffer);
5413                                });
5414                            } else {
5415                                buffer.update(&mut cx, |buffer, cx| {
5416                                    log::info!(
5417                                        "Host: updating buffer {:?} ({})",
5418                                        buffer.file().unwrap().full_path(cx),
5419                                        buffer.remote_id()
5420                                    );
5421                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5422                                });
5423                            }
5424                        }
5425                        _ => loop {
5426                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
5427                            let mut path = PathBuf::new();
5428                            path.push("/");
5429                            for _ in 0..path_component_count {
5430                                let letter = rng.lock().gen_range(b'a'..=b'z');
5431                                path.push(std::str::from_utf8(&[letter]).unwrap());
5432                            }
5433                            path.set_extension("rs");
5434                            let parent_path = path.parent().unwrap();
5435
5436                            log::info!("Host: creating file {:?}", path,);
5437
5438                            if fs.create_dir(&parent_path).await.is_ok()
5439                                && fs.create_file(&path, Default::default()).await.is_ok()
5440                            {
5441                                files.lock().push(path);
5442                                break;
5443                            } else {
5444                                log::info!("Host: cannot create file");
5445                            }
5446                        },
5447                    }
5448
5449                    cx.background().simulate_random_delay().await;
5450                }
5451
5452                log::info!("Host done");
5453
5454                self.project = Some(project);
5455                (self, cx)
5456            }
5457        }
5458
5459        pub async fn simulate_guest(
5460            mut self,
5461            guest_id: usize,
5462            project: ModelHandle<Project>,
5463            operations: Rc<Cell<usize>>,
5464            max_operations: usize,
5465            rng: Arc<Mutex<StdRng>>,
5466            mut cx: TestAppContext,
5467        ) -> (Self, TestAppContext) {
5468            while operations.get() < max_operations {
5469                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5470                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
5471                        project
5472                            .worktrees(&cx)
5473                            .filter(|worktree| {
5474                                let worktree = worktree.read(cx);
5475                                worktree.is_visible()
5476                                    && worktree.entries(false).any(|e| e.is_file())
5477                            })
5478                            .choose(&mut *rng.lock())
5479                    }) {
5480                        worktree
5481                    } else {
5482                        cx.background().simulate_random_delay().await;
5483                        continue;
5484                    };
5485
5486                    operations.set(operations.get() + 1);
5487                    let (worktree_root_name, project_path) =
5488                        worktree.read_with(&cx, |worktree, _| {
5489                            let entry = worktree
5490                                .entries(false)
5491                                .filter(|e| e.is_file())
5492                                .choose(&mut *rng.lock())
5493                                .unwrap();
5494                            (
5495                                worktree.root_name().to_string(),
5496                                (worktree.id(), entry.path.clone()),
5497                            )
5498                        });
5499                    log::info!(
5500                        "Guest {}: opening path {:?} in worktree {} ({})",
5501                        guest_id,
5502                        project_path.1,
5503                        project_path.0,
5504                        worktree_root_name,
5505                    );
5506                    let buffer = project
5507                        .update(&mut cx, |project, cx| {
5508                            project.open_buffer(project_path.clone(), cx)
5509                        })
5510                        .await
5511                        .unwrap();
5512                    log::info!(
5513                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
5514                        guest_id,
5515                        project_path.1,
5516                        project_path.0,
5517                        worktree_root_name,
5518                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
5519                    );
5520                    self.buffers.insert(buffer.clone());
5521                    buffer
5522                } else {
5523                    operations.set(operations.get() + 1);
5524
5525                    self.buffers
5526                        .iter()
5527                        .choose(&mut *rng.lock())
5528                        .unwrap()
5529                        .clone()
5530                };
5531
5532                let choice = rng.lock().gen_range(0..100);
5533                match choice {
5534                    0..=9 => {
5535                        cx.update(|cx| {
5536                            log::info!(
5537                                "Guest {}: dropping buffer {:?}",
5538                                guest_id,
5539                                buffer.read(cx).file().unwrap().full_path(cx)
5540                            );
5541                            self.buffers.remove(&buffer);
5542                            drop(buffer);
5543                        });
5544                    }
5545                    10..=19 => {
5546                        let completions = project.update(&mut cx, |project, cx| {
5547                            log::info!(
5548                                "Guest {}: requesting completions for buffer {} ({:?})",
5549                                guest_id,
5550                                buffer.read(cx).remote_id(),
5551                                buffer.read(cx).file().unwrap().full_path(cx)
5552                            );
5553                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5554                            project.completions(&buffer, offset, cx)
5555                        });
5556                        let completions = cx.background().spawn(async move {
5557                            completions.await.expect("completions request failed");
5558                        });
5559                        if rng.lock().gen_bool(0.3) {
5560                            log::info!("Guest {}: detaching completions request", guest_id);
5561                            completions.detach();
5562                        } else {
5563                            completions.await;
5564                        }
5565                    }
5566                    20..=29 => {
5567                        let code_actions = project.update(&mut cx, |project, cx| {
5568                            log::info!(
5569                                "Guest {}: requesting code actions for buffer {} ({:?})",
5570                                guest_id,
5571                                buffer.read(cx).remote_id(),
5572                                buffer.read(cx).file().unwrap().full_path(cx)
5573                            );
5574                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
5575                            project.code_actions(&buffer, range, cx)
5576                        });
5577                        let code_actions = cx.background().spawn(async move {
5578                            code_actions.await.expect("code actions request failed");
5579                        });
5580                        if rng.lock().gen_bool(0.3) {
5581                            log::info!("Guest {}: detaching code actions request", guest_id);
5582                            code_actions.detach();
5583                        } else {
5584                            code_actions.await;
5585                        }
5586                    }
5587                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5588                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5589                            log::info!(
5590                                "Guest {}: saving buffer {} ({:?})",
5591                                guest_id,
5592                                buffer.remote_id(),
5593                                buffer.file().unwrap().full_path(cx)
5594                            );
5595                            (buffer.version(), buffer.save(cx))
5596                        });
5597                        let save = cx.background().spawn(async move {
5598                            let (saved_version, _) = save.await.expect("save request failed");
5599                            assert!(saved_version.observed_all(&requested_version));
5600                        });
5601                        if rng.lock().gen_bool(0.3) {
5602                            log::info!("Guest {}: detaching save request", guest_id);
5603                            save.detach();
5604                        } else {
5605                            save.await;
5606                        }
5607                    }
5608                    40..=44 => {
5609                        let prepare_rename = project.update(&mut cx, |project, cx| {
5610                            log::info!(
5611                                "Guest {}: preparing rename for buffer {} ({:?})",
5612                                guest_id,
5613                                buffer.read(cx).remote_id(),
5614                                buffer.read(cx).file().unwrap().full_path(cx)
5615                            );
5616                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5617                            project.prepare_rename(buffer, offset, cx)
5618                        });
5619                        let prepare_rename = cx.background().spawn(async move {
5620                            prepare_rename.await.expect("prepare rename request failed");
5621                        });
5622                        if rng.lock().gen_bool(0.3) {
5623                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5624                            prepare_rename.detach();
5625                        } else {
5626                            prepare_rename.await;
5627                        }
5628                    }
5629                    45..=49 => {
5630                        let definitions = project.update(&mut cx, |project, cx| {
5631                            log::info!(
5632                                "Guest {}: requesting definitions for buffer {} ({:?})",
5633                                guest_id,
5634                                buffer.read(cx).remote_id(),
5635                                buffer.read(cx).file().unwrap().full_path(cx)
5636                            );
5637                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5638                            project.definition(&buffer, offset, cx)
5639                        });
5640                        let definitions = cx.background().spawn(async move {
5641                            definitions.await.expect("definitions request failed")
5642                        });
5643                        if rng.lock().gen_bool(0.3) {
5644                            log::info!("Guest {}: detaching definitions request", guest_id);
5645                            definitions.detach();
5646                        } else {
5647                            self.buffers
5648                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5649                        }
5650                    }
5651                    50..=54 => {
5652                        let highlights = project.update(&mut cx, |project, cx| {
5653                            log::info!(
5654                                "Guest {}: requesting highlights for buffer {} ({:?})",
5655                                guest_id,
5656                                buffer.read(cx).remote_id(),
5657                                buffer.read(cx).file().unwrap().full_path(cx)
5658                            );
5659                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5660                            project.document_highlights(&buffer, offset, cx)
5661                        });
5662                        let highlights = cx.background().spawn(async move {
5663                            highlights.await.expect("highlights request failed");
5664                        });
5665                        if rng.lock().gen_bool(0.3) {
5666                            log::info!("Guest {}: detaching highlights request", guest_id);
5667                            highlights.detach();
5668                        } else {
5669                            highlights.await;
5670                        }
5671                    }
5672                    55..=59 => {
5673                        let search = project.update(&mut cx, |project, cx| {
5674                            let query = rng.lock().gen_range('a'..='z');
5675                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5676                            project.search(SearchQuery::text(query, false, false), cx)
5677                        });
5678                        let search = cx
5679                            .background()
5680                            .spawn(async move { search.await.expect("search request failed") });
5681                        if rng.lock().gen_bool(0.3) {
5682                            log::info!("Guest {}: detaching search request", guest_id);
5683                            search.detach();
5684                        } else {
5685                            self.buffers.extend(search.await.into_keys());
5686                        }
5687                    }
5688                    _ => {
5689                        buffer.update(&mut cx, |buffer, cx| {
5690                            log::info!(
5691                                "Guest {}: updating buffer {} ({:?})",
5692                                guest_id,
5693                                buffer.remote_id(),
5694                                buffer.file().unwrap().full_path(cx)
5695                            );
5696                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5697                        });
5698                    }
5699                }
5700                cx.background().simulate_random_delay().await;
5701            }
5702
5703            log::info!("Guest {} done", guest_id);
5704
5705            self.project = Some(project);
5706            (self, cx)
5707        }
5708    }
5709
5710    impl Drop for TestClient {
5711        fn drop(&mut self) {
5712            self.client.tear_down();
5713        }
5714    }
5715
5716    impl Executor for Arc<gpui::executor::Background> {
5717        type Timer = gpui::executor::Timer;
5718
5719        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5720            self.spawn(future).detach();
5721        }
5722
5723        fn timer(&self, duration: Duration) -> Self::Timer {
5724            self.as_ref().timer(duration)
5725        }
5726    }
5727
5728    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5729        channel
5730            .messages()
5731            .cursor::<()>()
5732            .map(|m| {
5733                (
5734                    m.sender.github_login.clone(),
5735                    m.body.clone(),
5736                    m.is_pending(),
5737                )
5738            })
5739            .collect()
5740    }
5741
5742    struct EmptyView;
5743
5744    impl gpui::Entity for EmptyView {
5745        type Event = ();
5746    }
5747
5748    impl gpui::View for EmptyView {
5749        fn ui_name() -> &'static str {
5750            "empty view"
5751        }
5752
5753        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5754            gpui::Element::boxed(gpui::elements::Empty)
5755        }
5756    }
5757}