rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::get_channel_messages);
 116
 117        Arc::new(server)
 118    }
 119
 120    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 121    where
 122        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 123        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 124        M: EnvelopedMessage,
 125    {
 126        let prev_handler = self.handlers.insert(
 127            TypeId::of::<M>(),
 128            Box::new(move |server, envelope| {
 129                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 130                (handler)(server, *envelope).boxed()
 131            }),
 132        );
 133        if prev_handler.is_some() {
 134            panic!("registered a handler for the same message twice");
 135        }
 136        self
 137    }
 138
 139    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 140    where
 141        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 142        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 143        M: RequestMessage,
 144    {
 145        self.add_message_handler(move |server, envelope| {
 146            let receipt = envelope.receipt();
 147            let response = (handler)(server.clone(), envelope);
 148            async move {
 149                match response.await {
 150                    Ok(response) => {
 151                        server.peer.respond(receipt, response)?;
 152                        Ok(())
 153                    }
 154                    Err(error) => {
 155                        server.peer.respond_with_error(
 156                            receipt,
 157                            proto::Error {
 158                                message: error.to_string(),
 159                            },
 160                        )?;
 161                        Err(error)
 162                    }
 163                }
 164            }
 165        })
 166    }
 167
 168    pub fn handle_connection<E: Executor>(
 169        self: &Arc<Self>,
 170        connection: Connection,
 171        addr: String,
 172        user_id: UserId,
 173        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 174        executor: E,
 175    ) -> impl Future<Output = ()> {
 176        let mut this = self.clone();
 177        async move {
 178            let (connection_id, handle_io, mut incoming_rx) = this
 179                .peer
 180                .add_connection(connection, {
 181                    let executor = executor.clone();
 182                    move |duration| {
 183                        let timer = executor.timer(duration);
 184                        async move {
 185                            timer.await;
 186                        }
 187                    }
 188                })
 189                .await;
 190
 191            if let Some(send_connection_id) = send_connection_id.as_mut() {
 192                let _ = send_connection_id.send(connection_id).await;
 193            }
 194
 195            this.state_mut().add_connection(connection_id, user_id);
 196            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 197                log::error!("error updating contacts for {:?}: {}", user_id, err);
 198            }
 199
 200            let handle_io = handle_io.fuse();
 201            futures::pin_mut!(handle_io);
 202            loop {
 203                let next_message = incoming_rx.next().fuse();
 204                futures::pin_mut!(next_message);
 205                futures::select_biased! {
 206                    result = handle_io => {
 207                        if let Err(err) = result {
 208                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 209                        }
 210                        break;
 211                    }
 212                    message = next_message => {
 213                        if let Some(message) = message {
 214                            let start_time = Instant::now();
 215                            let type_name = message.payload_type_name();
 216                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 217                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 218                                let notifications = this.notifications.clone();
 219                                let is_background = message.is_background();
 220                                let handle_message = (handler)(this.clone(), message);
 221                                let handle_message = async move {
 222                                    if let Err(err) = handle_message.await {
 223                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 224                                    } else {
 225                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 226                                    }
 227                                    if let Some(mut notifications) = notifications {
 228                                        let _ = notifications.send(()).await;
 229                                    }
 230                                };
 231                                if is_background {
 232                                    executor.spawn_detached(handle_message);
 233                                } else {
 234                                    handle_message.await;
 235                                }
 236                            } else {
 237                                log::warn!("unhandled message: {}", type_name);
 238                            }
 239                        } else {
 240                            log::info!("rpc connection closed {:?}", addr);
 241                            break;
 242                        }
 243                    }
 244                }
 245            }
 246
 247            if let Err(err) = this.sign_out(connection_id).await {
 248                log::error!("error signing out connection {:?} - {:?}", addr, err);
 249            }
 250        }
 251    }
 252
 253    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 254        self.peer.disconnect(connection_id);
 255        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 256
 257        for (project_id, project) in removed_connection.hosted_projects {
 258            if let Some(share) = project.share {
 259                broadcast(
 260                    connection_id,
 261                    share.guests.keys().copied().collect(),
 262                    |conn_id| {
 263                        self.peer
 264                            .send(conn_id, proto::UnshareProject { project_id })
 265                    },
 266                )?;
 267            }
 268        }
 269
 270        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 271            broadcast(connection_id, peer_ids, |conn_id| {
 272                self.peer.send(
 273                    conn_id,
 274                    proto::RemoveProjectCollaborator {
 275                        project_id,
 276                        peer_id: connection_id.0,
 277                    },
 278                )
 279            })?;
 280        }
 281
 282        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 283        Ok(())
 284    }
 285
 286    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn register_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::RegisterProject>,
 293    ) -> tide::Result<proto::RegisterProjectResponse> {
 294        let project_id = {
 295            let mut state = self.state_mut();
 296            let user_id = state.user_id_for_connection(request.sender_id)?;
 297            state.register_project(request.sender_id, user_id)
 298        };
 299        Ok(proto::RegisterProjectResponse { project_id })
 300    }
 301
 302    async fn unregister_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnregisterProject>,
 305    ) -> tide::Result<()> {
 306        let project = self
 307            .state_mut()
 308            .unregister_project(request.payload.project_id, request.sender_id)?;
 309        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 310        Ok(())
 311    }
 312
 313    async fn share_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::ShareProject>,
 316    ) -> tide::Result<proto::Ack> {
 317        self.state_mut()
 318            .share_project(request.payload.project_id, request.sender_id);
 319        Ok(proto::Ack {})
 320    }
 321
 322    async fn unshare_project(
 323        mut self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareProject>,
 325    ) -> tide::Result<()> {
 326        let project_id = request.payload.project_id;
 327        let project = self
 328            .state_mut()
 329            .unshare_project(project_id, request.sender_id)?;
 330
 331        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 332            self.peer
 333                .send(conn_id, proto::UnshareProject { project_id })
 334        })?;
 335        self.update_contacts_for_users(&project.authorized_user_ids)?;
 336        Ok(())
 337    }
 338
 339    async fn join_project(
 340        mut self: Arc<Server>,
 341        request: TypedEnvelope<proto::JoinProject>,
 342    ) -> tide::Result<proto::JoinProjectResponse> {
 343        let project_id = request.payload.project_id;
 344
 345        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 346        let (response, connection_ids, contact_user_ids) = self
 347            .state_mut()
 348            .join_project(request.sender_id, user_id, project_id)
 349            .and_then(|joined| {
 350                let share = joined.project.share()?;
 351                let peer_count = share.guests.len();
 352                let mut collaborators = Vec::with_capacity(peer_count);
 353                collaborators.push(proto::Collaborator {
 354                    peer_id: joined.project.host_connection_id.0,
 355                    replica_id: 0,
 356                    user_id: joined.project.host_user_id.to_proto(),
 357                });
 358                let worktrees = share
 359                    .worktrees
 360                    .iter()
 361                    .filter_map(|(id, shared_worktree)| {
 362                        let worktree = joined.project.worktrees.get(&id)?;
 363                        Some(proto::Worktree {
 364                            id: *id,
 365                            root_name: worktree.root_name.clone(),
 366                            entries: shared_worktree.entries.values().cloned().collect(),
 367                            diagnostic_summaries: shared_worktree
 368                                .diagnostic_summaries
 369                                .values()
 370                                .cloned()
 371                                .collect(),
 372                            visible: worktree.visible,
 373                        })
 374                    })
 375                    .collect();
 376                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 377                    if *peer_conn_id != request.sender_id {
 378                        collaborators.push(proto::Collaborator {
 379                            peer_id: peer_conn_id.0,
 380                            replica_id: *peer_replica_id as u32,
 381                            user_id: peer_user_id.to_proto(),
 382                        });
 383                    }
 384                }
 385                let response = proto::JoinProjectResponse {
 386                    worktrees,
 387                    replica_id: joined.replica_id as u32,
 388                    collaborators,
 389                    language_servers: joined.project.language_servers.clone(),
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn start_language_server(
 540        mut self: Arc<Server>,
 541        request: TypedEnvelope<proto::StartLanguageServer>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self.state_mut().start_language_server(
 544            request.payload.project_id,
 545            request.sender_id,
 546            request
 547                .payload
 548                .server
 549                .clone()
 550                .ok_or_else(|| anyhow!("invalid language server"))?,
 551        )?;
 552        broadcast(request.sender_id, receiver_ids, |connection_id| {
 553            self.peer
 554                .forward_send(request.sender_id, connection_id, request.payload.clone())
 555        })?;
 556        Ok(())
 557    }
 558
 559    async fn update_language_server(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::UpdateLanguageServer>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn forward_project_request<T>(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<T>,
 576    ) -> tide::Result<T::Response>
 577    where
 578        T: EntityMessage + RequestMessage,
 579    {
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 583            .host_connection_id;
 584        Ok(self
 585            .peer
 586            .forward_request(request.sender_id, host_connection_id, request.payload)
 587            .await?)
 588    }
 589
 590    async fn save_buffer(
 591        self: Arc<Server>,
 592        request: TypedEnvelope<proto::SaveBuffer>,
 593    ) -> tide::Result<proto::BufferSaved> {
 594        let host;
 595        let mut guests;
 596        {
 597            let state = self.state();
 598            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 599            host = project.host_connection_id;
 600            guests = project.guest_connection_ids()
 601        }
 602
 603        let response = self
 604            .peer
 605            .forward_request(request.sender_id, host, request.payload.clone())
 606            .await?;
 607
 608        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 609        broadcast(host, guests, |conn_id| {
 610            self.peer.forward_send(host, conn_id, response.clone())
 611        })?;
 612
 613        Ok(response)
 614    }
 615
 616    async fn update_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::UpdateBuffer>,
 619    ) -> tide::Result<proto::Ack> {
 620        let receiver_ids = self
 621            .state()
 622            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        })?;
 627        Ok(proto::Ack {})
 628    }
 629
 630    async fn update_buffer_file(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateBufferFile>,
 633    ) -> tide::Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 637        broadcast(request.sender_id, receiver_ids, |connection_id| {
 638            self.peer
 639                .forward_send(request.sender_id, connection_id, request.payload.clone())
 640        })?;
 641        Ok(())
 642    }
 643
 644    async fn buffer_reloaded(
 645        self: Arc<Server>,
 646        request: TypedEnvelope<proto::BufferReloaded>,
 647    ) -> tide::Result<()> {
 648        let receiver_ids = self
 649            .state()
 650            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 651        broadcast(request.sender_id, receiver_ids, |connection_id| {
 652            self.peer
 653                .forward_send(request.sender_id, connection_id, request.payload.clone())
 654        })?;
 655        Ok(())
 656    }
 657
 658    async fn buffer_saved(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::BufferSaved>,
 661    ) -> tide::Result<()> {
 662        let receiver_ids = self
 663            .state()
 664            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 665        broadcast(request.sender_id, receiver_ids, |connection_id| {
 666            self.peer
 667                .forward_send(request.sender_id, connection_id, request.payload.clone())
 668        })?;
 669        Ok(())
 670    }
 671
 672    async fn get_channels(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::GetChannels>,
 675    ) -> tide::Result<proto::GetChannelsResponse> {
 676        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 677        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 678        Ok(proto::GetChannelsResponse {
 679            channels: channels
 680                .into_iter()
 681                .map(|chan| proto::Channel {
 682                    id: chan.id.to_proto(),
 683                    name: chan.name,
 684                })
 685                .collect(),
 686        })
 687    }
 688
 689    async fn get_users(
 690        self: Arc<Server>,
 691        request: TypedEnvelope<proto::GetUsers>,
 692    ) -> tide::Result<proto::GetUsersResponse> {
 693        let user_ids = request
 694            .payload
 695            .user_ids
 696            .into_iter()
 697            .map(UserId::from_proto)
 698            .collect();
 699        let users = self
 700            .app_state
 701            .db
 702            .get_users_by_ids(user_ids)
 703            .await?
 704            .into_iter()
 705            .map(|user| proto::User {
 706                id: user.id.to_proto(),
 707                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 708                github_login: user.github_login,
 709            })
 710            .collect();
 711        Ok(proto::GetUsersResponse { users })
 712    }
 713
 714    fn update_contacts_for_users<'a>(
 715        self: &Arc<Server>,
 716        user_ids: impl IntoIterator<Item = &'a UserId>,
 717    ) -> anyhow::Result<()> {
 718        let mut result = Ok(());
 719        let state = self.state();
 720        for user_id in user_ids {
 721            let contacts = state.contacts_for_user(*user_id);
 722            for connection_id in state.connection_ids_for_user(*user_id) {
 723                if let Err(error) = self.peer.send(
 724                    connection_id,
 725                    proto::UpdateContacts {
 726                        contacts: contacts.clone(),
 727                    },
 728                ) {
 729                    result = Err(error);
 730                }
 731            }
 732        }
 733        result
 734    }
 735
 736    async fn join_channel(
 737        mut self: Arc<Self>,
 738        request: TypedEnvelope<proto::JoinChannel>,
 739    ) -> tide::Result<proto::JoinChannelResponse> {
 740        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 741        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 742        if !self
 743            .app_state
 744            .db
 745            .can_user_access_channel(user_id, channel_id)
 746            .await?
 747        {
 748            Err(anyhow!("access denied"))?;
 749        }
 750
 751        self.state_mut().join_channel(request.sender_id, channel_id);
 752        let messages = self
 753            .app_state
 754            .db
 755            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 756            .await?
 757            .into_iter()
 758            .map(|msg| proto::ChannelMessage {
 759                id: msg.id.to_proto(),
 760                body: msg.body,
 761                timestamp: msg.sent_at.unix_timestamp() as u64,
 762                sender_id: msg.sender_id.to_proto(),
 763                nonce: Some(msg.nonce.as_u128().into()),
 764            })
 765            .collect::<Vec<_>>();
 766        Ok(proto::JoinChannelResponse {
 767            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 768            messages,
 769        })
 770    }
 771
 772    async fn leave_channel(
 773        mut self: Arc<Self>,
 774        request: TypedEnvelope<proto::LeaveChannel>,
 775    ) -> tide::Result<()> {
 776        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 777        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 778        if !self
 779            .app_state
 780            .db
 781            .can_user_access_channel(user_id, channel_id)
 782            .await?
 783        {
 784            Err(anyhow!("access denied"))?;
 785        }
 786
 787        self.state_mut()
 788            .leave_channel(request.sender_id, channel_id);
 789
 790        Ok(())
 791    }
 792
 793    async fn send_channel_message(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::SendChannelMessage>,
 796    ) -> tide::Result<proto::SendChannelMessageResponse> {
 797        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 798        let user_id;
 799        let connection_ids;
 800        {
 801            let state = self.state();
 802            user_id = state.user_id_for_connection(request.sender_id)?;
 803            connection_ids = state.channel_connection_ids(channel_id)?;
 804        }
 805
 806        // Validate the message body.
 807        let body = request.payload.body.trim().to_string();
 808        if body.len() > MAX_MESSAGE_LEN {
 809            return Err(anyhow!("message is too long"))?;
 810        }
 811        if body.is_empty() {
 812            return Err(anyhow!("message can't be blank"))?;
 813        }
 814
 815        let timestamp = OffsetDateTime::now_utc();
 816        let nonce = request
 817            .payload
 818            .nonce
 819            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 820
 821        let message_id = self
 822            .app_state
 823            .db
 824            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 825            .await?
 826            .to_proto();
 827        let message = proto::ChannelMessage {
 828            sender_id: user_id.to_proto(),
 829            id: message_id,
 830            body,
 831            timestamp: timestamp.unix_timestamp() as u64,
 832            nonce: Some(nonce),
 833        };
 834        broadcast(request.sender_id, connection_ids, |conn_id| {
 835            self.peer.send(
 836                conn_id,
 837                proto::ChannelMessageSent {
 838                    channel_id: channel_id.to_proto(),
 839                    message: Some(message.clone()),
 840                },
 841            )
 842        })?;
 843        Ok(proto::SendChannelMessageResponse {
 844            message: Some(message),
 845        })
 846    }
 847
 848    async fn get_channel_messages(
 849        self: Arc<Self>,
 850        request: TypedEnvelope<proto::GetChannelMessages>,
 851    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 852        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 853        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 854        if !self
 855            .app_state
 856            .db
 857            .can_user_access_channel(user_id, channel_id)
 858            .await?
 859        {
 860            Err(anyhow!("access denied"))?;
 861        }
 862
 863        let messages = self
 864            .app_state
 865            .db
 866            .get_channel_messages(
 867                channel_id,
 868                MESSAGE_COUNT_PER_PAGE,
 869                Some(MessageId::from_proto(request.payload.before_message_id)),
 870            )
 871            .await?
 872            .into_iter()
 873            .map(|msg| proto::ChannelMessage {
 874                id: msg.id.to_proto(),
 875                body: msg.body,
 876                timestamp: msg.sent_at.unix_timestamp() as u64,
 877                sender_id: msg.sender_id.to_proto(),
 878                nonce: Some(msg.nonce.as_u128().into()),
 879            })
 880            .collect::<Vec<_>>();
 881
 882        Ok(proto::GetChannelMessagesResponse {
 883            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 884            messages,
 885        })
 886    }
 887
 888    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 889        self.store.read()
 890    }
 891
 892    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 893        self.store.write()
 894    }
 895}
 896
 897impl Executor for RealExecutor {
 898    type Timer = Timer;
 899
 900    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 901        task::spawn(future);
 902    }
 903
 904    fn timer(&self, duration: Duration) -> Self::Timer {
 905        Timer::after(duration)
 906    }
 907}
 908
 909fn broadcast<F>(
 910    sender_id: ConnectionId,
 911    receiver_ids: Vec<ConnectionId>,
 912    mut f: F,
 913) -> anyhow::Result<()>
 914where
 915    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 916{
 917    let mut result = Ok(());
 918    for receiver_id in receiver_ids {
 919        if receiver_id != sender_id {
 920            if let Err(error) = f(receiver_id) {
 921                if result.is_ok() {
 922                    result = Err(error);
 923                }
 924            }
 925        }
 926    }
 927    result
 928}
 929
 930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 931    let server = Server::new(app.state().clone(), rpc.clone(), None);
 932    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 933        let server = server.clone();
 934        async move {
 935            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 936
 937            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 938            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 939            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 940            let client_protocol_version: Option<u32> = request
 941                .header("X-Zed-Protocol-Version")
 942                .and_then(|v| v.as_str().parse().ok());
 943
 944            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 945                return Ok(Response::new(StatusCode::UpgradeRequired));
 946            }
 947
 948            let header = match request.header("Sec-Websocket-Key") {
 949                Some(h) => h.as_str(),
 950                None => return Err(anyhow!("expected sec-websocket-key"))?,
 951            };
 952
 953            let user_id = process_auth_header(&request).await?;
 954
 955            let mut response = Response::new(StatusCode::SwitchingProtocols);
 956            response.insert_header(UPGRADE, "websocket");
 957            response.insert_header(CONNECTION, "Upgrade");
 958            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 959            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 960            response.insert_header("Sec-Websocket-Version", "13");
 961
 962            let http_res: &mut tide::http::Response = response.as_mut();
 963            let upgrade_receiver = http_res.recv_upgrade().await;
 964            let addr = request.remote().unwrap_or("unknown").to_string();
 965            task::spawn(async move {
 966                if let Some(stream) = upgrade_receiver.await {
 967                    server
 968                        .handle_connection(
 969                            Connection::new(
 970                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 971                            ),
 972                            addr,
 973                            user_id,
 974                            None,
 975                            RealExecutor,
 976                        )
 977                        .await;
 978                }
 979            });
 980
 981            Ok(response)
 982        }
 983    });
 984}
 985
 986fn header_contains_ignore_case<T>(
 987    request: &tide::Request<T>,
 988    header_name: HeaderName,
 989    value: &str,
 990) -> bool {
 991    request
 992        .header(header_name)
 993        .map(|h| {
 994            h.as_str()
 995                .split(',')
 996                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 997        })
 998        .unwrap_or(false)
 999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use crate::{
1005        auth,
1006        db::{tests::TestDb, UserId},
1007        github, AppState, Config,
1008    };
1009    use ::rpc::Peer;
1010    use client::{
1011        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012        EstablishConnectionError, UserStore,
1013    };
1014    use collections::BTreeMap;
1015    use editor::{
1016        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1017        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1018    };
1019    use gpui::{executor, ModelHandle, TestAppContext};
1020    use language::{
1021        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023    };
1024    use lsp;
1025    use parking_lot::Mutex;
1026    use postage::barrier;
1027    use project::{
1028        fs::{FakeFs, Fs as _},
1029        search::SearchQuery,
1030        worktree::WorktreeHandle,
1031        DiagnosticSummary, Project, ProjectPath,
1032    };
1033    use rand::prelude::*;
1034    use rpc::PeerId;
1035    use serde_json::json;
1036    use sqlx::types::time::OffsetDateTime;
1037    use std::{
1038        cell::Cell,
1039        env,
1040        ops::Deref,
1041        path::{Path, PathBuf},
1042        rc::Rc,
1043        sync::{
1044            atomic::{AtomicBool, Ordering::SeqCst},
1045            Arc,
1046        },
1047        time::Duration,
1048    };
1049    use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051    #[cfg(test)]
1052    #[ctor::ctor]
1053    fn init_logger() {
1054        if std::env::var("RUST_LOG").is_ok() {
1055            env_logger::init();
1056        }
1057    }
1058
    // End-to-end sharing scenario with two clients: A shares a local project,
    // B joins it as a remote project, both sides see each other as
    // collaborators, an edit typed by B shows up in A's buffer, and dropping
    // B's project empties A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the `.zed.toml` entry presumably is what authorizes
        // user_b to join — confirm against the server's sharing logic.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must already see A as a collaborator; remember B's replica id so it
        // can be matched against what A observes below.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the same replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // Project A is expected to report b.txt as open at this point, before
        // client A has opened it itself.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1184
    // Verifies the share -> unshare -> share-again cycle: after A unshares, B's
    // remote project becomes read-only and A's worktree is no longer shared;
    // after A shares again, B can join under the same project id and open a
    // buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer proves the guest can use the shared project.
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project is expected to turn read-only once unshared.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // The same project id is reused for the second join.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1280
    // Three-client scenario (host A, guests B and C) covering:
    //   * concurrent edits from both guests and the host converging to the same text,
    //   * a guest-initiated save racing with a host edit,
    //   * file-system renames/creations on the host showing up in the guests'
    //     worktrees and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/a" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // The converged text is expected to place C's insertion before B's.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The save is started first but only awaited after the host's edit; the
        // file on disk is expected to include that edit.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B are checked immediately; C is awaited until it is clean.
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // All three worktrees must end up listing the same four paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1459
    // Checks that a guest's buffer never reports a conflict across an
    // edit -> save -> edit sequence: it is dirty after each edit, clean after
    // the save, and `has_conflict()` stays false throughout.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): "user_c" is listed as a collaborator but no such client
        // is created in this test.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: dirty, no conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait until the buffer is reported clean again.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Second edit after the save: dirty again, still no conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1541
    // Checks that when a file changes on the host's file system, a guest's
    // clean buffer picks up the new contents, stays non-dirty and reports no
    // conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): "user_c" is listed as a collaborator but no such client
        // is created in this test.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of "/dir" before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Bound to a name so the guest's worktree handle lives until the end of the test.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file through the fake file system rather than through a buffer.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest's buffer must pick up the new text while staying clean.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1620
1621    #[gpui::test(iterations = 10)]
1622    async fn test_editing_while_guest_opens_buffer(
1623        cx_a: &mut TestAppContext,
1624        cx_b: &mut TestAppContext,
1625    ) {
1626        cx_a.foreground().forbid_parking();
1627        let lang_registry = Arc::new(LanguageRegistry::test());
1628        let fs = FakeFs::new(cx_a.background());
1629
1630        // Connect to a server as 2 clients.
1631        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632        let client_a = server.create_client(cx_a, "user_a").await;
1633        let client_b = server.create_client(cx_b, "user_b").await;
1634
1635        // Share a project as client A
1636        fs.insert_tree(
1637            "/dir",
1638            json!({
1639                ".zed.toml": r#"collaborators = ["user_b"]"#,
1640                "a.txt": "a-contents",
1641            }),
1642        )
1643        .await;
1644        let project_a = cx_a.update(|cx| {
1645            Project::local(
1646                client_a.clone(),
1647                client_a.user_store.clone(),
1648                lang_registry.clone(),
1649                fs.clone(),
1650                cx,
1651            )
1652        });
1653        let (worktree_a, _) = project_a
1654            .update(cx_a, |p, cx| {
1655                p.find_or_create_local_worktree("/dir", true, cx)
1656            })
1657            .await
1658            .unwrap();
1659        worktree_a
1660            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1661            .await;
1662        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1663        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1664        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1665
1666        // Join that project as client B
1667        let project_b = Project::remote(
1668            project_id,
1669            client_b.clone(),
1670            client_b.user_store.clone(),
1671            lang_registry.clone(),
1672            fs.clone(),
1673            &mut cx_b.to_async(),
1674        )
1675        .await
1676        .unwrap();
1677
1678        // Open a buffer as client A
1679        let buffer_a = project_a
1680            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1681            .await
1682            .unwrap();
1683
1684        // Start opening the same buffer as client B
1685        let buffer_b = cx_b
1686            .background()
1687            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1688
1689        // Edit the buffer as client A while client B is still opening it.
1690        cx_b.background().simulate_random_delay().await;
1691        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1692        cx_b.background().simulate_random_delay().await;
1693        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1694
1695        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1696        let buffer_b = buffer_b.await.unwrap();
1697        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1698    }
1699
1700    #[gpui::test(iterations = 10)]
1701    async fn test_leaving_worktree_while_opening_buffer(
1702        cx_a: &mut TestAppContext,
1703        cx_b: &mut TestAppContext,
1704    ) {
1705        cx_a.foreground().forbid_parking();
1706        let lang_registry = Arc::new(LanguageRegistry::test());
1707        let fs = FakeFs::new(cx_a.background());
1708
1709        // Connect to a server as 2 clients.
1710        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711        let client_a = server.create_client(cx_a, "user_a").await;
1712        let client_b = server.create_client(cx_b, "user_b").await;
1713
1714        // Share a project as client A
1715        fs.insert_tree(
1716            "/dir",
1717            json!({
1718                ".zed.toml": r#"collaborators = ["user_b"]"#,
1719                "a.txt": "a-contents",
1720            }),
1721        )
1722        .await;
1723        let project_a = cx_a.update(|cx| {
1724            Project::local(
1725                client_a.clone(),
1726                client_a.user_store.clone(),
1727                lang_registry.clone(),
1728                fs.clone(),
1729                cx,
1730            )
1731        });
1732        let (worktree_a, _) = project_a
1733            .update(cx_a, |p, cx| {
1734                p.find_or_create_local_worktree("/dir", true, cx)
1735            })
1736            .await
1737            .unwrap();
1738        worktree_a
1739            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740            .await;
1741        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745        // Join that project as client B
1746        let project_b = Project::remote(
1747            project_id,
1748            client_b.clone(),
1749            client_b.user_store.clone(),
1750            lang_registry.clone(),
1751            fs.clone(),
1752            &mut cx_b.to_async(),
1753        )
1754        .await
1755        .unwrap();
1756
1757        // See that a guest has joined as client A.
1758        project_a
1759            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760            .await;
1761
1762        // Begin opening a buffer as client B, but leave the project before the open completes.
1763        let buffer_b = cx_b
1764            .background()
1765            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766        cx_b.update(|_| drop(project_b));
1767        drop(buffer_b);
1768
1769        // See that the guest has left.
1770        project_a
1771            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772            .await;
1773    }
1774
1775    #[gpui::test(iterations = 10)]
1776    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777        cx_a.foreground().forbid_parking();
1778        let lang_registry = Arc::new(LanguageRegistry::test());
1779        let fs = FakeFs::new(cx_a.background());
1780
1781        // Connect to a server as 2 clients.
1782        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783        let client_a = server.create_client(cx_a, "user_a").await;
1784        let client_b = server.create_client(cx_b, "user_b").await;
1785
1786        // Share a project as client A
1787        fs.insert_tree(
1788            "/a",
1789            json!({
1790                ".zed.toml": r#"collaborators = ["user_b"]"#,
1791                "a.txt": "a-contents",
1792                "b.txt": "b-contents",
1793            }),
1794        )
1795        .await;
1796        let project_a = cx_a.update(|cx| {
1797            Project::local(
1798                client_a.clone(),
1799                client_a.user_store.clone(),
1800                lang_registry.clone(),
1801                fs.clone(),
1802                cx,
1803            )
1804        });
1805        let (worktree_a, _) = project_a
1806            .update(cx_a, |p, cx| {
1807                p.find_or_create_local_worktree("/a", true, cx)
1808            })
1809            .await
1810            .unwrap();
1811        worktree_a
1812            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813            .await;
1814        let project_id = project_a
1815            .update(cx_a, |project, _| project.next_remote_id())
1816            .await;
1817        project_a
1818            .update(cx_a, |project, cx| project.share(cx))
1819            .await
1820            .unwrap();
1821
1822        // Join that project as client B
1823        let _project_b = Project::remote(
1824            project_id,
1825            client_b.clone(),
1826            client_b.user_store.clone(),
1827            lang_registry.clone(),
1828            fs.clone(),
1829            &mut cx_b.to_async(),
1830        )
1831        .await
1832        .unwrap();
1833
1834        // Client A sees that a guest has joined.
1835        project_a
1836            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837            .await;
1838
1839        // Drop client B's connection and ensure client A observes client B leaving the project.
1840        client_b.disconnect(&cx_b.to_async()).unwrap();
1841        project_a
1842            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843            .await;
1844
1845        // Rejoin the project as client B
1846        let _project_b = Project::remote(
1847            project_id,
1848            client_b.clone(),
1849            client_b.user_store.clone(),
1850            lang_registry.clone(),
1851            fs.clone(),
1852            &mut cx_b.to_async(),
1853        )
1854        .await
1855        .unwrap();
1856
1857        // Client A sees that a guest has re-joined.
1858        project_a
1859            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860            .await;
1861
1862        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863        client_b.wait_for_current_user(cx_b).await;
1864        server.disconnect_client(client_b.current_user_id(cx_b));
1865        cx_a.foreground().advance_clock(Duration::from_secs(3));
1866        project_a
1867            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868            .await;
1869    }
1870
1871    #[gpui::test(iterations = 10)]
1872    async fn test_collaborating_with_diagnostics(
1873        cx_a: &mut TestAppContext,
1874        cx_b: &mut TestAppContext,
1875    ) {
1876        cx_a.foreground().forbid_parking();
1877        let mut lang_registry = Arc::new(LanguageRegistry::test());
1878        let fs = FakeFs::new(cx_a.background());
1879
1880        // Set up a fake language server.
1881        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882        Arc::get_mut(&mut lang_registry)
1883            .unwrap()
1884            .add(Arc::new(Language::new(
1885                LanguageConfig {
1886                    name: "Rust".into(),
1887                    path_suffixes: vec!["rs".to_string()],
1888                    language_server: Some(language_server_config),
1889                    ..Default::default()
1890                },
1891                Some(tree_sitter_rust::language()),
1892            )));
1893
1894        // Connect to a server as 2 clients.
1895        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896        let client_a = server.create_client(cx_a, "user_a").await;
1897        let client_b = server.create_client(cx_b, "user_b").await;
1898
1899        // Share a project as client A
1900        fs.insert_tree(
1901            "/a",
1902            json!({
1903                ".zed.toml": r#"collaborators = ["user_b"]"#,
1904                "a.rs": "let one = two",
1905                "other.rs": "",
1906            }),
1907        )
1908        .await;
1909        let project_a = cx_a.update(|cx| {
1910            Project::local(
1911                client_a.clone(),
1912                client_a.user_store.clone(),
1913                lang_registry.clone(),
1914                fs.clone(),
1915                cx,
1916            )
1917        });
1918        let (worktree_a, _) = project_a
1919            .update(cx_a, |p, cx| {
1920                p.find_or_create_local_worktree("/a", true, cx)
1921            })
1922            .await
1923            .unwrap();
1924        worktree_a
1925            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926            .await;
1927        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931        // Cause the language server to start.
1932        let _ = cx_a
1933            .background()
1934            .spawn(project_a.update(cx_a, |project, cx| {
1935                project.open_buffer(
1936                    ProjectPath {
1937                        worktree_id,
1938                        path: Path::new("other.rs").into(),
1939                    },
1940                    cx,
1941                )
1942            }))
1943            .await
1944            .unwrap();
1945
1946        // Simulate a language server reporting errors for a file.
1947        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948        fake_language_server
1949            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950            .await;
1951        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1952            lsp::PublishDiagnosticsParams {
1953                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954                version: None,
1955                diagnostics: vec![lsp::Diagnostic {
1956                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1957                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958                    message: "message 1".to_string(),
1959                    ..Default::default()
1960                }],
1961            },
1962        );
1963
1964        // Wait for server to see the diagnostics update.
1965        server
1966            .condition(|store| {
1967                let worktree = store
1968                    .project(project_id)
1969                    .unwrap()
1970                    .share
1971                    .as_ref()
1972                    .unwrap()
1973                    .worktrees
1974                    .get(&worktree_id.to_proto())
1975                    .unwrap();
1976
1977                !worktree.diagnostic_summaries.is_empty()
1978            })
1979            .await;
1980
1981        // Join the worktree as client B.
1982        let project_b = Project::remote(
1983            project_id,
1984            client_b.clone(),
1985            client_b.user_store.clone(),
1986            lang_registry.clone(),
1987            fs.clone(),
1988            &mut cx_b.to_async(),
1989        )
1990        .await
1991        .unwrap();
1992
1993        project_b.read_with(cx_b, |project, cx| {
1994            assert_eq!(
1995                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996                &[(
1997                    ProjectPath {
1998                        worktree_id,
1999                        path: Arc::from(Path::new("a.rs")),
2000                    },
2001                    DiagnosticSummary {
2002                        error_count: 1,
2003                        warning_count: 0,
2004                        ..Default::default()
2005                    },
2006                )]
2007            )
2008        });
2009
2010        // Simulate a language server reporting more errors for a file.
2011        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2012            lsp::PublishDiagnosticsParams {
2013                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014                version: None,
2015                diagnostics: vec![
2016                    lsp::Diagnostic {
2017                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2018                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019                        message: "message 1".to_string(),
2020                        ..Default::default()
2021                    },
2022                    lsp::Diagnostic {
2023                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2024                        range: lsp::Range::new(
2025                            lsp::Position::new(0, 10),
2026                            lsp::Position::new(0, 13),
2027                        ),
2028                        message: "message 2".to_string(),
2029                        ..Default::default()
2030                    },
2031                ],
2032            },
2033        );
2034
2035        // Client b gets the updated summaries
2036        project_b
2037            .condition(&cx_b, |project, cx| {
2038                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039                    == &[(
2040                        ProjectPath {
2041                            worktree_id,
2042                            path: Arc::from(Path::new("a.rs")),
2043                        },
2044                        DiagnosticSummary {
2045                            error_count: 1,
2046                            warning_count: 1,
2047                            ..Default::default()
2048                        },
2049                    )]
2050            })
2051            .await;
2052
2053        // Open the file with the errors on client B. They should be present.
2054        let buffer_b = cx_b
2055            .background()
2056            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057            .await
2058            .unwrap();
2059
2060        buffer_b.read_with(cx_b, |buffer, _| {
2061            assert_eq!(
2062                buffer
2063                    .snapshot()
2064                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2065                    .map(|entry| entry)
2066                    .collect::<Vec<_>>(),
2067                &[
2068                    DiagnosticEntry {
2069                        range: Point::new(0, 4)..Point::new(0, 7),
2070                        diagnostic: Diagnostic {
2071                            group_id: 0,
2072                            message: "message 1".to_string(),
2073                            severity: lsp::DiagnosticSeverity::ERROR,
2074                            is_primary: true,
2075                            ..Default::default()
2076                        }
2077                    },
2078                    DiagnosticEntry {
2079                        range: Point::new(0, 10)..Point::new(0, 13),
2080                        diagnostic: Diagnostic {
2081                            group_id: 1,
2082                            severity: lsp::DiagnosticSeverity::WARNING,
2083                            message: "message 2".to_string(),
2084                            is_primary: true,
2085                            ..Default::default()
2086                        }
2087                    }
2088                ]
2089            );
2090        });
2091    }
2092
2093    #[gpui::test(iterations = 10)]
2094    async fn test_collaborating_with_completion(
2095        cx_a: &mut TestAppContext,
2096        cx_b: &mut TestAppContext,
2097    ) {
2098        cx_a.foreground().forbid_parking();
2099        let mut lang_registry = Arc::new(LanguageRegistry::test());
2100        let fs = FakeFs::new(cx_a.background());
2101
2102        // Set up a fake language server.
2103        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105            completion_provider: Some(lsp::CompletionOptions {
2106                trigger_characters: Some(vec![".".to_string()]),
2107                ..Default::default()
2108            }),
2109            ..Default::default()
2110        });
2111        Arc::get_mut(&mut lang_registry)
2112            .unwrap()
2113            .add(Arc::new(Language::new(
2114                LanguageConfig {
2115                    name: "Rust".into(),
2116                    path_suffixes: vec!["rs".to_string()],
2117                    language_server: Some(language_server_config),
2118                    ..Default::default()
2119                },
2120                Some(tree_sitter_rust::language()),
2121            )));
2122
2123        // Connect to a server as 2 clients.
2124        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125        let client_a = server.create_client(cx_a, "user_a").await;
2126        let client_b = server.create_client(cx_b, "user_b").await;
2127
2128        // Share a project as client A
2129        fs.insert_tree(
2130            "/a",
2131            json!({
2132                ".zed.toml": r#"collaborators = ["user_b"]"#,
2133                "main.rs": "fn main() { a }",
2134                "other.rs": "",
2135            }),
2136        )
2137        .await;
2138        let project_a = cx_a.update(|cx| {
2139            Project::local(
2140                client_a.clone(),
2141                client_a.user_store.clone(),
2142                lang_registry.clone(),
2143                fs.clone(),
2144                cx,
2145            )
2146        });
2147        let (worktree_a, _) = project_a
2148            .update(cx_a, |p, cx| {
2149                p.find_or_create_local_worktree("/a", true, cx)
2150            })
2151            .await
2152            .unwrap();
2153        worktree_a
2154            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155            .await;
2156        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160        // Join the worktree as client B.
2161        let project_b = Project::remote(
2162            project_id,
2163            client_b.clone(),
2164            client_b.user_store.clone(),
2165            lang_registry.clone(),
2166            fs.clone(),
2167            &mut cx_b.to_async(),
2168        )
2169        .await
2170        .unwrap();
2171
2172        // Open a file in an editor as the guest.
2173        let buffer_b = project_b
2174            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175            .await
2176            .unwrap();
2177        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178        let editor_b = cx_b.add_view(window_b, |cx| {
2179            Editor::for_buffer(
2180                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181                Some(project_b.clone()),
2182                cx,
2183            )
2184        });
2185
2186        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2187        buffer_b
2188            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2189            .await;
2190
2191        // Type a completion trigger character as the guest.
2192        editor_b.update(cx_b, |editor, cx| {
2193            editor.select_ranges([13..13], None, cx);
2194            editor.handle_input(&Input(".".into()), cx);
2195            cx.focus(&editor_b);
2196        });
2197
2198        // Receive a completion request as the host's language server.
2199        // Return some completions from the host's language server.
2200        cx_a.foreground().start_waiting();
2201        fake_language_server
2202            .handle_request::<lsp::request::Completion, _>(|params, _| {
2203                assert_eq!(
2204                    params.text_document_position.text_document.uri,
2205                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2206                );
2207                assert_eq!(
2208                    params.text_document_position.position,
2209                    lsp::Position::new(0, 14),
2210                );
2211
2212                Some(lsp::CompletionResponse::Array(vec![
2213                    lsp::CompletionItem {
2214                        label: "first_method(…)".into(),
2215                        detail: Some("fn(&mut self, B) -> C".into()),
2216                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2217                            new_text: "first_method($1)".to_string(),
2218                            range: lsp::Range::new(
2219                                lsp::Position::new(0, 14),
2220                                lsp::Position::new(0, 14),
2221                            ),
2222                        })),
2223                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2224                        ..Default::default()
2225                    },
2226                    lsp::CompletionItem {
2227                        label: "second_method(…)".into(),
2228                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2229                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2230                            new_text: "second_method()".to_string(),
2231                            range: lsp::Range::new(
2232                                lsp::Position::new(0, 14),
2233                                lsp::Position::new(0, 14),
2234                            ),
2235                        })),
2236                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2237                        ..Default::default()
2238                    },
2239                ]))
2240            })
2241            .next()
2242            .await
2243            .unwrap();
2244        cx_a.foreground().finish_waiting();
2245
2246        // Open the buffer on the host.
2247        let buffer_a = project_a
2248            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2249            .await
2250            .unwrap();
2251        buffer_a
2252            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2253            .await;
2254
2255        // Confirm a completion on the guest.
2256        editor_b
2257            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2258            .await;
2259        editor_b.update(cx_b, |editor, cx| {
2260            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2261            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2262        });
2263
2264        // Return a resolved completion from the host's language server.
2265        // The resolved completion has an additional text edit.
2266        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2267            |params, _| {
2268                assert_eq!(params.label, "first_method(…)");
2269                lsp::CompletionItem {
2270                    label: "first_method(…)".into(),
2271                    detail: Some("fn(&mut self, B) -> C".into()),
2272                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2273                        new_text: "first_method($1)".to_string(),
2274                        range: lsp::Range::new(
2275                            lsp::Position::new(0, 14),
2276                            lsp::Position::new(0, 14),
2277                        ),
2278                    })),
2279                    additional_text_edits: Some(vec![lsp::TextEdit {
2280                        new_text: "use d::SomeTrait;\n".to_string(),
2281                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2282                    }]),
2283                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2284                    ..Default::default()
2285                }
2286            },
2287        );
2288
2289        // The additional edit is applied.
2290        buffer_a
2291            .condition(&cx_a, |buffer, _| {
2292                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2293            })
2294            .await;
2295        buffer_b
2296            .condition(&cx_b, |buffer, _| {
2297                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2298            })
2299            .await;
2300    }
2301
2302    #[gpui::test(iterations = 10)]
2303    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2304        cx_a.foreground().forbid_parking();
2305        let mut lang_registry = Arc::new(LanguageRegistry::test());
2306        let fs = FakeFs::new(cx_a.background());
2307
2308        // Set up a fake language server.
2309        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2310        Arc::get_mut(&mut lang_registry)
2311            .unwrap()
2312            .add(Arc::new(Language::new(
2313                LanguageConfig {
2314                    name: "Rust".into(),
2315                    path_suffixes: vec!["rs".to_string()],
2316                    language_server: Some(language_server_config),
2317                    ..Default::default()
2318                },
2319                Some(tree_sitter_rust::language()),
2320            )));
2321
2322        // Connect to a server as 2 clients.
2323        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2324        let client_a = server.create_client(cx_a, "user_a").await;
2325        let client_b = server.create_client(cx_b, "user_b").await;
2326
2327        // Share a project as client A
2328        fs.insert_tree(
2329            "/a",
2330            json!({
2331                ".zed.toml": r#"collaborators = ["user_b"]"#,
2332                "a.rs": "let one = two",
2333            }),
2334        )
2335        .await;
2336        let project_a = cx_a.update(|cx| {
2337            Project::local(
2338                client_a.clone(),
2339                client_a.user_store.clone(),
2340                lang_registry.clone(),
2341                fs.clone(),
2342                cx,
2343            )
2344        });
2345        let (worktree_a, _) = project_a
2346            .update(cx_a, |p, cx| {
2347                p.find_or_create_local_worktree("/a", true, cx)
2348            })
2349            .await
2350            .unwrap();
2351        worktree_a
2352            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2353            .await;
2354        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2355        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2356        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2357
2358        // Join the worktree as client B.
2359        let project_b = Project::remote(
2360            project_id,
2361            client_b.clone(),
2362            client_b.user_store.clone(),
2363            lang_registry.clone(),
2364            fs.clone(),
2365            &mut cx_b.to_async(),
2366        )
2367        .await
2368        .unwrap();
2369
2370        let buffer_b = cx_b
2371            .background()
2372            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2373            .await
2374            .unwrap();
2375
2376        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2377        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2378            Some(vec![
2379                lsp::TextEdit {
2380                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2381                    new_text: "h".to_string(),
2382                },
2383                lsp::TextEdit {
2384                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2385                    new_text: "y".to_string(),
2386                },
2387            ])
2388        });
2389
2390        project_b
2391            .update(cx_b, |project, cx| {
2392                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2393            })
2394            .await
2395            .unwrap();
2396        assert_eq!(
2397            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2398            "let honey = two"
2399        );
2400    }
2401
2402    #[gpui::test(iterations = 10)]
2403    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2404        cx_a.foreground().forbid_parking();
2405        let mut lang_registry = Arc::new(LanguageRegistry::test());
2406        let fs = FakeFs::new(cx_a.background());
2407        fs.insert_tree(
2408            "/root-1",
2409            json!({
2410                ".zed.toml": r#"collaborators = ["user_b"]"#,
2411                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412            }),
2413        )
2414        .await;
2415        fs.insert_tree(
2416            "/root-2",
2417            json!({
2418                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419            }),
2420        )
2421        .await;
2422
2423        // Set up a fake language server.
2424        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425        Arc::get_mut(&mut lang_registry)
2426            .unwrap()
2427            .add(Arc::new(Language::new(
2428                LanguageConfig {
2429                    name: "Rust".into(),
2430                    path_suffixes: vec!["rs".to_string()],
2431                    language_server: Some(language_server_config),
2432                    ..Default::default()
2433                },
2434                Some(tree_sitter_rust::language()),
2435            )));
2436
2437        // Connect to a server as 2 clients.
2438        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439        let client_a = server.create_client(cx_a, "user_a").await;
2440        let client_b = server.create_client(cx_b, "user_b").await;
2441
2442        // Share a project as client A
2443        let project_a = cx_a.update(|cx| {
2444            Project::local(
2445                client_a.clone(),
2446                client_a.user_store.clone(),
2447                lang_registry.clone(),
2448                fs.clone(),
2449                cx,
2450            )
2451        });
2452        let (worktree_a, _) = project_a
2453            .update(cx_a, |p, cx| {
2454                p.find_or_create_local_worktree("/root-1", true, cx)
2455            })
2456            .await
2457            .unwrap();
2458        worktree_a
2459            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460            .await;
2461        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2462        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2463        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2464
2465        // Join the worktree as client B.
2466        let project_b = Project::remote(
2467            project_id,
2468            client_b.clone(),
2469            client_b.user_store.clone(),
2470            lang_registry.clone(),
2471            fs.clone(),
2472            &mut cx_b.to_async(),
2473        )
2474        .await
2475        .unwrap();
2476
2477        // Open the file on client B.
2478        let buffer_b = cx_b
2479            .background()
2480            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2481            .await
2482            .unwrap();
2483
2484        // Request the definition of a symbol as the guest.
2485        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2486        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2487            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2488                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2489                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2490            )))
2491        });
2492
2493        let definitions_1 = project_b
2494            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2495            .await
2496            .unwrap();
2497        cx_b.read(|cx| {
2498            assert_eq!(definitions_1.len(), 1);
2499            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2500            let target_buffer = definitions_1[0].buffer.read(cx);
2501            assert_eq!(
2502                target_buffer.text(),
2503                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2504            );
2505            assert_eq!(
2506                definitions_1[0].range.to_point(target_buffer),
2507                Point::new(0, 6)..Point::new(0, 9)
2508            );
2509        });
2510
2511        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2512        // the previous call to `definition`.
2513        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2514            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2515                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2516                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2517            )))
2518        });
2519
2520        let definitions_2 = project_b
2521            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2522            .await
2523            .unwrap();
2524        cx_b.read(|cx| {
2525            assert_eq!(definitions_2.len(), 1);
2526            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2527            let target_buffer = definitions_2[0].buffer.read(cx);
2528            assert_eq!(
2529                target_buffer.text(),
2530                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2531            );
2532            assert_eq!(
2533                definitions_2[0].range.to_point(target_buffer),
2534                Point::new(1, 6)..Point::new(1, 11)
2535            );
2536        });
2537        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2538    }
2539
2540    #[gpui::test(iterations = 10)]
2541    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2542        cx_a.foreground().forbid_parking();
2543        let mut lang_registry = Arc::new(LanguageRegistry::test());
2544        let fs = FakeFs::new(cx_a.background());
2545        fs.insert_tree(
2546            "/root-1",
2547            json!({
2548                ".zed.toml": r#"collaborators = ["user_b"]"#,
2549                "one.rs": "const ONE: usize = 1;",
2550                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2551            }),
2552        )
2553        .await;
2554        fs.insert_tree(
2555            "/root-2",
2556            json!({
2557                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2558            }),
2559        )
2560        .await;
2561
2562        // Set up a fake language server.
2563        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2564        Arc::get_mut(&mut lang_registry)
2565            .unwrap()
2566            .add(Arc::new(Language::new(
2567                LanguageConfig {
2568                    name: "Rust".into(),
2569                    path_suffixes: vec!["rs".to_string()],
2570                    language_server: Some(language_server_config),
2571                    ..Default::default()
2572                },
2573                Some(tree_sitter_rust::language()),
2574            )));
2575
2576        // Connect to a server as 2 clients.
2577        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2578        let client_a = server.create_client(cx_a, "user_a").await;
2579        let client_b = server.create_client(cx_b, "user_b").await;
2580
2581        // Share a project as client A
2582        let project_a = cx_a.update(|cx| {
2583            Project::local(
2584                client_a.clone(),
2585                client_a.user_store.clone(),
2586                lang_registry.clone(),
2587                fs.clone(),
2588                cx,
2589            )
2590        });
2591        let (worktree_a, _) = project_a
2592            .update(cx_a, |p, cx| {
2593                p.find_or_create_local_worktree("/root-1", true, cx)
2594            })
2595            .await
2596            .unwrap();
2597        worktree_a
2598            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2599            .await;
2600        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2601        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2602        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2603
2604        // Join the worktree as client B.
2605        let project_b = Project::remote(
2606            project_id,
2607            client_b.clone(),
2608            client_b.user_store.clone(),
2609            lang_registry.clone(),
2610            fs.clone(),
2611            &mut cx_b.to_async(),
2612        )
2613        .await
2614        .unwrap();
2615
2616        // Open the file on client B.
2617        let buffer_b = cx_b
2618            .background()
2619            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2620            .await
2621            .unwrap();
2622
2623        // Request references to a symbol as the guest.
2624        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2625        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2626            assert_eq!(
2627                params.text_document_position.text_document.uri.as_str(),
2628                "file:///root-1/one.rs"
2629            );
2630            Some(vec![
2631                lsp::Location {
2632                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2633                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2634                },
2635                lsp::Location {
2636                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2637                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2638                },
2639                lsp::Location {
2640                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2641                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2642                },
2643            ])
2644        });
2645
2646        let references = project_b
2647            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2648            .await
2649            .unwrap();
2650        cx_b.read(|cx| {
2651            assert_eq!(references.len(), 3);
2652            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2653
2654            let two_buffer = references[0].buffer.read(cx);
2655            let three_buffer = references[2].buffer.read(cx);
2656            assert_eq!(
2657                two_buffer.file().unwrap().path().as_ref(),
2658                Path::new("two.rs")
2659            );
2660            assert_eq!(references[1].buffer, references[0].buffer);
2661            assert_eq!(
2662                three_buffer.file().unwrap().full_path(cx),
2663                Path::new("three.rs")
2664            );
2665
2666            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2667            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2668            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2669        });
2670    }
2671
2672    #[gpui::test(iterations = 10)]
2673    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2674        cx_a.foreground().forbid_parking();
2675        let lang_registry = Arc::new(LanguageRegistry::test());
2676        let fs = FakeFs::new(cx_a.background());
2677        fs.insert_tree(
2678            "/root-1",
2679            json!({
2680                ".zed.toml": r#"collaborators = ["user_b"]"#,
2681                "a": "hello world",
2682                "b": "goodnight moon",
2683                "c": "a world of goo",
2684                "d": "world champion of clown world",
2685            }),
2686        )
2687        .await;
2688        fs.insert_tree(
2689            "/root-2",
2690            json!({
2691                "e": "disney world is fun",
2692            }),
2693        )
2694        .await;
2695
2696        // Connect to a server as 2 clients.
2697        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2698        let client_a = server.create_client(cx_a, "user_a").await;
2699        let client_b = server.create_client(cx_b, "user_b").await;
2700
2701        // Share a project as client A
2702        let project_a = cx_a.update(|cx| {
2703            Project::local(
2704                client_a.clone(),
2705                client_a.user_store.clone(),
2706                lang_registry.clone(),
2707                fs.clone(),
2708                cx,
2709            )
2710        });
2711        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2712
2713        let (worktree_1, _) = project_a
2714            .update(cx_a, |p, cx| {
2715                p.find_or_create_local_worktree("/root-1", true, cx)
2716            })
2717            .await
2718            .unwrap();
2719        worktree_1
2720            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2721            .await;
2722        let (worktree_2, _) = project_a
2723            .update(cx_a, |p, cx| {
2724                p.find_or_create_local_worktree("/root-2", true, cx)
2725            })
2726            .await
2727            .unwrap();
2728        worktree_2
2729            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2730            .await;
2731
2732        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2733
2734        // Join the worktree as client B.
2735        let project_b = Project::remote(
2736            project_id,
2737            client_b.clone(),
2738            client_b.user_store.clone(),
2739            lang_registry.clone(),
2740            fs.clone(),
2741            &mut cx_b.to_async(),
2742        )
2743        .await
2744        .unwrap();
2745
2746        let results = project_b
2747            .update(cx_b, |project, cx| {
2748                project.search(SearchQuery::text("world", false, false), cx)
2749            })
2750            .await
2751            .unwrap();
2752
2753        let mut ranges_by_path = results
2754            .into_iter()
2755            .map(|(buffer, ranges)| {
2756                buffer.read_with(cx_b, |buffer, cx| {
2757                    let path = buffer.file().unwrap().full_path(cx);
2758                    let offset_ranges = ranges
2759                        .into_iter()
2760                        .map(|range| range.to_offset(buffer))
2761                        .collect::<Vec<_>>();
2762                    (path, offset_ranges)
2763                })
2764            })
2765            .collect::<Vec<_>>();
2766        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2767
2768        assert_eq!(
2769            ranges_by_path,
2770            &[
2771                (PathBuf::from("root-1/a"), vec![6..11]),
2772                (PathBuf::from("root-1/c"), vec![2..7]),
2773                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2774                (PathBuf::from("root-2/e"), vec![7..12]),
2775            ]
2776        );
2777    }
2778
2779    #[gpui::test(iterations = 10)]
2780    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2781        cx_a.foreground().forbid_parking();
2782        let lang_registry = Arc::new(LanguageRegistry::test());
2783        let fs = FakeFs::new(cx_a.background());
2784        fs.insert_tree(
2785            "/root-1",
2786            json!({
2787                ".zed.toml": r#"collaborators = ["user_b"]"#,
2788                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2789            }),
2790        )
2791        .await;
2792
2793        // Set up a fake language server.
2794        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2795        lang_registry.add(Arc::new(Language::new(
2796            LanguageConfig {
2797                name: "Rust".into(),
2798                path_suffixes: vec!["rs".to_string()],
2799                language_server: Some(language_server_config),
2800                ..Default::default()
2801            },
2802            Some(tree_sitter_rust::language()),
2803        )));
2804
2805        // Connect to a server as 2 clients.
2806        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2807        let client_a = server.create_client(cx_a, "user_a").await;
2808        let client_b = server.create_client(cx_b, "user_b").await;
2809
2810        // Share a project as client A
2811        let project_a = cx_a.update(|cx| {
2812            Project::local(
2813                client_a.clone(),
2814                client_a.user_store.clone(),
2815                lang_registry.clone(),
2816                fs.clone(),
2817                cx,
2818            )
2819        });
2820        let (worktree_a, _) = project_a
2821            .update(cx_a, |p, cx| {
2822                p.find_or_create_local_worktree("/root-1", true, cx)
2823            })
2824            .await
2825            .unwrap();
2826        worktree_a
2827            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2828            .await;
2829        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2830        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2831        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2832
2833        // Join the worktree as client B.
2834        let project_b = Project::remote(
2835            project_id,
2836            client_b.clone(),
2837            client_b.user_store.clone(),
2838            lang_registry.clone(),
2839            fs.clone(),
2840            &mut cx_b.to_async(),
2841        )
2842        .await
2843        .unwrap();
2844
2845        // Open the file on client B.
2846        let buffer_b = cx_b
2847            .background()
2848            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2849            .await
2850            .unwrap();
2851
2852        // Request document highlights as the guest.
2853        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2854        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2855            |params, _| {
2856                assert_eq!(
2857                    params
2858                        .text_document_position_params
2859                        .text_document
2860                        .uri
2861                        .as_str(),
2862                    "file:///root-1/main.rs"
2863                );
2864                assert_eq!(
2865                    params.text_document_position_params.position,
2866                    lsp::Position::new(0, 34)
2867                );
2868                Some(vec![
2869                    lsp::DocumentHighlight {
2870                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2871                        range: lsp::Range::new(
2872                            lsp::Position::new(0, 10),
2873                            lsp::Position::new(0, 16),
2874                        ),
2875                    },
2876                    lsp::DocumentHighlight {
2877                        kind: Some(lsp::DocumentHighlightKind::READ),
2878                        range: lsp::Range::new(
2879                            lsp::Position::new(0, 32),
2880                            lsp::Position::new(0, 38),
2881                        ),
2882                    },
2883                    lsp::DocumentHighlight {
2884                        kind: Some(lsp::DocumentHighlightKind::READ),
2885                        range: lsp::Range::new(
2886                            lsp::Position::new(0, 41),
2887                            lsp::Position::new(0, 47),
2888                        ),
2889                    },
2890                ])
2891            },
2892        );
2893
2894        let highlights = project_b
2895            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2896            .await
2897            .unwrap();
2898        buffer_b.read_with(cx_b, |buffer, _| {
2899            let snapshot = buffer.snapshot();
2900
2901            let highlights = highlights
2902                .into_iter()
2903                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2904                .collect::<Vec<_>>();
2905            assert_eq!(
2906                highlights,
2907                &[
2908                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2909                    (lsp::DocumentHighlightKind::READ, 32..38),
2910                    (lsp::DocumentHighlightKind::READ, 41..47)
2911                ]
2912            )
2913        });
2914    }
2915
2916    #[gpui::test(iterations = 10)]
2917    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2918        cx_a.foreground().forbid_parking();
2919        let mut lang_registry = Arc::new(LanguageRegistry::test());
2920        let fs = FakeFs::new(cx_a.background());
2921        fs.insert_tree(
2922            "/code",
2923            json!({
2924                "crate-1": {
2925                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2926                    "one.rs": "const ONE: usize = 1;",
2927                },
2928                "crate-2": {
2929                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2930                },
2931                "private": {
2932                    "passwords.txt": "the-password",
2933                }
2934            }),
2935        )
2936        .await;
2937
2938        // Set up a fake language server.
2939        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2940        Arc::get_mut(&mut lang_registry)
2941            .unwrap()
2942            .add(Arc::new(Language::new(
2943                LanguageConfig {
2944                    name: "Rust".into(),
2945                    path_suffixes: vec!["rs".to_string()],
2946                    language_server: Some(language_server_config),
2947                    ..Default::default()
2948                },
2949                Some(tree_sitter_rust::language()),
2950            )));
2951
2952        // Connect to a server as 2 clients.
2953        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2954        let client_a = server.create_client(cx_a, "user_a").await;
2955        let client_b = server.create_client(cx_b, "user_b").await;
2956
2957        // Share a project as client A
2958        let project_a = cx_a.update(|cx| {
2959            Project::local(
2960                client_a.clone(),
2961                client_a.user_store.clone(),
2962                lang_registry.clone(),
2963                fs.clone(),
2964                cx,
2965            )
2966        });
2967        let (worktree_a, _) = project_a
2968            .update(cx_a, |p, cx| {
2969                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2970            })
2971            .await
2972            .unwrap();
2973        worktree_a
2974            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2975            .await;
2976        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2977        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2978        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2979
2980        // Join the worktree as client B.
2981        let project_b = Project::remote(
2982            project_id,
2983            client_b.clone(),
2984            client_b.user_store.clone(),
2985            lang_registry.clone(),
2986            fs.clone(),
2987            &mut cx_b.to_async(),
2988        )
2989        .await
2990        .unwrap();
2991
2992        // Cause the language server to start.
2993        let _buffer = cx_b
2994            .background()
2995            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2996            .await
2997            .unwrap();
2998
2999        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3000        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3001            #[allow(deprecated)]
3002            Some(vec![lsp::SymbolInformation {
3003                name: "TWO".into(),
3004                location: lsp::Location {
3005                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3006                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3007                },
3008                kind: lsp::SymbolKind::CONSTANT,
3009                tags: None,
3010                container_name: None,
3011                deprecated: None,
3012            }])
3013        });
3014
3015        // Request the definition of a symbol as the guest.
3016        let symbols = project_b
3017            .update(cx_b, |p, cx| p.symbols("two", cx))
3018            .await
3019            .unwrap();
3020        assert_eq!(symbols.len(), 1);
3021        assert_eq!(symbols[0].name, "TWO");
3022
3023        // Open one of the returned symbols.
3024        let buffer_b_2 = project_b
3025            .update(cx_b, |project, cx| {
3026                project.open_buffer_for_symbol(&symbols[0], cx)
3027            })
3028            .await
3029            .unwrap();
3030        buffer_b_2.read_with(cx_b, |buffer, _| {
3031            assert_eq!(
3032                buffer.file().unwrap().path().as_ref(),
3033                Path::new("../crate-2/two.rs")
3034            );
3035        });
3036
3037        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3038        let mut fake_symbol = symbols[0].clone();
3039        fake_symbol.path = Path::new("/code/secrets").into();
3040        let error = project_b
3041            .update(cx_b, |project, cx| {
3042                project.open_buffer_for_symbol(&fake_symbol, cx)
3043            })
3044            .await
3045            .unwrap_err();
3046        assert!(error.to_string().contains("invalid symbol signature"));
3047    }
3048
3049    #[gpui::test(iterations = 10)]
3050    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3051        cx_a: &mut TestAppContext,
3052        cx_b: &mut TestAppContext,
3053        mut rng: StdRng,
3054    ) {
3055        cx_a.foreground().forbid_parking();
3056        let mut lang_registry = Arc::new(LanguageRegistry::test());
3057        let fs = FakeFs::new(cx_a.background());
3058        fs.insert_tree(
3059            "/root",
3060            json!({
3061                ".zed.toml": r#"collaborators = ["user_b"]"#,
3062                "a.rs": "const ONE: usize = b::TWO;",
3063                "b.rs": "const TWO: usize = 2",
3064            }),
3065        )
3066        .await;
3067
3068        // Set up a fake language server.
3069        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3070
3071        Arc::get_mut(&mut lang_registry)
3072            .unwrap()
3073            .add(Arc::new(Language::new(
3074                LanguageConfig {
3075                    name: "Rust".into(),
3076                    path_suffixes: vec!["rs".to_string()],
3077                    language_server: Some(language_server_config),
3078                    ..Default::default()
3079                },
3080                Some(tree_sitter_rust::language()),
3081            )));
3082
3083        // Connect to a server as 2 clients.
3084        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3085        let client_a = server.create_client(cx_a, "user_a").await;
3086        let client_b = server.create_client(cx_b, "user_b").await;
3087
3088        // Share a project as client A
3089        let project_a = cx_a.update(|cx| {
3090            Project::local(
3091                client_a.clone(),
3092                client_a.user_store.clone(),
3093                lang_registry.clone(),
3094                fs.clone(),
3095                cx,
3096            )
3097        });
3098
3099        let (worktree_a, _) = project_a
3100            .update(cx_a, |p, cx| {
3101                p.find_or_create_local_worktree("/root", true, cx)
3102            })
3103            .await
3104            .unwrap();
3105        worktree_a
3106            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3107            .await;
3108        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3109        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3110        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3111
3112        // Join the worktree as client B.
3113        let project_b = Project::remote(
3114            project_id,
3115            client_b.clone(),
3116            client_b.user_store.clone(),
3117            lang_registry.clone(),
3118            fs.clone(),
3119            &mut cx_b.to_async(),
3120        )
3121        .await
3122        .unwrap();
3123
3124        let buffer_b1 = cx_b
3125            .background()
3126            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3127            .await
3128            .unwrap();
3129
3130        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3131        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3132            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3133                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3134                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3135            )))
3136        });
3137
3138        let definitions;
3139        let buffer_b2;
3140        if rng.gen() {
3141            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3142            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3143        } else {
3144            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3145            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3146        }
3147
3148        let buffer_b2 = buffer_b2.await.unwrap();
3149        let definitions = definitions.await.unwrap();
3150        assert_eq!(definitions.len(), 1);
3151        assert_eq!(definitions[0].buffer, buffer_b2);
3152    }
3153
3154    #[gpui::test(iterations = 10)]
        // Exercises code actions on a project shared by client A and joined by
        // client B: B moves its cursor, the fake language server answers the
        // resulting code-action requests, B confirms the first action, and the
        // multi-file edit is verified together with undo and redo.
3155    async fn test_collaborating_with_code_actions(
3156        cx_a: &mut TestAppContext,
3157        cx_b: &mut TestAppContext,
3158    ) {
3159        cx_a.foreground().forbid_parking();
3160        let mut lang_registry = Arc::new(LanguageRegistry::test());
3161        let fs = FakeFs::new(cx_a.background());
3162        let mut path_openers_b = Vec::new();
3163        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3164
3165        // Set up a fake language server.
3166        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole reference, so the
            // language has to be registered before the registry is cloned below.
3167        Arc::get_mut(&mut lang_registry)
3168            .unwrap()
3169            .add(Arc::new(Language::new(
3170                LanguageConfig {
3171                    name: "Rust".into(),
3172                    path_suffixes: vec!["rs".to_string()],
3173                    language_server: Some(language_server_config),
3174                    ..Default::default()
3175                },
3176                Some(tree_sitter_rust::language()),
3177            )));
3178
3179        // Connect to a server as 2 clients.
3180        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3181        let client_a = server.create_client(cx_a, "user_a").await;
3182        let client_b = server.create_client(cx_b, "user_b").await;
3183
3184        // Share a project as client A
3185        fs.insert_tree(
3186            "/a",
3187            json!({
3188                ".zed.toml": r#"collaborators = ["user_b"]"#,
3189                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3190                "other.rs": "pub fn foo() -> usize { 4 }",
3191            }),
3192        )
3193        .await;
3194        let project_a = cx_a.update(|cx| {
3195            Project::local(
3196                client_a.clone(),
3197                client_a.user_store.clone(),
3198                lang_registry.clone(),
3199                fs.clone(),
3200                cx,
3201            )
3202        });
3203        let (worktree_a, _) = project_a
3204            .update(cx_a, |p, cx| {
3205                p.find_or_create_local_worktree("/a", true, cx)
3206            })
3207            .await
3208            .unwrap();
3209        worktree_a
3210            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3211            .await;
3212        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3213        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3214        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3215
3216        // Join the project as client B.
3217        let project_b = Project::remote(
3218            project_id,
3219            client_b.clone(),
3220            client_b.user_store.clone(),
3221            lang_registry.clone(),
3222            fs.clone(),
3223            &mut cx_b.to_async(),
3224        )
3225        .await
3226        .unwrap();
            // Build a workspace for client B around the remote project so that an
            // editor can be opened on it.
3227        let mut params = cx_b.update(WorkspaceParams::test);
3228        params.languages = lang_registry.clone();
3229        params.client = client_b.client.clone();
3230        params.user_store = client_b.user_store.clone();
3231        params.project = project_b;
3232        params.path_openers = path_openers_b.into();
3233
3234        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3235        let editor_b = workspace_b
3236            .update(cx_b, |workspace, cx| {
3237                workspace.open_path((worktree_id, "main.rs").into(), cx)
3238            })
3239            .await
3240            .unwrap()
3241            .downcast::<Editor>()
3242            .unwrap();
3243
            // The first code-action request is expected for an empty range at (0, 0);
            // the server answers with no actions.
            // NOTE(review): `.next().await` presumably waits until the handler has
            // served one request - confirm against the fake language server API.
3244        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3245        fake_language_server
3246            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3247                assert_eq!(
3248                    params.text_document.uri,
3249                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3250                );
3251                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3252                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3253                None
3254            })
3255            .next()
3256            .await;
3257
3258        // Move cursor to a location that contains code actions.
3259        editor_b.update(cx_b, |editor, cx| {
3260            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3261            cx.focus(&editor_b);
3262        });
3263
            // The second request must carry the new cursor position. The returned
            // action replaces columns 22..34 of line 1 in main.rs (the text
            // `other::foo()`) with `4` and deletes the 27-character line of other.rs.
3264        fake_language_server
3265            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3266                assert_eq!(
3267                    params.text_document.uri,
3268                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3269                );
3270                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3271                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3272
3273                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3274                    lsp::CodeAction {
3275                        title: "Inline into all callers".to_string(),
3276                        edit: Some(lsp::WorkspaceEdit {
3277                            changes: Some(
3278                                [
3279                                    (
3280                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3281                                        vec![lsp::TextEdit::new(
3282                                            lsp::Range::new(
3283                                                lsp::Position::new(1, 22),
3284                                                lsp::Position::new(1, 34),
3285                                            ),
3286                                            "4".to_string(),
3287                                        )],
3288                                    ),
3289                                    (
3290                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3291                                        vec![lsp::TextEdit::new(
3292                                            lsp::Range::new(
3293                                                lsp::Position::new(0, 0),
3294                                                lsp::Position::new(0, 27),
3295                                            ),
3296                                            "".to_string(),
3297                                        )],
3298                                    ),
3299                                ]
3300                                .into_iter()
3301                                .collect(),
3302                            ),
3303                            ..Default::default()
3304                        }),
3305                        data: Some(json!({
3306                            "codeActionParams": {
3307                                "range": {
3308                                    "start": {"line": 1, "column": 31},
3309                                    "end": {"line": 1, "column": 31},
3310                                }
3311                            }
3312                        })),
3313                        ..Default::default()
3314                    },
3315                )])
3316            })
3317            .next()
3318            .await;
3319
3320        // Toggle code actions and wait for them to display.
3321        editor_b.update(cx_b, |editor, cx| {
3322            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3323        });
3324        editor_b
3325            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3326            .await;
3327
3328        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3329
3330        // Confirming the code action will trigger a resolve request.
            // The confirmation is started first and only awaited after the resolve
            // handler below has been registered.
3331        let confirm_action = workspace_b
3332            .update(cx_b, |workspace, cx| {
3333                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3334            })
3335            .unwrap();
            // The resolved action carries the same two-file edit as the one offered
            // in the code-action response above.
3336        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3337            lsp::CodeAction {
3338                title: "Inline into all callers".to_string(),
3339                edit: Some(lsp::WorkspaceEdit {
3340                    changes: Some(
3341                        [
3342                            (
3343                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3344                                vec![lsp::TextEdit::new(
3345                                    lsp::Range::new(
3346                                        lsp::Position::new(1, 22),
3347                                        lsp::Position::new(1, 34),
3348                                    ),
3349                                    "4".to_string(),
3350                                )],
3351                            ),
3352                            (
3353                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3354                                vec![lsp::TextEdit::new(
3355                                    lsp::Range::new(
3356                                        lsp::Position::new(0, 0),
3357                                        lsp::Position::new(0, 27),
3358                                    ),
3359                                    "".to_string(),
3360                                )],
3361                            ),
3362                        ]
3363                        .into_iter()
3364                        .collect(),
3365                    ),
3366                    ..Default::default()
3367                }),
3368                ..Default::default()
3369            }
3370        });
3371
3372        // After the action is confirmed, an editor containing both modified files is opened.
3373        confirm_action.await.unwrap();
3374        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3375            workspace
3376                .active_item(cx)
3377                .unwrap()
3378                .downcast::<Editor>()
3379                .unwrap()
3380        });
            // The expected text starts with an empty line (presumably the now-empty
            // other.rs) followed by the edited main.rs. Undo restores both original
            // contents in one step and redo re-applies the whole edit.
3381        code_action_editor.update(cx_b, |editor, cx| {
3382            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3383            editor.undo(&Undo, cx);
3384            assert_eq!(
3385                editor.text(cx),
3386                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3387            );
3388            editor.redo(&Redo, cx);
3389            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3390        });
3391    }
3392
3393    #[gpui::test(iterations = 10)]
        // Exercises symbol renaming on a project shared by client A and joined by
        // client B: B prepares a rename of `ONE`, types the new name `THREE`,
        // confirms it, and the resulting edits to both files are verified together
        // with undo and redo.
3394    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3395        cx_a.foreground().forbid_parking();
3396        let mut lang_registry = Arc::new(LanguageRegistry::test());
3397        let fs = FakeFs::new(cx_a.background());
3398        let mut path_openers_b = Vec::new();
3399        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3400
3401        // Set up a fake language server.
3402        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole reference, so the
            // language has to be registered before the registry is cloned below.
3403        Arc::get_mut(&mut lang_registry)
3404            .unwrap()
3405            .add(Arc::new(Language::new(
3406                LanguageConfig {
3407                    name: "Rust".into(),
3408                    path_suffixes: vec!["rs".to_string()],
3409                    language_server: Some(language_server_config),
3410                    ..Default::default()
3411                },
3412                Some(tree_sitter_rust::language()),
3413            )));
3414
3415        // Connect to a server as 2 clients.
3416        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3417        let client_a = server.create_client(cx_a, "user_a").await;
3418        let client_b = server.create_client(cx_b, "user_b").await;
3419
3420        // Share a project as client A
3421        fs.insert_tree(
3422            "/dir",
3423            json!({
3424                ".zed.toml": r#"collaborators = ["user_b"]"#,
3425                "one.rs": "const ONE: usize = 1;",
3426                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3427            }),
3428        )
3429        .await;
3430        let project_a = cx_a.update(|cx| {
3431            Project::local(
3432                client_a.clone(),
3433                client_a.user_store.clone(),
3434                lang_registry.clone(),
3435                fs.clone(),
3436                cx,
3437            )
3438        });
3439        let (worktree_a, _) = project_a
3440            .update(cx_a, |p, cx| {
3441                p.find_or_create_local_worktree("/dir", true, cx)
3442            })
3443            .await
3444            .unwrap();
3445        worktree_a
3446            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3447            .await;
3448        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3449        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3450        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3451
3452        // Join the project as client B.
3453        let project_b = Project::remote(
3454            project_id,
3455            client_b.clone(),
3456            client_b.user_store.clone(),
3457            lang_registry.clone(),
3458            fs.clone(),
3459            &mut cx_b.to_async(),
3460        )
3461        .await
3462        .unwrap();
            // Build a workspace for client B around the remote project so that an
            // editor can be opened on it.
3463        let mut params = cx_b.update(WorkspaceParams::test);
3464        params.languages = lang_registry.clone();
3465        params.client = client_b.client.clone();
3466        params.user_store = client_b.user_store.clone();
3467        params.project = project_b;
3468        params.path_openers = path_openers_b.into();
3469
3470        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3471        let editor_b = workspace_b
3472            .update(cx_b, |workspace, cx| {
3473                workspace.open_path((worktree_id, "one.rs").into(), cx)
3474            })
3475            .await
3476            .unwrap()
3477            .downcast::<Editor>()
3478            .unwrap();
3479        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3480
3481        // Move cursor to a location that can be renamed.
            // Offset 7 falls inside `ONE`, which occupies offsets 6..9 of one.rs.
3482        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3483            editor.select_ranges([7..7], None, cx);
3484            editor.rename(&Rename, cx).unwrap()
3485        });
3486
            // The prepare-rename request must arrive for the cursor position; the
            // server replies with the range of the whole identifier.
3487        fake_language_server
3488            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3489                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3490                assert_eq!(params.position, lsp::Position::new(0, 7));
3491                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3492                    lsp::Position::new(0, 6),
3493                    lsp::Position::new(0, 9),
3494                )))
3495            })
3496            .next()
3497            .await
3498            .unwrap();
3499        prepare_rename.await.unwrap();
            // The pending rename should cover the range reported by the server.
            // Replacing the first three characters of the rename editor's buffer
            // enters the new name `THREE`.
3500        editor_b.update(cx_b, |editor, cx| {
3501            let rename = editor.pending_rename().unwrap();
3502            let buffer = editor.buffer().read(cx).snapshot(cx);
3503            assert_eq!(
3504                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3505                6..9
3506            );
3507            rename.editor.update(cx, |rename_editor, cx| {
3508                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3509                    rename_buffer.edit([0..3], "THREE", cx);
3510                });
3511            });
3512        });
3513
            // Start the confirmation first; it is awaited only after the rename
            // request has been served by the handler below.
3514        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3515            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3516        });
            // The rename request is expected at the start of the identifier, (0, 6),
            // with the new name. The response edits `ONE` in one.rs (columns 6..9)
            // and both occurrences in two.rs (columns 24..27 and 35..38).
3517        fake_language_server
3518            .handle_request::<lsp::request::Rename, _>(|params, _| {
3519                assert_eq!(
3520                    params.text_document_position.text_document.uri.as_str(),
3521                    "file:///dir/one.rs"
3522                );
3523                assert_eq!(
3524                    params.text_document_position.position,
3525                    lsp::Position::new(0, 6)
3526                );
3527                assert_eq!(params.new_name, "THREE");
3528                Some(lsp::WorkspaceEdit {
3529                    changes: Some(
3530                        [
3531                            (
3532                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3533                                vec![lsp::TextEdit::new(
3534                                    lsp::Range::new(
3535                                        lsp::Position::new(0, 6),
3536                                        lsp::Position::new(0, 9),
3537                                    ),
3538                                    "THREE".to_string(),
3539                                )],
3540                            ),
3541                            (
3542                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3543                                vec![
3544                                    lsp::TextEdit::new(
3545                                        lsp::Range::new(
3546                                            lsp::Position::new(0, 24),
3547                                            lsp::Position::new(0, 27),
3548                                        ),
3549                                        "THREE".to_string(),
3550                                    ),
3551                                    lsp::TextEdit::new(
3552                                        lsp::Range::new(
3553                                            lsp::Position::new(0, 35),
3554                                            lsp::Position::new(0, 38),
3555                                        ),
3556                                        "THREE".to_string(),
3557                                    ),
3558                                ],
3559                            ),
3560                        ]
3561                        .into_iter()
3562                        .collect(),
3563                    ),
3564                    ..Default::default()
3565                })
3566            })
3567            .next()
3568            .await
3569            .unwrap();
3570        confirm_rename.await.unwrap();
3571
            // The workspace's active item is now an editor whose text contains both
            // renamed files; undo and redo act on the whole rename at once.
3572        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3573            workspace
3574                .active_item(cx)
3575                .unwrap()
3576                .downcast::<Editor>()
3577                .unwrap()
3578        });
3579        rename_editor.update(cx_b, |editor, cx| {
3580            assert_eq!(
3581                editor.text(cx),
3582                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3583            );
3584            editor.undo(&Undo, cx);
3585            assert_eq!(
3586                editor.text(cx),
3587                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3588            );
3589            editor.redo(&Redo, cx);
3590            assert_eq!(
3591                editor.text(cx),
3592                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3593            );
3594        });
3595
3596        // Ensure temporary rename edits cannot be undone/redone.
            // In the original editor, the first undo reverts the rename, a second
            // undo changes nothing further, and redo re-applies the rename.
3597        editor_b.update(cx_b, |editor, cx| {
3598            editor.undo(&Undo, cx);
3599            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3600            editor.undo(&Undo, cx);
3601            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3602            editor.redo(&Redo, cx);
3603            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3604        })
3605    }
3606
3607    #[gpui::test(iterations = 10)]
        // End-to-end chat flow: two members of an org channel each load a
        // pre-existing message, client A sends two new messages that client B then
        // observes, and the server's per-channel connection set shrinks as each
        // client drops its channel handle.
3608    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3609        cx_a.foreground().forbid_parking();
3610
3611        // Connect to a server as 2 clients.
3612        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3613        let client_a = server.create_client(cx_a, "user_a").await;
3614        let client_b = server.create_client(cx_b, "user_b").await;
3615
3616        // Create an org that includes these 2 users.
3617        let db = &server.app_state.db;
3618        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3619        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3620            .await
3621            .unwrap();
3622        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3623            .await
3624            .unwrap();
3625
3626        // Create a channel that includes all the users.
3627        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3628        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3629            .await
3630            .unwrap();
3631        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3632            .await
3633            .unwrap();
            // Seed the channel with one message from user B, written straight to the
            // database before either client opens the channel.
            // NOTE(review): the final argument is presumably a nonce - confirm
            // against the db API.
3634        db.create_channel_message(
3635            channel_id,
3636            client_b.current_user_id(&cx_b),
3637            "hello A, it's B.",
3638            OffsetDateTime::now_utc(),
3639            1,
3640        )
3641        .await
3642        .unwrap();
3643
            // Client A: wait for the channel list, then open the channel.
3644        let channels_a = cx_a
3645            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3646        channels_a
3647            .condition(cx_a, |list, _| list.available_channels().is_some())
3648            .await;
3649        channels_a.read_with(cx_a, |list, _| {
3650            assert_eq!(
3651                list.available_channels().unwrap(),
3652                &[ChannelDetails {
3653                    id: channel_id.to_proto(),
3654                    name: "test-channel".to_string()
3655                }]
3656            )
3657        });
3658        let channel_a = channels_a.update(cx_a, |this, cx| {
3659            this.get_channel(channel_id.to_proto(), cx).unwrap()
3660        });
            // The channel starts out with no messages; the seeded message shows up
            // only once the condition below is satisfied.
3661        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3662        channel_a
3663            .condition(&cx_a, |channel, _| {
3664                channel_messages(channel)
3665                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3666            })
3667            .await;
3668
            // Client B: same sequence as for client A.
3669        let channels_b = cx_b
3670            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3671        channels_b
3672            .condition(cx_b, |list, _| list.available_channels().is_some())
3673            .await;
3674        channels_b.read_with(cx_b, |list, _| {
3675            assert_eq!(
3676                list.available_channels().unwrap(),
3677                &[ChannelDetails {
3678                    id: channel_id.to_proto(),
3679                    name: "test-channel".to_string()
3680                }]
3681            )
3682        });
3683
3684        let channel_b = channels_b.update(cx_b, |this, cx| {
3685            this.get_channel(channel_id.to_proto(), cx).unwrap()
3686        });
3687        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3688        channel_b
3689            .condition(&cx_b, |channel, _| {
3690                channel_messages(channel)
3691                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3692            })
3693            .await;
3694
            // Client A sends two messages back to back. The first send is detached;
            // only the second send's task is returned from the closure and awaited.
            // Both messages appear in A's local list immediately with the third
            // tuple field set to `true`.
            // NOTE(review): that flag presumably marks a message as still pending -
            // confirm in `channel_messages`.
3695        channel_a
3696            .update(cx_a, |channel, cx| {
3697                channel
3698                    .send_message("oh, hi B.".to_string(), cx)
3699                    .unwrap()
3700                    .detach();
3701                let task = channel.send_message("sup".to_string(), cx).unwrap();
3702                assert_eq!(
3703                    channel_messages(channel),
3704                    &[
3705                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3706                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3707                        ("user_a".to_string(), "sup".to_string(), true)
3708                    ]
3709                );
3710                task
3711            })
3712            .await
3713            .unwrap();
3714
            // Client B eventually sees both messages, in send order, with the flag
            // set to `false`.
3715        channel_b
3716            .condition(&cx_b, |channel, _| {
3717                channel_messages(channel)
3718                    == [
3719                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3720                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3721                        ("user_a".to_string(), "sup".to_string(), false),
3722                    ]
3723            })
3724            .await;
3725
            // Both clients currently hold the channel, so the server tracks two
            // connections for it.
3726        assert_eq!(
3727            server
3728                .state()
3729                .await
3730                .channel(channel_id)
3731                .unwrap()
3732                .connection_ids
3733                .len(),
3734            2
3735        );
            // Dropping B's handle should leave a single connection on the server.
3736        cx_b.update(|_| drop(channel_b));
3737        server
3738            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3739            .await;
3740
            // Dropping A's handle as well should remove the channel from the
            // server's state entirely.
3741        cx_a.update(|_| drop(channel_a));
3742        server
3743            .condition(|state| state.channel(channel_id).is_none())
3744            .await;
3745    }
3746
3747    #[gpui::test(iterations = 10)]
        // Checks the validation applied to outgoing chat messages: over-long
        // bodies are rejected, blank bodies are rejected, and surrounding
        // whitespace is trimmed before the message is stored.
3748    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3749        cx_a.foreground().forbid_parking();
3750
3751        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3752        let client_a = server.create_client(cx_a, "user_a").await;
3753
            // Create an org and a channel with user A as the only member.
3754        let db = &server.app_state.db;
3755        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3756        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3757        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3758            .await
3759            .unwrap();
3760        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3761            .await
3762            .unwrap();
3763
3764        let channels_a = cx_a
3765            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3766        channels_a
3767            .condition(cx_a, |list, _| list.available_channels().is_some())
3768            .await;
3769        let channel_a = channels_a.update(cx_a, |this, cx| {
3770            this.get_channel(channel_id.to_proto(), cx).unwrap()
3771        });
3772
3773        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times. `send_message` itself
            // succeeds here; the error only surfaces when the returned task is
            // awaited.
            // NOTE(review): the limit is presumably MAX_MESSAGE_LEN (1024) - confirm
            // where the check is enforced.
3774        channel_a
3775            .update(cx_a, |channel, cx| {
3776                let long_body = "this is long.\n".repeat(1024);
3777                channel.send_message(long_body, cx).unwrap()
3778            })
3779            .await
3780            .unwrap_err();
3781
3782        // Messages aren't allowed to be blank.
            // Unlike the over-long case, `send_message` returns the error directly,
            // without producing a task to await.
3783        channel_a.update(cx_a, |channel, cx| {
3784            channel.send_message(String::new(), cx).unwrap_err()
3785        });
3786
3787        // Leading and trailing whitespace are trimmed.
3788        channel_a
3789            .update(cx_a, |channel, cx| {
3790                channel
3791                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3792                    .unwrap()
3793            })
3794            .await
3795            .unwrap();
            // Only the trimmed message is expected in the database, so neither
            // rejected message may have been stored.
3796        assert_eq!(
3797            db.get_channel_messages(channel_id, 10, None)
3798                .await
3799                .unwrap()
3800                .iter()
3801                .map(|m| &m.body)
3802                .collect::<Vec<_>>(),
3803            &["surrounded by whitespace"]
3804        );
3805    }
3806
3807    #[gpui::test(iterations = 10)]
3808    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3809        cx_a.foreground().forbid_parking();
3810
3811        // Connect to a server as 2 clients.
3812        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3813        let client_a = server.create_client(cx_a, "user_a").await;
3814        let client_b = server.create_client(cx_b, "user_b").await;
3815        let mut status_b = client_b.status();
3816
3817        // Create an org that includes these 2 users.
3818        let db = &server.app_state.db;
3819        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3820        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3821            .await
3822            .unwrap();
3823        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3824            .await
3825            .unwrap();
3826
3827        // Create a channel that includes all the users.
3828        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3829        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3830            .await
3831            .unwrap();
3832        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3833            .await
3834            .unwrap();
3835        db.create_channel_message(
3836            channel_id,
3837            client_b.current_user_id(&cx_b),
3838            "hello A, it's B.",
3839            OffsetDateTime::now_utc(),
3840            2,
3841        )
3842        .await
3843        .unwrap();
3844
3845        let channels_a = cx_a
3846            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3847        channels_a
3848            .condition(cx_a, |list, _| list.available_channels().is_some())
3849            .await;
3850
3851        channels_a.read_with(cx_a, |list, _| {
3852            assert_eq!(
3853                list.available_channels().unwrap(),
3854                &[ChannelDetails {
3855                    id: channel_id.to_proto(),
3856                    name: "test-channel".to_string()
3857                }]
3858            )
3859        });
3860        let channel_a = channels_a.update(cx_a, |this, cx| {
3861            this.get_channel(channel_id.to_proto(), cx).unwrap()
3862        });
3863        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3864        channel_a
3865            .condition(&cx_a, |channel, _| {
3866                channel_messages(channel)
3867                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3868            })
3869            .await;
3870
3871        let channels_b = cx_b
3872            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3873        channels_b
3874            .condition(cx_b, |list, _| list.available_channels().is_some())
3875            .await;
3876        channels_b.read_with(cx_b, |list, _| {
3877            assert_eq!(
3878                list.available_channels().unwrap(),
3879                &[ChannelDetails {
3880                    id: channel_id.to_proto(),
3881                    name: "test-channel".to_string()
3882                }]
3883            )
3884        });
3885
3886        let channel_b = channels_b.update(cx_b, |this, cx| {
3887            this.get_channel(channel_id.to_proto(), cx).unwrap()
3888        });
3889        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3890        channel_b
3891            .condition(&cx_b, |channel, _| {
3892                channel_messages(channel)
3893                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3894            })
3895            .await;
3896
3897        // Disconnect client B, ensuring we can still access its cached channel data.
3898        server.forbid_connections();
3899        server.disconnect_client(client_b.current_user_id(&cx_b));
3900        cx_b.foreground().advance_clock(Duration::from_secs(3));
3901        while !matches!(
3902            status_b.next().await,
3903            Some(client::Status::ReconnectionError { .. })
3904        ) {}
3905
3906        channels_b.read_with(cx_b, |channels, _| {
3907            assert_eq!(
3908                channels.available_channels().unwrap(),
3909                [ChannelDetails {
3910                    id: channel_id.to_proto(),
3911                    name: "test-channel".to_string()
3912                }]
3913            )
3914        });
3915        channel_b.read_with(cx_b, |channel, _| {
3916            assert_eq!(
3917                channel_messages(channel),
3918                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3919            )
3920        });
3921
3922        // Send a message from client B while it is disconnected.
3923        channel_b
3924            .update(cx_b, |channel, cx| {
3925                let task = channel
3926                    .send_message("can you see this?".to_string(), cx)
3927                    .unwrap();
3928                assert_eq!(
3929                    channel_messages(channel),
3930                    &[
3931                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3932                        ("user_b".to_string(), "can you see this?".to_string(), true)
3933                    ]
3934                );
3935                task
3936            })
3937            .await
3938            .unwrap_err();
3939
3940        // Send a message from client A while B is disconnected.
3941        channel_a
3942            .update(cx_a, |channel, cx| {
3943                channel
3944                    .send_message("oh, hi B.".to_string(), cx)
3945                    .unwrap()
3946                    .detach();
3947                let task = channel.send_message("sup".to_string(), cx).unwrap();
3948                assert_eq!(
3949                    channel_messages(channel),
3950                    &[
3951                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3952                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3953                        ("user_a".to_string(), "sup".to_string(), true)
3954                    ]
3955                );
3956                task
3957            })
3958            .await
3959            .unwrap();
3960
3961        // Give client B a chance to reconnect.
3962        server.allow_connections();
3963        cx_b.foreground().advance_clock(Duration::from_secs(10));
3964
3965        // Verify that B sees the new messages upon reconnection, as well as the message client B
3966        // sent while offline.
3967        channel_b
3968            .condition(&cx_b, |channel, _| {
3969                channel_messages(channel)
3970                    == [
3971                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3972                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3973                        ("user_a".to_string(), "sup".to_string(), false),
3974                        ("user_b".to_string(), "can you see this?".to_string(), false),
3975                    ]
3976            })
3977            .await;
3978
3979        // Ensure client A and B can communicate normally after reconnection.
3980        channel_a
3981            .update(cx_a, |channel, cx| {
3982                channel.send_message("you online?".to_string(), cx).unwrap()
3983            })
3984            .await
3985            .unwrap();
3986        channel_b
3987            .condition(&cx_b, |channel, _| {
3988                channel_messages(channel)
3989                    == [
3990                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3991                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3992                        ("user_a".to_string(), "sup".to_string(), false),
3993                        ("user_b".to_string(), "can you see this?".to_string(), false),
3994                        ("user_a".to_string(), "you online?".to_string(), false),
3995                    ]
3996            })
3997            .await;
3998
3999        channel_b
4000            .update(cx_b, |channel, cx| {
4001                channel.send_message("yep".to_string(), cx).unwrap()
4002            })
4003            .await
4004            .unwrap();
4005        channel_a
4006            .condition(&cx_a, |channel, _| {
4007                channel_messages(channel)
4008                    == [
4009                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4010                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4011                        ("user_a".to_string(), "sup".to_string(), false),
4012                        ("user_b".to_string(), "can you see this?".to_string(), false),
4013                        ("user_a".to_string(), "you online?".to_string(), false),
4014                        ("user_b".to_string(), "yep".to_string(), false),
4015                    ]
4016            })
4017            .await;
4018    }
4019
4020    #[gpui::test(iterations = 10)]
4021    async fn test_contacts(
4022        cx_a: &mut TestAppContext,
4023        cx_b: &mut TestAppContext,
4024        cx_c: &mut TestAppContext,
4025    ) {
4026        cx_a.foreground().forbid_parking();
4027        let lang_registry = Arc::new(LanguageRegistry::test());
4028        let fs = FakeFs::new(cx_a.background());
4029
4030        // Connect to a server as 3 clients.
4031        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4032        let client_a = server.create_client(cx_a, "user_a").await;
4033        let client_b = server.create_client(cx_b, "user_b").await;
4034        let client_c = server.create_client(cx_c, "user_c").await;
4035
4036        // Share a worktree as client A.
4037        fs.insert_tree(
4038            "/a",
4039            json!({
4040                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4041            }),
4042        )
4043        .await;
4044
4045        let project_a = cx_a.update(|cx| {
4046            Project::local(
4047                client_a.clone(),
4048                client_a.user_store.clone(),
4049                lang_registry.clone(),
4050                fs.clone(),
4051                cx,
4052            )
4053        });
4054        let (worktree_a, _) = project_a
4055            .update(cx_a, |p, cx| {
4056                p.find_or_create_local_worktree("/a", true, cx)
4057            })
4058            .await
4059            .unwrap();
4060        worktree_a
4061            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4062            .await;
4063
4064        client_a
4065            .user_store
4066            .condition(&cx_a, |user_store, _| {
4067                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4068            })
4069            .await;
4070        client_b
4071            .user_store
4072            .condition(&cx_b, |user_store, _| {
4073                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4074            })
4075            .await;
4076        client_c
4077            .user_store
4078            .condition(&cx_c, |user_store, _| {
4079                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4080            })
4081            .await;
4082
4083        let project_id = project_a
4084            .update(cx_a, |project, _| project.next_remote_id())
4085            .await;
4086        project_a
4087            .update(cx_a, |project, cx| project.share(cx))
4088            .await
4089            .unwrap();
4090
4091        let _project_b = Project::remote(
4092            project_id,
4093            client_b.clone(),
4094            client_b.user_store.clone(),
4095            lang_registry.clone(),
4096            fs.clone(),
4097            &mut cx_b.to_async(),
4098        )
4099        .await
4100        .unwrap();
4101
4102        client_a
4103            .user_store
4104            .condition(&cx_a, |user_store, _| {
4105                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4106            })
4107            .await;
4108        client_b
4109            .user_store
4110            .condition(&cx_b, |user_store, _| {
4111                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4112            })
4113            .await;
4114        client_c
4115            .user_store
4116            .condition(&cx_c, |user_store, _| {
4117                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4118            })
4119            .await;
4120
4121        project_a
4122            .condition(&cx_a, |project, _| {
4123                project.collaborators().contains_key(&client_b.peer_id)
4124            })
4125            .await;
4126
4127        cx_a.update(move |_| drop(project_a));
4128        client_a
4129            .user_store
4130            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4131            .await;
4132        client_b
4133            .user_store
4134            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4135            .await;
4136        client_c
4137            .user_store
4138            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4139            .await;
4140
4141        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4142            user_store
4143                .contacts()
4144                .iter()
4145                .map(|contact| {
4146                    let worktrees = contact
4147                        .projects
4148                        .iter()
4149                        .map(|p| {
4150                            (
4151                                p.worktree_root_names[0].as_str(),
4152                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4153                            )
4154                        })
4155                        .collect();
4156                    (contact.user.github_login.as_str(), worktrees)
4157                })
4158                .collect()
4159        }
4160    }
4161
4162    #[gpui::test(iterations = 100)]
4163    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4164        cx.foreground().forbid_parking();
4165        let max_peers = env::var("MAX_PEERS")
4166            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4167            .unwrap_or(5);
4168        let max_operations = env::var("OPERATIONS")
4169            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4170            .unwrap_or(10);
4171
4172        let rng = Arc::new(Mutex::new(rng));
4173
4174        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4175        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4176
4177        let fs = FakeFs::new(cx.background());
4178        fs.insert_tree(
4179            "/_collab",
4180            json!({
4181                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4182            }),
4183        )
4184        .await;
4185
4186        let operations = Rc::new(Cell::new(0));
4187        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4188        let mut clients = Vec::new();
4189
4190        let mut next_entity_id = 100000;
4191        let mut host_cx = TestAppContext::new(
4192            cx.foreground_platform(),
4193            cx.platform(),
4194            cx.foreground(),
4195            cx.background(),
4196            cx.font_cache(),
4197            cx.leak_detector(),
4198            next_entity_id,
4199        );
4200        let host = server.create_client(&mut host_cx, "host").await;
4201        let host_project = host_cx.update(|cx| {
4202            Project::local(
4203                host.client.clone(),
4204                host.user_store.clone(),
4205                Arc::new(LanguageRegistry::test()),
4206                fs.clone(),
4207                cx,
4208            )
4209        });
4210        let host_project_id = host_project
4211            .update(&mut host_cx, |p, _| p.next_remote_id())
4212            .await;
4213
4214        let (collab_worktree, _) = host_project
4215            .update(&mut host_cx, |project, cx| {
4216                project.find_or_create_local_worktree("/_collab", true, cx)
4217            })
4218            .await
4219            .unwrap();
4220        collab_worktree
4221            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4222            .await;
4223        host_project
4224            .update(&mut host_cx, |project, cx| project.share(cx))
4225            .await
4226            .unwrap();
4227
4228        clients.push(cx.foreground().spawn(host.simulate_host(
4229            host_project,
4230            language_server_config,
4231            operations.clone(),
4232            max_operations,
4233            rng.clone(),
4234            host_cx,
4235        )));
4236
4237        while operations.get() < max_operations {
4238            cx.background().simulate_random_delay().await;
4239            if clients.len() >= max_peers {
4240                break;
4241            } else if rng.lock().gen_bool(0.05) {
4242                operations.set(operations.get() + 1);
4243
4244                let guest_id = clients.len();
4245                log::info!("Adding guest {}", guest_id);
4246                next_entity_id += 100000;
4247                let mut guest_cx = TestAppContext::new(
4248                    cx.foreground_platform(),
4249                    cx.platform(),
4250                    cx.foreground(),
4251                    cx.background(),
4252                    cx.font_cache(),
4253                    cx.leak_detector(),
4254                    next_entity_id,
4255                );
4256                let guest = server
4257                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4258                    .await;
4259                let guest_project = Project::remote(
4260                    host_project_id,
4261                    guest.client.clone(),
4262                    guest.user_store.clone(),
4263                    guest_lang_registry.clone(),
4264                    FakeFs::new(cx.background()),
4265                    &mut guest_cx.to_async(),
4266                )
4267                .await
4268                .unwrap();
4269                clients.push(cx.foreground().spawn(guest.simulate_guest(
4270                    guest_id,
4271                    guest_project,
4272                    operations.clone(),
4273                    max_operations,
4274                    rng.clone(),
4275                    guest_cx,
4276                )));
4277
4278                log::info!("Guest {} added", guest_id);
4279            }
4280        }
4281
4282        let mut clients = futures::future::join_all(clients).await;
4283        cx.foreground().run_until_parked();
4284
4285        let (host_client, mut host_cx) = clients.remove(0);
4286        let host_project = host_client.project.as_ref().unwrap();
4287        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4288            project
4289                .worktrees(cx)
4290                .map(|worktree| {
4291                    let snapshot = worktree.read(cx).snapshot();
4292                    (snapshot.id(), snapshot)
4293                })
4294                .collect::<BTreeMap<_, _>>()
4295        });
4296
4297        host_client
4298            .project
4299            .as_ref()
4300            .unwrap()
4301            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4302
4303        for (guest_client, mut guest_cx) in clients.into_iter() {
4304            let guest_id = guest_client.client.id();
4305            let worktree_snapshots =
4306                guest_client
4307                    .project
4308                    .as_ref()
4309                    .unwrap()
4310                    .read_with(&guest_cx, |project, cx| {
4311                        project
4312                            .worktrees(cx)
4313                            .map(|worktree| {
4314                                let worktree = worktree.read(cx);
4315                                (worktree.id(), worktree.snapshot())
4316                            })
4317                            .collect::<BTreeMap<_, _>>()
4318                    });
4319
4320            assert_eq!(
4321                worktree_snapshots.keys().collect::<Vec<_>>(),
4322                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4323                "guest {} has different worktrees than the host",
4324                guest_id
4325            );
4326            for (id, host_snapshot) in &host_worktree_snapshots {
4327                let guest_snapshot = &worktree_snapshots[id];
4328                assert_eq!(
4329                    guest_snapshot.root_name(),
4330                    host_snapshot.root_name(),
4331                    "guest {} has different root name than the host for worktree {}",
4332                    guest_id,
4333                    id
4334                );
4335                assert_eq!(
4336                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4337                    host_snapshot.entries(false).collect::<Vec<_>>(),
4338                    "guest {} has different snapshot than the host for worktree {}",
4339                    guest_id,
4340                    id
4341                );
4342            }
4343
4344            guest_client
4345                .project
4346                .as_ref()
4347                .unwrap()
4348                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4349
4350            for guest_buffer in &guest_client.buffers {
4351                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4352                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4353                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4354                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4355                        guest_id, guest_client.peer_id, buffer_id
4356                    ))
4357                });
4358                let path = host_buffer
4359                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4360
4361                assert_eq!(
4362                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4363                    0,
4364                    "guest {}, buffer {}, path {:?} has deferred operations",
4365                    guest_id,
4366                    buffer_id,
4367                    path,
4368                );
4369                assert_eq!(
4370                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4371                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4372                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4373                    guest_id,
4374                    buffer_id,
4375                    path
4376                );
4377            }
4378
4379            guest_cx.update(|_| drop(guest_client));
4380        }
4381
4382        host_cx.update(|_| drop(host_client));
4383    }
4384
    /// In-process server harness used by the tests in this module, together
    /// with the knobs the tests use to disrupt client connections.
    struct TestServer {
        /// RPC peer handed to `Server::new` in `start`; reset when this struct is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`; `create_client` uses its
        /// `db` to create users.
        app_state: Arc<AppState>,
        /// The server under test; it handles each client's in-memory connection.
        server: Arc<Server>,
        /// Foreground executor; `condition` brackets its waits with
        /// `start_waiting` / `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the unbounded channel whose sender is given to the
        /// server; `condition` awaits it between predicate checks.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Kill handle of the most recently established connection per user.
        /// `disconnect_client` removes the entry (presumably dropping the handle
        /// is what severs the connection; confirm in `Connection::in_memory`).
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// While `true`, connection attempts fail with
        /// "server is forbidding connections".
        forbid_connections: Arc<AtomicBool>,
        /// Never read; presumably held so the test database outlives the server
        /// (confirm against `TestDb`).
        _test_db: TestDb,
    }
4395
4396    impl TestServer {
4397        async fn start(
4398            foreground: Rc<executor::Foreground>,
4399            background: Arc<executor::Background>,
4400        ) -> Self {
4401            let test_db = TestDb::fake(background);
4402            let app_state = Self::build_app_state(&test_db).await;
4403            let peer = Peer::new();
4404            let notifications = mpsc::unbounded();
4405            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4406            Self {
4407                peer,
4408                app_state,
4409                server,
4410                foreground,
4411                notifications: notifications.1,
4412                connection_killers: Default::default(),
4413                forbid_connections: Default::default(),
4414                _test_db: test_db,
4415            }
4416        }
4417
4418        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4419            cx.update(|cx| {
4420                let settings = Settings::test(cx);
4421                cx.add_app_state(settings);
4422            });
4423
4424            let http = FakeHttpClient::with_404_response();
4425            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4426            let client_name = name.to_string();
4427            let mut client = Client::new(http.clone());
4428            let server = self.server.clone();
4429            let connection_killers = self.connection_killers.clone();
4430            let forbid_connections = self.forbid_connections.clone();
4431            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4432
4433            Arc::get_mut(&mut client)
4434                .unwrap()
4435                .override_authenticate(move |cx| {
4436                    cx.spawn(|_| async move {
4437                        let access_token = "the-token".to_string();
4438                        Ok(Credentials {
4439                            user_id: user_id.0 as u64,
4440                            access_token,
4441                        })
4442                    })
4443                })
4444                .override_establish_connection(move |credentials, cx| {
4445                    assert_eq!(credentials.user_id, user_id.0 as u64);
4446                    assert_eq!(credentials.access_token, "the-token");
4447
4448                    let server = server.clone();
4449                    let connection_killers = connection_killers.clone();
4450                    let forbid_connections = forbid_connections.clone();
4451                    let client_name = client_name.clone();
4452                    let connection_id_tx = connection_id_tx.clone();
4453                    cx.spawn(move |cx| async move {
4454                        if forbid_connections.load(SeqCst) {
4455                            Err(EstablishConnectionError::other(anyhow!(
4456                                "server is forbidding connections"
4457                            )))
4458                        } else {
4459                            let (client_conn, server_conn, kill_conn) =
4460                                Connection::in_memory(cx.background());
4461                            connection_killers.lock().insert(user_id, kill_conn);
4462                            cx.background()
4463                                .spawn(server.handle_connection(
4464                                    server_conn,
4465                                    client_name,
4466                                    user_id,
4467                                    Some(connection_id_tx),
4468                                    cx.background(),
4469                                ))
4470                                .detach();
4471                            Ok(client_conn)
4472                        }
4473                    })
4474                });
4475
4476            client
4477                .authenticate_and_connect(&cx.to_async())
4478                .await
4479                .unwrap();
4480
4481            Channel::init(&client);
4482            Project::init(&client);
4483
4484            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4485            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4486
4487            let client = TestClient {
4488                client,
4489                peer_id,
4490                user_store,
4491                project: Default::default(),
4492                buffers: Default::default(),
4493            };
4494            client.wait_for_current_user(cx).await;
4495            client
4496        }
4497
4498        fn disconnect_client(&self, user_id: UserId) {
4499            self.connection_killers.lock().remove(&user_id);
4500        }
4501
4502        fn forbid_connections(&self) {
4503            self.forbid_connections.store(true, SeqCst);
4504        }
4505
4506        fn allow_connections(&self) {
4507            self.forbid_connections.store(false, SeqCst);
4508        }
4509
4510        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4511            let mut config = Config::default();
4512            config.session_secret = "a".repeat(32);
4513            config.database_url = test_db.url.clone();
4514            let github_client = github::AppClient::test();
4515            Arc::new(AppState {
4516                db: test_db.db().clone(),
4517                handlebars: Default::default(),
4518                auth_client: auth::build_client("", ""),
4519                repo_client: github::RepoClient::test(&github_client),
4520                github_client,
4521                config,
4522            })
4523        }
4524
4525        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4526            self.server.store.read()
4527        }
4528
4529        async fn condition<F>(&mut self, mut predicate: F)
4530        where
4531            F: FnMut(&Store) -> bool,
4532        {
4533            async_std::future::timeout(Duration::from_millis(500), async {
4534                while !(predicate)(&*self.server.store.read()) {
4535                    self.foreground.start_waiting();
4536                    self.notifications.next().await;
4537                    self.foreground.finish_waiting();
4538                }
4539            })
4540            .await
4541            .expect("condition timed out");
4542        }
4543    }
4544
    impl Drop for TestServer {
        // Resets the peer when the test server is dropped.
        // NOTE(review): presumably this tears down the connections the peer
        // still holds; confirm in `Peer::reset`.
        fn drop(&mut self) {
            self.peer.reset();
        }
    }
4550
    /// A connected client as produced by `TestServer::create_client`, plus the
    /// project and buffer bookkeeping used by the simulation tests.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this
        /// client's first connection.
        pub peer_id: PeerId,
        /// User store created for this client in `create_client`.
        pub user_store: ModelHandle<UserStore>,
        /// Starts out as `None`; `test_random_collaboration` expects it to be
        /// set by the time a simulation finishes.
        project: Option<ModelHandle<Project>>,
        /// Starts out empty; `test_random_collaboration` compares each buffer
        /// here against the host's copy.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
4558
4559    impl Deref for TestClient {
4560        type Target = Arc<Client>;
4561
4562        fn deref(&self) -> &Self::Target {
4563            &self.client
4564        }
4565    }
4566
4567    impl TestClient {
4568        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4569            UserId::from_proto(
4570                self.user_store
4571                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4572            )
4573        }
4574
4575        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4576            let mut authed_user = self
4577                .user_store
4578                .read_with(cx, |user_store, _| user_store.watch_current_user());
4579            while authed_user.next().await.unwrap().is_none() {}
4580        }
4581
4582        fn simulate_host(
4583            mut self,
4584            project: ModelHandle<Project>,
4585            mut language_server_config: LanguageServerConfig,
4586            operations: Rc<Cell<usize>>,
4587            max_operations: usize,
4588            rng: Arc<Mutex<StdRng>>,
4589            mut cx: TestAppContext,
4590        ) -> impl Future<Output = (Self, TestAppContext)> {
4591            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4592
4593            // Set up a fake language server.
4594            language_server_config.set_fake_initializer({
4595                let rng = rng.clone();
4596                let files = files.clone();
4597                let project = project.downgrade();
4598                move |fake_server| {
4599                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4600                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4601                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4602                                range: lsp::Range::new(
4603                                    lsp::Position::new(0, 0),
4604                                    lsp::Position::new(0, 0),
4605                                ),
4606                                new_text: "the-new-text".to_string(),
4607                            })),
4608                            ..Default::default()
4609                        }]))
4610                    });
4611
4612                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4613                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4614                            lsp::CodeAction {
4615                                title: "the-code-action".to_string(),
4616                                ..Default::default()
4617                            },
4618                        )])
4619                    });
4620
4621                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4622                        |params, _| {
4623                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4624                                params.position,
4625                                params.position,
4626                            )))
4627                        },
4628                    );
4629
4630                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4631                        let files = files.clone();
4632                        let rng = rng.clone();
4633                        move |_, _| {
4634                            let files = files.lock();
4635                            let mut rng = rng.lock();
4636                            let count = rng.gen_range::<usize, _>(1..3);
4637                            let files = (0..count)
4638                                .map(|_| files.choose(&mut *rng).unwrap())
4639                                .collect::<Vec<_>>();
4640                            log::info!("LSP: Returning definitions in files {:?}", &files);
4641                            Some(lsp::GotoDefinitionResponse::Array(
4642                                files
4643                                    .into_iter()
4644                                    .map(|file| lsp::Location {
4645                                        uri: lsp::Url::from_file_path(file).unwrap(),
4646                                        range: Default::default(),
4647                                    })
4648                                    .collect(),
4649                            ))
4650                        }
4651                    });
4652
4653                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4654                        let rng = rng.clone();
4655                        let project = project.clone();
4656                        move |params, mut cx| {
4657                            if let Some(project) = project.upgrade(&cx) {
4658                                project.update(&mut cx, |project, cx| {
4659                                    let path = params
4660                                        .text_document_position_params
4661                                        .text_document
4662                                        .uri
4663                                        .to_file_path()
4664                                        .unwrap();
4665                                    let (worktree, relative_path) =
4666                                        project.find_local_worktree(&path, cx)?;
4667                                    let project_path =
4668                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4669                                    let buffer =
4670                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4671
4672                                    let mut highlights = Vec::new();
4673                                    let highlight_count = rng.lock().gen_range(1..=5);
4674                                    let mut prev_end = 0;
4675                                    for _ in 0..highlight_count {
4676                                        let range =
4677                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4678                                        let start = buffer
4679                                            .offset_to_point_utf16(range.start)
4680                                            .to_lsp_position();
4681                                        let end = buffer
4682                                            .offset_to_point_utf16(range.end)
4683                                            .to_lsp_position();
4684                                        highlights.push(lsp::DocumentHighlight {
4685                                            range: lsp::Range::new(start, end),
4686                                            kind: Some(lsp::DocumentHighlightKind::READ),
4687                                        });
4688                                        prev_end = range.end;
4689                                    }
4690                                    Some(highlights)
4691                                })
4692                            } else {
4693                                None
4694                            }
4695                        }
4696                    });
4697                }
4698            });
4699
4700            project.update(&mut cx, |project, _| {
4701                project.languages().add(Arc::new(Language::new(
4702                    LanguageConfig {
4703                        name: "Rust".into(),
4704                        path_suffixes: vec!["rs".to_string()],
4705                        language_server: Some(language_server_config),
4706                        ..Default::default()
4707                    },
4708                    None,
4709                )));
4710            });
4711
4712            async move {
4713                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4714                while operations.get() < max_operations {
4715                    operations.set(operations.get() + 1);
4716
4717                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4718                    match distribution {
4719                        0..=20 if !files.lock().is_empty() => {
4720                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4721                            let mut path = path.as_path();
4722                            while let Some(parent_path) = path.parent() {
4723                                path = parent_path;
4724                                if rng.lock().gen() {
4725                                    break;
4726                                }
4727                            }
4728
4729                            log::info!("Host: find/create local worktree {:?}", path);
4730                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4731                                project.find_or_create_local_worktree(path, true, cx)
4732                            });
4733                            let find_or_create_worktree = async move {
4734                                find_or_create_worktree.await.unwrap();
4735                            };
4736                            if rng.lock().gen() {
4737                                cx.background().spawn(find_or_create_worktree).detach();
4738                            } else {
4739                                find_or_create_worktree.await;
4740                            }
4741                        }
4742                        10..=80 if !files.lock().is_empty() => {
4743                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4744                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4745                                let (worktree, path) = project
4746                                    .update(&mut cx, |project, cx| {
4747                                        project.find_or_create_local_worktree(
4748                                            file.clone(),
4749                                            true,
4750                                            cx,
4751                                        )
4752                                    })
4753                                    .await
4754                                    .unwrap();
4755                                let project_path =
4756                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4757                                log::info!(
4758                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4759                                    file,
4760                                    project_path.0,
4761                                    project_path.1
4762                                );
4763                                let buffer = project
4764                                    .update(&mut cx, |project, cx| {
4765                                        project.open_buffer(project_path, cx)
4766                                    })
4767                                    .await
4768                                    .unwrap();
4769                                self.buffers.insert(buffer.clone());
4770                                buffer
4771                            } else {
4772                                self.buffers
4773                                    .iter()
4774                                    .choose(&mut *rng.lock())
4775                                    .unwrap()
4776                                    .clone()
4777                            };
4778
4779                            if rng.lock().gen_bool(0.1) {
4780                                cx.update(|cx| {
4781                                    log::info!(
4782                                        "Host: dropping buffer {:?}",
4783                                        buffer.read(cx).file().unwrap().full_path(cx)
4784                                    );
4785                                    self.buffers.remove(&buffer);
4786                                    drop(buffer);
4787                                });
4788                            } else {
4789                                buffer.update(&mut cx, |buffer, cx| {
4790                                    log::info!(
4791                                        "Host: updating buffer {:?} ({})",
4792                                        buffer.file().unwrap().full_path(cx),
4793                                        buffer.remote_id()
4794                                    );
4795                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4796                                });
4797                            }
4798                        }
4799                        _ => loop {
4800                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4801                            let mut path = PathBuf::new();
4802                            path.push("/");
4803                            for _ in 0..path_component_count {
4804                                let letter = rng.lock().gen_range(b'a'..=b'z');
4805                                path.push(std::str::from_utf8(&[letter]).unwrap());
4806                            }
4807                            path.set_extension("rs");
4808                            let parent_path = path.parent().unwrap();
4809
4810                            log::info!("Host: creating file {:?}", path,);
4811
4812                            if fs.create_dir(&parent_path).await.is_ok()
4813                                && fs.create_file(&path, Default::default()).await.is_ok()
4814                            {
4815                                files.lock().push(path);
4816                                break;
4817                            } else {
4818                                log::info!("Host: cannot create file");
4819                            }
4820                        },
4821                    }
4822
4823                    cx.background().simulate_random_delay().await;
4824                }
4825
4826                log::info!("Host done");
4827
4828                self.project = Some(project);
4829                (self, cx)
4830            }
4831        }
4832
        /// Drives one simulated guest through a stream of random project operations.
        ///
        /// The loop runs until the shared `operations` counter reaches
        /// `max_operations`. Each iteration first obtains a buffer, either by opening
        /// a randomly chosen file from a visible worktree of `project` or by reusing
        /// one already held in `self.buffers`, and then rolls a number in `0..100`
        /// to decide what to do with it:
        ///
        /// - `0..=9`: drop the buffer
        /// - `10..=19`: request completions at a random offset
        /// - `20..=29`: request code actions for a random byte range
        /// - `30..=39`: save the buffer, but only when it is dirty
        /// - `40..=44`: prepare a rename at a random offset
        /// - `45..=49`: request definitions at a random offset
        /// - `50..=54`: request document highlights at a random offset
        /// - `55..=59`: run a project-wide search for a random lowercase letter
        /// - anything else (including `30..=39` on a clean buffer): randomly edit
        ///
        /// Each request is spawned on the background executor and then either
        /// detached (with probability 0.3) or awaited.
        ///
        /// Returns `self`, with `self.project` set to `project`, together with `cx`.
        ///
        /// # Panics
        ///
        /// The result of `open_buffer` is unwrapped, and every spawned request calls
        /// `expect` on its result, so a failing request panics.
        pub async fn simulate_guest(
            mut self,
            guest_id: usize,
            project: ModelHandle<Project>,
            operations: Rc<Cell<usize>>,
            max_operations: usize,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext) {
            while operations.get() < max_operations {
                // Open a new buffer when none is held yet; otherwise flip a coin
                // between opening a new one and reusing an existing one.
                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                    // Pick a random visible worktree that contains at least one file.
                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                        worktree
                    } else {
                        // No suitable worktree yet: wait and retry. The operation
                        // counter is not incremented on this path.
                        cx.background().simulate_random_delay().await;
                        continue;
                    };

                    operations.set(operations.get() + 1);
                    // Choose a random file entry in that worktree. The `unwrap` relies
                    // on the filter above having seen at least one file entry.
                    let (worktree_root_name, project_path) =
                        worktree.read_with(&cx, |worktree, _| {
                            let entry = worktree
                                .entries(false)
                                .filter(|e| e.is_file())
                                .choose(&mut *rng.lock())
                                .unwrap();
                            (
                                worktree.root_name().to_string(),
                                (worktree.id(), entry.path.clone()),
                            )
                        });
                    log::info!(
                        "Guest {}: opening path {:?} in worktree {} ({})",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                    );
                    let buffer = project
                        .update(&mut cx, |project, cx| {
                            project.open_buffer(project_path.clone(), cx)
                        })
                        .await
                        .unwrap();
                    log::info!(
                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
                    );
                    // Remember the buffer so later iterations can reuse it.
                    self.buffers.insert(buffer.clone());
                    buffer
                } else {
                    operations.set(operations.get() + 1);

                    // Reuse a random buffer that this guest already holds.
                    self.buffers
                        .iter()
                        .choose(&mut *rng.lock())
                        .unwrap()
                        .clone()
                };

                // Roll for the action to perform on the chosen buffer.
                let choice = rng.lock().gen_range(0..100);
                match choice {
                    // Drop the buffer: forget it and release this handle inside `cx.update`.
                    0..=9 => {
                        cx.update(|cx| {
                            log::info!(
                                "Guest {}: dropping buffer {:?}",
                                guest_id,
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    }
                    // Completions at a random offset within the buffer.
                    10..=19 => {
                        let completions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting completions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.completions(&buffer, offset, cx)
                        });
                        let completions = cx.background().spawn(async move {
                            completions.await.expect("completions request failed");
                        });
                        // Either let the request finish on its own or wait for it here.
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching completions request", guest_id);
                            completions.detach();
                        } else {
                            completions.await;
                        }
                    }
                    // Code actions for a random byte range of the buffer.
                    20..=29 => {
                        let code_actions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting code actions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                            project.code_actions(&buffer, range, cx)
                        });
                        let code_actions = cx.background().spawn(async move {
                            code_actions.await.expect("code actions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching code actions request", guest_id);
                            code_actions.detach();
                        } else {
                            code_actions.await;
                        }
                    }
                    // Save, guarded on the buffer being dirty. When the guard fails the
                    // roll falls through to the catch-all arm below.
                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: saving buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            (buffer.version(), buffer.save(cx))
                        });
                        let save = cx.background().spawn(async move {
                            let (saved_version, _) = save.await.expect("save request failed");
                            // The saved version must include everything that was in the
                            // buffer when the save was requested.
                            assert!(saved_version.observed_all(&requested_version));
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching save request", guest_id);
                            save.detach();
                        } else {
                            save.await;
                        }
                    }
                    // Prepare-rename at a random offset. The buffer handle is passed by
                    // value here, unlike the other requests.
                    40..=44 => {
                        let prepare_rename = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: preparing rename for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.prepare_rename(buffer, offset, cx)
                        });
                        let prepare_rename = cx.background().spawn(async move {
                            prepare_rename.await.expect("prepare rename request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching prepare rename request", guest_id);
                            prepare_rename.detach();
                        } else {
                            prepare_rename.await;
                        }
                    }
                    // Go-to-definition at a random offset.
                    45..=49 => {
                        let definitions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting definitions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.definition(&buffer, offset, cx)
                        });
                        let definitions = cx.background().spawn(async move {
                            definitions.await.expect("definitions request failed")
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching definitions request", guest_id);
                            definitions.detach();
                        } else {
                            // Keep the buffers of the returned locations so later
                            // iterations can operate on them too.
                            self.buffers
                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                        }
                    }
                    // Document highlights at a random offset.
                    50..=54 => {
                        let highlights = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting highlights for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.document_highlights(&buffer, offset, cx)
                        });
                        let highlights = cx.background().spawn(async move {
                            highlights.await.expect("highlights request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching highlights request", guest_id);
                            highlights.detach();
                        } else {
                            highlights.await;
                        }
                    }
                    // Project-wide text search for a single random lowercase letter.
                    55..=59 => {
                        let search = project.update(&mut cx, |project, cx| {
                            let query = rng.lock().gen_range('a'..='z');
                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                            project.search(SearchQuery::text(query, false, false), cx)
                        });
                        let search = cx
                            .background()
                            .spawn(async move { search.await.expect("search request failed") });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching search request", guest_id);
                            search.detach();
                        } else {
                            // Keep every buffer that appears as a key in the results.
                            self.buffers.extend(search.await.into_keys());
                        }
                    }
                    // Everything else: apply random edits to the buffer.
                    _ => {
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: updating buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                cx.background().simulate_random_delay().await;
            }

            log::info!("Guest {} done", guest_id);

            // Hand the project back to the caller through `self`.
            self.project = Some(project);
            (self, cx)
        }
5082    }
5083
5084    impl Drop for TestClient {
5085        fn drop(&mut self) {
5086            self.client.tear_down();
5087        }
5088    }
5089
5090    impl Executor for Arc<gpui::executor::Background> {
5091        type Timer = gpui::executor::Timer;
5092
5093        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5094            self.spawn(future).detach();
5095        }
5096
5097        fn timer(&self, duration: Duration) -> Self::Timer {
5098            self.as_ref().timer(duration)
5099        }
5100    }
5101
5102    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5103        channel
5104            .messages()
5105            .cursor::<()>()
5106            .map(|m| {
5107                (
5108                    m.sender.github_login.clone(),
5109                    m.body.clone(),
5110                    m.is_pending(),
5111                )
5112            })
5113            .collect()
5114    }
5115
    /// A field-less view type; its `gpui::View` implementation renders an empty element.
    struct EmptyView;
5117
    impl gpui::Entity for EmptyView {
        // The event type is the unit type, so events from this entity carry no data.
        type Event = ();
    }
5121
5122    impl gpui::View for EmptyView {
5123        fn ui_name() -> &'static str {
5124            "empty view"
5125        }
5126
5127        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5128            gpui::Element::boxed(gpui::elements::Empty)
5129        }
5130    }
5131}