rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::get_channel_messages);
 116
 117        Arc::new(server)
 118    }
 119
 120    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 121    where
 122        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 123        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 124        M: EnvelopedMessage,
 125    {
 126        let prev_handler = self.handlers.insert(
 127            TypeId::of::<M>(),
 128            Box::new(move |server, envelope| {
 129                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 130                (handler)(server, *envelope).boxed()
 131            }),
 132        );
 133        if prev_handler.is_some() {
 134            panic!("registered a handler for the same message twice");
 135        }
 136        self
 137    }
 138
 139    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 140    where
 141        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 142        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 143        M: RequestMessage,
 144    {
 145        self.add_message_handler(move |server, envelope| {
 146            let receipt = envelope.receipt();
 147            let response = (handler)(server.clone(), envelope);
 148            async move {
 149                match response.await {
 150                    Ok(response) => {
 151                        server.peer.respond(receipt, response)?;
 152                        Ok(())
 153                    }
 154                    Err(error) => {
 155                        server.peer.respond_with_error(
 156                            receipt,
 157                            proto::Error {
 158                                message: error.to_string(),
 159                            },
 160                        )?;
 161                        Err(error)
 162                    }
 163                }
 164            }
 165        })
 166    }
 167
 168    pub fn handle_connection<E: Executor>(
 169        self: &Arc<Self>,
 170        connection: Connection,
 171        addr: String,
 172        user_id: UserId,
 173        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 174        executor: E,
 175    ) -> impl Future<Output = ()> {
 176        let mut this = self.clone();
 177        async move {
 178            let (connection_id, handle_io, mut incoming_rx) = this
 179                .peer
 180                .add_connection(connection, {
 181                    let executor = executor.clone();
 182                    move |duration| {
 183                        let timer = executor.timer(duration);
 184                        async move {
 185                            timer.await;
 186                        }
 187                    }
 188                })
 189                .await;
 190
 191            if let Some(send_connection_id) = send_connection_id.as_mut() {
 192                let _ = send_connection_id.send(connection_id).await;
 193            }
 194
 195            this.state_mut().add_connection(connection_id, user_id);
 196            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 197                log::error!("error updating contacts for {:?}: {}", user_id, err);
 198            }
 199
 200            let handle_io = handle_io.fuse();
 201            futures::pin_mut!(handle_io);
 202            loop {
 203                let next_message = incoming_rx.next().fuse();
 204                futures::pin_mut!(next_message);
 205                futures::select_biased! {
 206                    result = handle_io => {
 207                        if let Err(err) = result {
 208                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 209                        }
 210                        break;
 211                    }
 212                    message = next_message => {
 213                        if let Some(message) = message {
 214                            let start_time = Instant::now();
 215                            let type_name = message.payload_type_name();
 216                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 217                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 218                                let notifications = this.notifications.clone();
 219                                let is_background = message.is_background();
 220                                let handle_message = (handler)(this.clone(), message);
 221                                let handle_message = async move {
 222                                    if let Err(err) = handle_message.await {
 223                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 224                                    } else {
 225                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 226                                    }
 227                                    if let Some(mut notifications) = notifications {
 228                                        let _ = notifications.send(()).await;
 229                                    }
 230                                };
 231                                if is_background {
 232                                    executor.spawn_detached(handle_message);
 233                                } else {
 234                                    handle_message.await;
 235                                }
 236                            } else {
 237                                log::warn!("unhandled message: {}", type_name);
 238                            }
 239                        } else {
 240                            log::info!("rpc connection closed {:?}", addr);
 241                            break;
 242                        }
 243                    }
 244                }
 245            }
 246
 247            if let Err(err) = this.sign_out(connection_id).await {
 248                log::error!("error signing out connection {:?} - {:?}", addr, err);
 249            }
 250        }
 251    }
 252
 253    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 254        self.peer.disconnect(connection_id);
 255        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 256
 257        for (project_id, project) in removed_connection.hosted_projects {
 258            if let Some(share) = project.share {
 259                broadcast(
 260                    connection_id,
 261                    share.guests.keys().copied().collect(),
 262                    |conn_id| {
 263                        self.peer
 264                            .send(conn_id, proto::UnshareProject { project_id })
 265                    },
 266                )?;
 267            }
 268        }
 269
 270        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 271            broadcast(connection_id, peer_ids, |conn_id| {
 272                self.peer.send(
 273                    conn_id,
 274                    proto::RemoveProjectCollaborator {
 275                        project_id,
 276                        peer_id: connection_id.0,
 277                    },
 278                )
 279            })?;
 280        }
 281
 282        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 283        Ok(())
 284    }
 285
 286    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn register_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::RegisterProject>,
 293    ) -> tide::Result<proto::RegisterProjectResponse> {
 294        let project_id = {
 295            let mut state = self.state_mut();
 296            let user_id = state.user_id_for_connection(request.sender_id)?;
 297            state.register_project(request.sender_id, user_id)
 298        };
 299        Ok(proto::RegisterProjectResponse { project_id })
 300    }
 301
 302    async fn unregister_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnregisterProject>,
 305    ) -> tide::Result<()> {
 306        let project = self
 307            .state_mut()
 308            .unregister_project(request.payload.project_id, request.sender_id)?;
 309        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 310        Ok(())
 311    }
 312
 313    async fn share_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::ShareProject>,
 316    ) -> tide::Result<proto::Ack> {
 317        self.state_mut()
 318            .share_project(request.payload.project_id, request.sender_id);
 319        Ok(proto::Ack {})
 320    }
 321
 322    async fn unshare_project(
 323        mut self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareProject>,
 325    ) -> tide::Result<()> {
 326        let project_id = request.payload.project_id;
 327        let project = self
 328            .state_mut()
 329            .unshare_project(project_id, request.sender_id)?;
 330
 331        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 332            self.peer
 333                .send(conn_id, proto::UnshareProject { project_id })
 334        })?;
 335        self.update_contacts_for_users(&project.authorized_user_ids)?;
 336        Ok(())
 337    }
 338
 339    async fn join_project(
 340        mut self: Arc<Server>,
 341        request: TypedEnvelope<proto::JoinProject>,
 342    ) -> tide::Result<proto::JoinProjectResponse> {
 343        let project_id = request.payload.project_id;
 344
 345        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 346        let (response, connection_ids, contact_user_ids) = self
 347            .state_mut()
 348            .join_project(request.sender_id, user_id, project_id)
 349            .and_then(|joined| {
 350                let share = joined.project.share()?;
 351                let peer_count = share.guests.len();
 352                let mut collaborators = Vec::with_capacity(peer_count);
 353                collaborators.push(proto::Collaborator {
 354                    peer_id: joined.project.host_connection_id.0,
 355                    replica_id: 0,
 356                    user_id: joined.project.host_user_id.to_proto(),
 357                });
 358                let worktrees = share
 359                    .worktrees
 360                    .iter()
 361                    .filter_map(|(id, shared_worktree)| {
 362                        let worktree = joined.project.worktrees.get(&id)?;
 363                        Some(proto::Worktree {
 364                            id: *id,
 365                            root_name: worktree.root_name.clone(),
 366                            entries: shared_worktree.entries.values().cloned().collect(),
 367                            diagnostic_summaries: shared_worktree
 368                                .diagnostic_summaries
 369                                .values()
 370                                .cloned()
 371                                .collect(),
 372                            visible: worktree.visible,
 373                        })
 374                    })
 375                    .collect();
 376                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 377                    if *peer_conn_id != request.sender_id {
 378                        collaborators.push(proto::Collaborator {
 379                            peer_id: peer_conn_id.0,
 380                            replica_id: *peer_replica_id as u32,
 381                            user_id: peer_user_id.to_proto(),
 382                        });
 383                    }
 384                }
 385                let response = proto::JoinProjectResponse {
 386                    worktrees,
 387                    replica_id: joined.replica_id as u32,
 388                    collaborators,
 389                    language_servers: joined.project.language_servers.clone(),
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn start_language_server(
 540        mut self: Arc<Server>,
 541        request: TypedEnvelope<proto::StartLanguageServer>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self.state_mut().start_language_server(
 544            request.payload.project_id,
 545            request.sender_id,
 546            request
 547                .payload
 548                .server
 549                .clone()
 550                .ok_or_else(|| anyhow!("invalid language server"))?,
 551        )?;
 552        broadcast(request.sender_id, receiver_ids, |connection_id| {
 553            self.peer
 554                .forward_send(request.sender_id, connection_id, request.payload.clone())
 555        })?;
 556        Ok(())
 557    }
 558
 559    async fn update_language_server(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::UpdateLanguageServer>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn forward_project_request<T>(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<T>,
 576    ) -> tide::Result<T::Response>
 577    where
 578        T: EntityMessage + RequestMessage,
 579    {
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 583            .host_connection_id;
 584        Ok(self
 585            .peer
 586            .forward_request(request.sender_id, host_connection_id, request.payload)
 587            .await?)
 588    }
 589
 590    async fn save_buffer(
 591        self: Arc<Server>,
 592        request: TypedEnvelope<proto::SaveBuffer>,
 593    ) -> tide::Result<proto::BufferSaved> {
 594        let host;
 595        let mut guests;
 596        {
 597            let state = self.state();
 598            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 599            host = project.host_connection_id;
 600            guests = project.guest_connection_ids()
 601        }
 602
 603        let response = self
 604            .peer
 605            .forward_request(request.sender_id, host, request.payload.clone())
 606            .await?;
 607
 608        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 609        broadcast(host, guests, |conn_id| {
 610            self.peer.forward_send(host, conn_id, response.clone())
 611        })?;
 612
 613        Ok(response)
 614    }
 615
 616    async fn update_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::UpdateBuffer>,
 619    ) -> tide::Result<proto::Ack> {
 620        let receiver_ids = self
 621            .state()
 622            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        })?;
 627        Ok(proto::Ack {})
 628    }
 629
 630    async fn update_buffer_file(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateBufferFile>,
 633    ) -> tide::Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 637        broadcast(request.sender_id, receiver_ids, |connection_id| {
 638            self.peer
 639                .forward_send(request.sender_id, connection_id, request.payload.clone())
 640        })?;
 641        Ok(())
 642    }
 643
 644    async fn buffer_reloaded(
 645        self: Arc<Server>,
 646        request: TypedEnvelope<proto::BufferReloaded>,
 647    ) -> tide::Result<()> {
 648        let receiver_ids = self
 649            .state()
 650            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 651        broadcast(request.sender_id, receiver_ids, |connection_id| {
 652            self.peer
 653                .forward_send(request.sender_id, connection_id, request.payload.clone())
 654        })?;
 655        Ok(())
 656    }
 657
 658    async fn buffer_saved(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::BufferSaved>,
 661    ) -> tide::Result<()> {
 662        let receiver_ids = self
 663            .state()
 664            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 665        broadcast(request.sender_id, receiver_ids, |connection_id| {
 666            self.peer
 667                .forward_send(request.sender_id, connection_id, request.payload.clone())
 668        })?;
 669        Ok(())
 670    }
 671
 672    async fn get_channels(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::GetChannels>,
 675    ) -> tide::Result<proto::GetChannelsResponse> {
 676        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 677        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 678        Ok(proto::GetChannelsResponse {
 679            channels: channels
 680                .into_iter()
 681                .map(|chan| proto::Channel {
 682                    id: chan.id.to_proto(),
 683                    name: chan.name,
 684                })
 685                .collect(),
 686        })
 687    }
 688
 689    async fn get_users(
 690        self: Arc<Server>,
 691        request: TypedEnvelope<proto::GetUsers>,
 692    ) -> tide::Result<proto::GetUsersResponse> {
 693        let user_ids = request
 694            .payload
 695            .user_ids
 696            .into_iter()
 697            .map(UserId::from_proto)
 698            .collect();
 699        let users = self
 700            .app_state
 701            .db
 702            .get_users_by_ids(user_ids)
 703            .await?
 704            .into_iter()
 705            .map(|user| proto::User {
 706                id: user.id.to_proto(),
 707                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 708                github_login: user.github_login,
 709            })
 710            .collect();
 711        Ok(proto::GetUsersResponse { users })
 712    }
 713
 714    fn update_contacts_for_users<'a>(
 715        self: &Arc<Server>,
 716        user_ids: impl IntoIterator<Item = &'a UserId>,
 717    ) -> anyhow::Result<()> {
 718        let mut result = Ok(());
 719        let state = self.state();
 720        for user_id in user_ids {
 721            let contacts = state.contacts_for_user(*user_id);
 722            for connection_id in state.connection_ids_for_user(*user_id) {
 723                if let Err(error) = self.peer.send(
 724                    connection_id,
 725                    proto::UpdateContacts {
 726                        contacts: contacts.clone(),
 727                    },
 728                ) {
 729                    result = Err(error);
 730                }
 731            }
 732        }
 733        result
 734    }
 735
 736    async fn join_channel(
 737        mut self: Arc<Self>,
 738        request: TypedEnvelope<proto::JoinChannel>,
 739    ) -> tide::Result<proto::JoinChannelResponse> {
 740        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 741        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 742        if !self
 743            .app_state
 744            .db
 745            .can_user_access_channel(user_id, channel_id)
 746            .await?
 747        {
 748            Err(anyhow!("access denied"))?;
 749        }
 750
 751        self.state_mut().join_channel(request.sender_id, channel_id);
 752        let messages = self
 753            .app_state
 754            .db
 755            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 756            .await?
 757            .into_iter()
 758            .map(|msg| proto::ChannelMessage {
 759                id: msg.id.to_proto(),
 760                body: msg.body,
 761                timestamp: msg.sent_at.unix_timestamp() as u64,
 762                sender_id: msg.sender_id.to_proto(),
 763                nonce: Some(msg.nonce.as_u128().into()),
 764            })
 765            .collect::<Vec<_>>();
 766        Ok(proto::JoinChannelResponse {
 767            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 768            messages,
 769        })
 770    }
 771
 772    async fn leave_channel(
 773        mut self: Arc<Self>,
 774        request: TypedEnvelope<proto::LeaveChannel>,
 775    ) -> tide::Result<()> {
 776        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 777        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 778        if !self
 779            .app_state
 780            .db
 781            .can_user_access_channel(user_id, channel_id)
 782            .await?
 783        {
 784            Err(anyhow!("access denied"))?;
 785        }
 786
 787        self.state_mut()
 788            .leave_channel(request.sender_id, channel_id);
 789
 790        Ok(())
 791    }
 792
 793    async fn send_channel_message(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::SendChannelMessage>,
 796    ) -> tide::Result<proto::SendChannelMessageResponse> {
 797        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 798        let user_id;
 799        let connection_ids;
 800        {
 801            let state = self.state();
 802            user_id = state.user_id_for_connection(request.sender_id)?;
 803            connection_ids = state.channel_connection_ids(channel_id)?;
 804        }
 805
 806        // Validate the message body.
 807        let body = request.payload.body.trim().to_string();
 808        if body.len() > MAX_MESSAGE_LEN {
 809            return Err(anyhow!("message is too long"))?;
 810        }
 811        if body.is_empty() {
 812            return Err(anyhow!("message can't be blank"))?;
 813        }
 814
 815        let timestamp = OffsetDateTime::now_utc();
 816        let nonce = request
 817            .payload
 818            .nonce
 819            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 820
 821        let message_id = self
 822            .app_state
 823            .db
 824            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 825            .await?
 826            .to_proto();
 827        let message = proto::ChannelMessage {
 828            sender_id: user_id.to_proto(),
 829            id: message_id,
 830            body,
 831            timestamp: timestamp.unix_timestamp() as u64,
 832            nonce: Some(nonce),
 833        };
 834        broadcast(request.sender_id, connection_ids, |conn_id| {
 835            self.peer.send(
 836                conn_id,
 837                proto::ChannelMessageSent {
 838                    channel_id: channel_id.to_proto(),
 839                    message: Some(message.clone()),
 840                },
 841            )
 842        })?;
 843        Ok(proto::SendChannelMessageResponse {
 844            message: Some(message),
 845        })
 846    }
 847
 848    async fn get_channel_messages(
 849        self: Arc<Self>,
 850        request: TypedEnvelope<proto::GetChannelMessages>,
 851    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 852        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 853        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 854        if !self
 855            .app_state
 856            .db
 857            .can_user_access_channel(user_id, channel_id)
 858            .await?
 859        {
 860            Err(anyhow!("access denied"))?;
 861        }
 862
 863        let messages = self
 864            .app_state
 865            .db
 866            .get_channel_messages(
 867                channel_id,
 868                MESSAGE_COUNT_PER_PAGE,
 869                Some(MessageId::from_proto(request.payload.before_message_id)),
 870            )
 871            .await?
 872            .into_iter()
 873            .map(|msg| proto::ChannelMessage {
 874                id: msg.id.to_proto(),
 875                body: msg.body,
 876                timestamp: msg.sent_at.unix_timestamp() as u64,
 877                sender_id: msg.sender_id.to_proto(),
 878                nonce: Some(msg.nonce.as_u128().into()),
 879            })
 880            .collect::<Vec<_>>();
 881
 882        Ok(proto::GetChannelMessagesResponse {
 883            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 884            messages,
 885        })
 886    }
 887
 888    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 889        self.store.read()
 890    }
 891
 892    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 893        self.store.write()
 894    }
 895}
 896
 897impl Executor for RealExecutor {
 898    type Timer = Timer;
 899
 900    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 901        task::spawn(future);
 902    }
 903
 904    fn timer(&self, duration: Duration) -> Self::Timer {
 905        Timer::after(duration)
 906    }
 907}
 908
 909fn broadcast<F>(
 910    sender_id: ConnectionId,
 911    receiver_ids: Vec<ConnectionId>,
 912    mut f: F,
 913) -> anyhow::Result<()>
 914where
 915    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 916{
 917    let mut result = Ok(());
 918    for receiver_id in receiver_ids {
 919        if receiver_id != sender_id {
 920            if let Err(error) = f(receiver_id) {
 921                if result.is_ok() {
 922                    result = Err(error);
 923                }
 924            }
 925        }
 926    }
 927    result
 928}
 929
 930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 931    let server = Server::new(app.state().clone(), rpc.clone(), None);
 932    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 933        let server = server.clone();
 934        async move {
 935            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 936
 937            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 938            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 939            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 940            let client_protocol_version: Option<u32> = request
 941                .header("X-Zed-Protocol-Version")
 942                .and_then(|v| v.as_str().parse().ok());
 943
 944            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 945                return Ok(Response::new(StatusCode::UpgradeRequired));
 946            }
 947
 948            let header = match request.header("Sec-Websocket-Key") {
 949                Some(h) => h.as_str(),
 950                None => return Err(anyhow!("expected sec-websocket-key"))?,
 951            };
 952
 953            let user_id = process_auth_header(&request).await?;
 954
 955            let mut response = Response::new(StatusCode::SwitchingProtocols);
 956            response.insert_header(UPGRADE, "websocket");
 957            response.insert_header(CONNECTION, "Upgrade");
 958            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 959            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 960            response.insert_header("Sec-Websocket-Version", "13");
 961
 962            let http_res: &mut tide::http::Response = response.as_mut();
 963            let upgrade_receiver = http_res.recv_upgrade().await;
 964            let addr = request.remote().unwrap_or("unknown").to_string();
 965            task::spawn(async move {
 966                if let Some(stream) = upgrade_receiver.await {
 967                    server
 968                        .handle_connection(
 969                            Connection::new(
 970                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 971                            ),
 972                            addr,
 973                            user_id,
 974                            None,
 975                            RealExecutor,
 976                        )
 977                        .await;
 978                }
 979            });
 980
 981            Ok(response)
 982        }
 983    });
 984}
 985
 986fn header_contains_ignore_case<T>(
 987    request: &tide::Request<T>,
 988    header_name: HeaderName,
 989    value: &str,
 990) -> bool {
 991    request
 992        .header(header_name)
 993        .map(|h| {
 994            h.as_str()
 995                .split(',')
 996                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 997        })
 998        .unwrap_or(false)
 999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use crate::{
1005        auth,
1006        db::{tests::TestDb, UserId},
1007        github, AppState, Config,
1008    };
1009    use ::rpc::Peer;
1010    use client::{
1011        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012        EstablishConnectionError, UserStore,
1013    };
1014    use collections::BTreeMap;
1015    use editor::{
1016        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1017        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1018    };
1019    use gpui::{executor, ModelHandle, TestAppContext};
1020    use language::{
1021        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023    };
1024    use lsp;
1025    use parking_lot::Mutex;
1026    use postage::barrier;
1027    use project::{
1028        fs::{FakeFs, Fs as _},
1029        search::SearchQuery,
1030        worktree::WorktreeHandle,
1031        DiagnosticSummary, Project, ProjectPath,
1032    };
1033    use rand::prelude::*;
1034    use rpc::PeerId;
1035    use serde_json::json;
1036    use sqlx::types::time::OffsetDateTime;
1037    use std::{
1038        cell::Cell,
1039        env,
1040        ops::Deref,
1041        path::{Path, PathBuf},
1042        rc::Rc,
1043        sync::{
1044            atomic::{AtomicBool, Ordering::SeqCst},
1045            Arc,
1046        },
1047        time::Duration,
1048    };
1049    use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051    #[cfg(test)]
1052    #[ctor::ctor]
1053    fn init_logger() {
1054        if std::env::var("RUST_LOG").is_ok() {
1055            env_logger::init();
1056        }
1057    }
1058
1059    #[gpui::test(iterations = 10)]
1060    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1061        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1062        let lang_registry = Arc::new(LanguageRegistry::test());
1063        let fs = FakeFs::new(cx_a.background());
1064        cx_a.foreground().forbid_parking();
1065
1066        // Connect to a server as 2 clients.
1067        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1068        let client_a = server.create_client(cx_a, "user_a").await;
1069        let client_b = server.create_client(cx_b, "user_b").await;
1070
1071        // Share a project as client A
1072        fs.insert_tree(
1073            "/a",
1074            json!({
1075                ".zed.toml": r#"collaborators = ["user_b"]"#,
1076                "a.txt": "a-contents",
1077                "b.txt": "b-contents",
1078            }),
1079        )
1080        .await;
1081        let project_a = cx_a.update(|cx| {
1082            Project::local(
1083                client_a.clone(),
1084                client_a.user_store.clone(),
1085                lang_registry.clone(),
1086                fs.clone(),
1087                cx,
1088            )
1089        });
1090        let (worktree_a, _) = project_a
1091            .update(cx_a, |p, cx| {
1092                p.find_or_create_local_worktree("/a", true, cx)
1093            })
1094            .await
1095            .unwrap();
1096        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1097        worktree_a
1098            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099            .await;
1100        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1101        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1102
1103        // Join that project as client B
1104        let project_b = Project::remote(
1105            project_id,
1106            client_b.clone(),
1107            client_b.user_store.clone(),
1108            lang_registry.clone(),
1109            fs.clone(),
1110            &mut cx_b.to_async(),
1111        )
1112        .await
1113        .unwrap();
1114
1115        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1116            assert_eq!(
1117                project
1118                    .collaborators()
1119                    .get(&client_a.peer_id)
1120                    .unwrap()
1121                    .user
1122                    .github_login,
1123                "user_a"
1124            );
1125            project.replica_id()
1126        });
1127        project_a
1128            .condition(&cx_a, |tree, _| {
1129                tree.collaborators()
1130                    .get(&client_b.peer_id)
1131                    .map_or(false, |collaborator| {
1132                        collaborator.replica_id == replica_id_b
1133                            && collaborator.user.github_login == "user_b"
1134                    })
1135            })
1136            .await;
1137
1138        // Open the same file as client B and client A.
1139        let buffer_b = project_b
1140            .update(cx_b, |p, cx| {
1141                p.open_buffer_for_path((worktree_id, "b.txt"), cx)
1142            })
1143            .await
1144            .unwrap();
1145        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1146        buffer_b.read_with(cx_b, |buf, cx| {
1147            assert_eq!(buf.read(cx).text(), "b-contents")
1148        });
1149        project_a.read_with(cx_a, |project, cx| {
1150            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1151        });
1152        let buffer_a = project_a
1153            .update(cx_a, |p, cx| {
1154                p.open_buffer_for_path((worktree_id, "b.txt"), cx)
1155            })
1156            .await
1157            .unwrap();
1158
1159        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1160
1161        // TODO
1162        // // Create a selection set as client B and see that selection set as client A.
1163        // buffer_a
1164        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1165        //     .await;
1166
1167        // Edit the buffer as client B and see that edit as client A.
1168        editor_b.update(cx_b, |editor, cx| {
1169            editor.handle_input(&Input("ok, ".into()), cx)
1170        });
1171        buffer_a
1172            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1173            .await;
1174
1175        // TODO
1176        // // Remove the selection set as client B, see those selections disappear as client A.
1177        cx_b.update(move |_| drop(editor_b));
1178        // buffer_a
1179        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1180        //     .await;
1181
1182        // Dropping the client B's project removes client B from client A's collaborators.
1183        cx_b.update(move |_| drop(project_b));
1184        project_a
1185            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1186            .await;
1187    }
1188
1189    #[gpui::test(iterations = 10)]
1190    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1191        let lang_registry = Arc::new(LanguageRegistry::test());
1192        let fs = FakeFs::new(cx_a.background());
1193        cx_a.foreground().forbid_parking();
1194
1195        // Connect to a server as 2 clients.
1196        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1197        let client_a = server.create_client(cx_a, "user_a").await;
1198        let client_b = server.create_client(cx_b, "user_b").await;
1199
1200        // Share a project as client A
1201        fs.insert_tree(
1202            "/a",
1203            json!({
1204                ".zed.toml": r#"collaborators = ["user_b"]"#,
1205                "a.txt": "a-contents",
1206                "b.txt": "b-contents",
1207            }),
1208        )
1209        .await;
1210        let project_a = cx_a.update(|cx| {
1211            Project::local(
1212                client_a.clone(),
1213                client_a.user_store.clone(),
1214                lang_registry.clone(),
1215                fs.clone(),
1216                cx,
1217            )
1218        });
1219        let (worktree_a, _) = project_a
1220            .update(cx_a, |p, cx| {
1221                p.find_or_create_local_worktree("/a", true, cx)
1222            })
1223            .await
1224            .unwrap();
1225        worktree_a
1226            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1227            .await;
1228        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1229        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1230        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1231        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1232
1233        // Join that project as client B
1234        let project_b = Project::remote(
1235            project_id,
1236            client_b.clone(),
1237            client_b.user_store.clone(),
1238            lang_registry.clone(),
1239            fs.clone(),
1240            &mut cx_b.to_async(),
1241        )
1242        .await
1243        .unwrap();
1244        project_b
1245            .update(cx_b, |p, cx| {
1246                p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1247            })
1248            .await
1249            .unwrap();
1250
1251        // Unshare the project as client A
1252        project_a
1253            .update(cx_a, |project, cx| project.unshare(cx))
1254            .await
1255            .unwrap();
1256        project_b
1257            .condition(cx_b, |project, _| project.is_read_only())
1258            .await;
1259        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1260        cx_b.update(|_| {
1261            drop(project_b);
1262        });
1263
1264        // Share the project again and ensure guests can still join.
1265        project_a
1266            .update(cx_a, |project, cx| project.share(cx))
1267            .await
1268            .unwrap();
1269        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1270
1271        let project_b2 = Project::remote(
1272            project_id,
1273            client_b.clone(),
1274            client_b.user_store.clone(),
1275            lang_registry.clone(),
1276            fs.clone(),
1277            &mut cx_b.to_async(),
1278        )
1279        .await
1280        .unwrap();
1281        project_b2
1282            .update(cx_b, |p, cx| {
1283                p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1284            })
1285            .await
1286            .unwrap();
1287    }
1288
1289    #[gpui::test(iterations = 10)]
1290    async fn test_propagate_saves_and_fs_changes(
1291        cx_a: &mut TestAppContext,
1292        cx_b: &mut TestAppContext,
1293        cx_c: &mut TestAppContext,
1294    ) {
1295        let lang_registry = Arc::new(LanguageRegistry::test());
1296        let fs = FakeFs::new(cx_a.background());
1297        cx_a.foreground().forbid_parking();
1298
1299        // Connect to a server as 3 clients.
1300        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1301        let client_a = server.create_client(cx_a, "user_a").await;
1302        let client_b = server.create_client(cx_b, "user_b").await;
1303        let client_c = server.create_client(cx_c, "user_c").await;
1304
1305        // Share a worktree as client A.
1306        fs.insert_tree(
1307            "/a",
1308            json!({
1309                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1310                "file1": "",
1311                "file2": ""
1312            }),
1313        )
1314        .await;
1315        let project_a = cx_a.update(|cx| {
1316            Project::local(
1317                client_a.clone(),
1318                client_a.user_store.clone(),
1319                lang_registry.clone(),
1320                fs.clone(),
1321                cx,
1322            )
1323        });
1324        let (worktree_a, _) = project_a
1325            .update(cx_a, |p, cx| {
1326                p.find_or_create_local_worktree("/a", true, cx)
1327            })
1328            .await
1329            .unwrap();
1330        worktree_a
1331            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1332            .await;
1333        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1334        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1335        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1336
1337        // Join that worktree as clients B and C.
1338        let project_b = Project::remote(
1339            project_id,
1340            client_b.clone(),
1341            client_b.user_store.clone(),
1342            lang_registry.clone(),
1343            fs.clone(),
1344            &mut cx_b.to_async(),
1345        )
1346        .await
1347        .unwrap();
1348        let project_c = Project::remote(
1349            project_id,
1350            client_c.clone(),
1351            client_c.user_store.clone(),
1352            lang_registry.clone(),
1353            fs.clone(),
1354            &mut cx_c.to_async(),
1355        )
1356        .await
1357        .unwrap();
1358        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1359        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1360
1361        // Open and edit a buffer as both guests B and C.
1362        let buffer_b = project_b
1363            .update(cx_b, |p, cx| {
1364                p.open_buffer_for_path((worktree_id, "file1"), cx)
1365            })
1366            .await
1367            .unwrap();
1368        let buffer_c = project_c
1369            .update(cx_c, |p, cx| {
1370                p.open_buffer_for_path((worktree_id, "file1"), cx)
1371            })
1372            .await
1373            .unwrap();
1374        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1375        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1376
1377        // Open and edit that buffer as the host.
1378        let buffer_a = project_a
1379            .update(cx_a, |p, cx| {
1380                p.open_buffer_for_path((worktree_id, "file1"), cx)
1381            })
1382            .await
1383            .unwrap();
1384
1385        buffer_a
1386            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1387            .await;
1388        buffer_a.update(cx_a, |buf, cx| {
1389            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1390        });
1391
1392        // Wait for edits to propagate
1393        buffer_a
1394            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1395            .await;
1396        buffer_b
1397            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1398            .await;
1399        buffer_c
1400            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1401            .await;
1402
1403        // Edit the buffer as the host and concurrently save as guest B.
1404        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1405        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1406        save_b.await.unwrap();
1407        assert_eq!(
1408            fs.load("/a/file1".as_ref()).await.unwrap(),
1409            "hi-a, i-am-c, i-am-b, i-am-a"
1410        );
1411        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1412        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1413        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1414
1415        worktree_a.flush_fs_events(cx_a).await;
1416
1417        // Make changes on host's file system, see those changes on guest worktrees.
1418        fs.rename(
1419            "/a/file1".as_ref(),
1420            "/a/file1-renamed".as_ref(),
1421            Default::default(),
1422        )
1423        .await
1424        .unwrap();
1425
1426        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1427            .await
1428            .unwrap();
1429        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1430
1431        worktree_a
1432            .condition(&cx_a, |tree, _| {
1433                tree.paths()
1434                    .map(|p| p.to_string_lossy())
1435                    .collect::<Vec<_>>()
1436                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1437            })
1438            .await;
1439        worktree_b
1440            .condition(&cx_b, |tree, _| {
1441                tree.paths()
1442                    .map(|p| p.to_string_lossy())
1443                    .collect::<Vec<_>>()
1444                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1445            })
1446            .await;
1447        worktree_c
1448            .condition(&cx_c, |tree, _| {
1449                tree.paths()
1450                    .map(|p| p.to_string_lossy())
1451                    .collect::<Vec<_>>()
1452                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1453            })
1454            .await;
1455
1456        // Ensure buffer files are updated as well.
1457        buffer_a
1458            .condition(&cx_a, |buf, _| {
1459                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1460            })
1461            .await;
1462        buffer_b
1463            .condition(&cx_b, |buf, _| {
1464                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1465            })
1466            .await;
1467        buffer_c
1468            .condition(&cx_c, |buf, _| {
1469                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1470            })
1471            .await;
1472    }
1473
1474    #[gpui::test(iterations = 10)]
1475    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1476        cx_a.foreground().forbid_parking();
1477        let lang_registry = Arc::new(LanguageRegistry::test());
1478        let fs = FakeFs::new(cx_a.background());
1479
1480        // Connect to a server as 2 clients.
1481        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1482        let client_a = server.create_client(cx_a, "user_a").await;
1483        let client_b = server.create_client(cx_b, "user_b").await;
1484
1485        // Share a project as client A
1486        fs.insert_tree(
1487            "/dir",
1488            json!({
1489                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1490                "a.txt": "a-contents",
1491            }),
1492        )
1493        .await;
1494
1495        let project_a = cx_a.update(|cx| {
1496            Project::local(
1497                client_a.clone(),
1498                client_a.user_store.clone(),
1499                lang_registry.clone(),
1500                fs.clone(),
1501                cx,
1502            )
1503        });
1504        let (worktree_a, _) = project_a
1505            .update(cx_a, |p, cx| {
1506                p.find_or_create_local_worktree("/dir", true, cx)
1507            })
1508            .await
1509            .unwrap();
1510        worktree_a
1511            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1512            .await;
1513        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1514        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1515        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1516
1517        // Join that project as client B
1518        let project_b = Project::remote(
1519            project_id,
1520            client_b.clone(),
1521            client_b.user_store.clone(),
1522            lang_registry.clone(),
1523            fs.clone(),
1524            &mut cx_b.to_async(),
1525        )
1526        .await
1527        .unwrap();
1528
1529        // Open a buffer as client B
1530        let buffer_b = project_b
1531            .update(cx_b, |p, cx| {
1532                p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1533            })
1534            .await
1535            .unwrap();
1536
1537        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1538        buffer_b.read_with(cx_b, |buf, _| {
1539            assert!(buf.is_dirty());
1540            assert!(!buf.has_conflict());
1541        });
1542
1543        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1544        buffer_b
1545            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1546            .await;
1547        buffer_b.read_with(cx_b, |buf, _| {
1548            assert!(!buf.has_conflict());
1549        });
1550
1551        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1552        buffer_b.read_with(cx_b, |buf, _| {
1553            assert!(buf.is_dirty());
1554            assert!(!buf.has_conflict());
1555        });
1556    }
1557
1558    #[gpui::test(iterations = 10)]
1559    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1560        cx_a.foreground().forbid_parking();
1561        let lang_registry = Arc::new(LanguageRegistry::test());
1562        let fs = FakeFs::new(cx_a.background());
1563
1564        // Connect to a server as 2 clients.
1565        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1566        let client_a = server.create_client(cx_a, "user_a").await;
1567        let client_b = server.create_client(cx_b, "user_b").await;
1568
1569        // Share a project as client A
1570        fs.insert_tree(
1571            "/dir",
1572            json!({
1573                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1574                "a.txt": "a-contents",
1575            }),
1576        )
1577        .await;
1578
1579        let project_a = cx_a.update(|cx| {
1580            Project::local(
1581                client_a.clone(),
1582                client_a.user_store.clone(),
1583                lang_registry.clone(),
1584                fs.clone(),
1585                cx,
1586            )
1587        });
1588        let (worktree_a, _) = project_a
1589            .update(cx_a, |p, cx| {
1590                p.find_or_create_local_worktree("/dir", true, cx)
1591            })
1592            .await
1593            .unwrap();
1594        worktree_a
1595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1596            .await;
1597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1600
1601        // Join that project as client B
1602        let project_b = Project::remote(
1603            project_id,
1604            client_b.clone(),
1605            client_b.user_store.clone(),
1606            lang_registry.clone(),
1607            fs.clone(),
1608            &mut cx_b.to_async(),
1609        )
1610        .await
1611        .unwrap();
1612        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1613
1614        // Open a buffer as client B
1615        let buffer_b = project_b
1616            .update(cx_b, |p, cx| {
1617                p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1618            })
1619            .await
1620            .unwrap();
1621        buffer_b.read_with(cx_b, |buf, _| {
1622            assert!(!buf.is_dirty());
1623            assert!(!buf.has_conflict());
1624        });
1625
1626        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1627            .await
1628            .unwrap();
1629        buffer_b
1630            .condition(&cx_b, |buf, _| {
1631                buf.text() == "new contents" && !buf.is_dirty()
1632            })
1633            .await;
1634        buffer_b.read_with(cx_b, |buf, _| {
1635            assert!(!buf.has_conflict());
1636        });
1637    }
1638
1639    #[gpui::test(iterations = 10)]
1640    async fn test_editing_while_guest_opens_buffer(
1641        cx_a: &mut TestAppContext,
1642        cx_b: &mut TestAppContext,
1643    ) {
1644        cx_a.foreground().forbid_parking();
1645        let lang_registry = Arc::new(LanguageRegistry::test());
1646        let fs = FakeFs::new(cx_a.background());
1647
1648        // Connect to a server as 2 clients.
1649        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1650        let client_a = server.create_client(cx_a, "user_a").await;
1651        let client_b = server.create_client(cx_b, "user_b").await;
1652
1653        // Share a project as client A
1654        fs.insert_tree(
1655            "/dir",
1656            json!({
1657                ".zed.toml": r#"collaborators = ["user_b"]"#,
1658                "a.txt": "a-contents",
1659            }),
1660        )
1661        .await;
1662        let project_a = cx_a.update(|cx| {
1663            Project::local(
1664                client_a.clone(),
1665                client_a.user_store.clone(),
1666                lang_registry.clone(),
1667                fs.clone(),
1668                cx,
1669            )
1670        });
1671        let (worktree_a, _) = project_a
1672            .update(cx_a, |p, cx| {
1673                p.find_or_create_local_worktree("/dir", true, cx)
1674            })
1675            .await
1676            .unwrap();
1677        worktree_a
1678            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1679            .await;
1680        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1681        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1682        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1683
1684        // Join that project as client B
1685        let project_b = Project::remote(
1686            project_id,
1687            client_b.clone(),
1688            client_b.user_store.clone(),
1689            lang_registry.clone(),
1690            fs.clone(),
1691            &mut cx_b.to_async(),
1692        )
1693        .await
1694        .unwrap();
1695
1696        // Open a buffer as client A
1697        let buffer_a = project_a
1698            .update(cx_a, |p, cx| {
1699                p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1700            })
1701            .await
1702            .unwrap();
1703
1704        // Start opening the same buffer as client B
1705        let buffer_b = cx_b.background().spawn(project_b.update(cx_b, |p, cx| {
1706            p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1707        }));
1708
1709        // Edit the buffer as client A while client B is still opening it.
1710        cx_b.background().simulate_random_delay().await;
1711        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1712        cx_b.background().simulate_random_delay().await;
1713        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1714
1715        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1716        let buffer_b = buffer_b.await.unwrap();
1717        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1718    }
1719
1720    #[gpui::test(iterations = 10)]
1721    async fn test_leaving_worktree_while_opening_buffer(
1722        cx_a: &mut TestAppContext,
1723        cx_b: &mut TestAppContext,
1724    ) {
1725        cx_a.foreground().forbid_parking();
1726        let lang_registry = Arc::new(LanguageRegistry::test());
1727        let fs = FakeFs::new(cx_a.background());
1728
1729        // Connect to a server as 2 clients.
1730        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1731        let client_a = server.create_client(cx_a, "user_a").await;
1732        let client_b = server.create_client(cx_b, "user_b").await;
1733
1734        // Share a project as client A
1735        fs.insert_tree(
1736            "/dir",
1737            json!({
1738                ".zed.toml": r#"collaborators = ["user_b"]"#,
1739                "a.txt": "a-contents",
1740            }),
1741        )
1742        .await;
1743        let project_a = cx_a.update(|cx| {
1744            Project::local(
1745                client_a.clone(),
1746                client_a.user_store.clone(),
1747                lang_registry.clone(),
1748                fs.clone(),
1749                cx,
1750            )
1751        });
1752        let (worktree_a, _) = project_a
1753            .update(cx_a, |p, cx| {
1754                p.find_or_create_local_worktree("/dir", true, cx)
1755            })
1756            .await
1757            .unwrap();
1758        worktree_a
1759            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1760            .await;
1761        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1762        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1763        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1764
1765        // Join that project as client B
1766        let project_b = Project::remote(
1767            project_id,
1768            client_b.clone(),
1769            client_b.user_store.clone(),
1770            lang_registry.clone(),
1771            fs.clone(),
1772            &mut cx_b.to_async(),
1773        )
1774        .await
1775        .unwrap();
1776
1777        // See that a guest has joined as client A.
1778        project_a
1779            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1780            .await;
1781
1782        // Begin opening a buffer as client B, but leave the project before the open completes.
1783        let buffer_b = cx_b.background().spawn(project_b.update(cx_b, |p, cx| {
1784            p.open_buffer_for_path((worktree_id, "a.txt"), cx)
1785        }));
1786        cx_b.update(|_| drop(project_b));
1787        drop(buffer_b);
1788
1789        // See that the guest has left.
1790        project_a
1791            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1792            .await;
1793    }
1794
1795    #[gpui::test(iterations = 10)]
1796    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1797        cx_a.foreground().forbid_parking();
1798        let lang_registry = Arc::new(LanguageRegistry::test());
1799        let fs = FakeFs::new(cx_a.background());
1800
1801        // Connect to a server as 2 clients.
1802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1803        let client_a = server.create_client(cx_a, "user_a").await;
1804        let client_b = server.create_client(cx_b, "user_b").await;
1805
1806        // Share a project as client A
1807        fs.insert_tree(
1808            "/a",
1809            json!({
1810                ".zed.toml": r#"collaborators = ["user_b"]"#,
1811                "a.txt": "a-contents",
1812                "b.txt": "b-contents",
1813            }),
1814        )
1815        .await;
1816        let project_a = cx_a.update(|cx| {
1817            Project::local(
1818                client_a.clone(),
1819                client_a.user_store.clone(),
1820                lang_registry.clone(),
1821                fs.clone(),
1822                cx,
1823            )
1824        });
1825        let (worktree_a, _) = project_a
1826            .update(cx_a, |p, cx| {
1827                p.find_or_create_local_worktree("/a", true, cx)
1828            })
1829            .await
1830            .unwrap();
1831        worktree_a
1832            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1833            .await;
1834        let project_id = project_a
1835            .update(cx_a, |project, _| project.next_remote_id())
1836            .await;
1837        project_a
1838            .update(cx_a, |project, cx| project.share(cx))
1839            .await
1840            .unwrap();
1841
1842        // Join that project as client B
1843        let _project_b = Project::remote(
1844            project_id,
1845            client_b.clone(),
1846            client_b.user_store.clone(),
1847            lang_registry.clone(),
1848            fs.clone(),
1849            &mut cx_b.to_async(),
1850        )
1851        .await
1852        .unwrap();
1853
1854        // Client A sees that a guest has joined.
1855        project_a
1856            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1857            .await;
1858
1859        // Drop client B's connection and ensure client A observes client B leaving the project.
1860        client_b.disconnect(&cx_b.to_async()).unwrap();
1861        project_a
1862            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1863            .await;
1864
1865        // Rejoin the project as client B
1866        let _project_b = Project::remote(
1867            project_id,
1868            client_b.clone(),
1869            client_b.user_store.clone(),
1870            lang_registry.clone(),
1871            fs.clone(),
1872            &mut cx_b.to_async(),
1873        )
1874        .await
1875        .unwrap();
1876
1877        // Client A sees that a guest has re-joined.
1878        project_a
1879            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1880            .await;
1881
1882        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1883        client_b.wait_for_current_user(cx_b).await;
1884        server.disconnect_client(client_b.current_user_id(cx_b));
1885        cx_a.foreground().advance_clock(Duration::from_secs(3));
1886        project_a
1887            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1888            .await;
1889    }
1890
1891    #[gpui::test(iterations = 10)]
1892    async fn test_collaborating_with_diagnostics(
1893        cx_a: &mut TestAppContext,
1894        cx_b: &mut TestAppContext,
1895    ) {
1896        cx_a.foreground().forbid_parking();
1897        let mut lang_registry = Arc::new(LanguageRegistry::test());
1898        let fs = FakeFs::new(cx_a.background());
1899
1900        // Set up a fake language server.
1901        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1902        Arc::get_mut(&mut lang_registry)
1903            .unwrap()
1904            .add(Arc::new(Language::new(
1905                LanguageConfig {
1906                    name: "Rust".into(),
1907                    path_suffixes: vec!["rs".to_string()],
1908                    language_server: Some(language_server_config),
1909                    ..Default::default()
1910                },
1911                Some(tree_sitter_rust::language()),
1912            )));
1913
1914        // Connect to a server as 2 clients.
1915        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1916        let client_a = server.create_client(cx_a, "user_a").await;
1917        let client_b = server.create_client(cx_b, "user_b").await;
1918
1919        // Share a project as client A
1920        fs.insert_tree(
1921            "/a",
1922            json!({
1923                ".zed.toml": r#"collaborators = ["user_b"]"#,
1924                "a.rs": "let one = two",
1925                "other.rs": "",
1926            }),
1927        )
1928        .await;
1929        let project_a = cx_a.update(|cx| {
1930            Project::local(
1931                client_a.clone(),
1932                client_a.user_store.clone(),
1933                lang_registry.clone(),
1934                fs.clone(),
1935                cx,
1936            )
1937        });
1938        let (worktree_a, _) = project_a
1939            .update(cx_a, |p, cx| {
1940                p.find_or_create_local_worktree("/a", true, cx)
1941            })
1942            .await
1943            .unwrap();
1944        worktree_a
1945            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1946            .await;
1947        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1948        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1949        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1950
1951        // Cause the language server to start.
1952        let _ = cx_a
1953            .background()
1954            .spawn(project_a.update(cx_a, |project, cx| {
1955                project.open_buffer_for_path(
1956                    ProjectPath {
1957                        worktree_id,
1958                        path: Path::new("other.rs").into(),
1959                    },
1960                    cx,
1961                )
1962            }))
1963            .await
1964            .unwrap();
1965
1966        // Simulate a language server reporting errors for a file.
1967        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1968        fake_language_server
1969            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1970            .await;
1971        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1972            lsp::PublishDiagnosticsParams {
1973                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1974                version: None,
1975                diagnostics: vec![lsp::Diagnostic {
1976                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1977                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1978                    message: "message 1".to_string(),
1979                    ..Default::default()
1980                }],
1981            },
1982        );
1983
1984        // Wait for server to see the diagnostics update.
1985        server
1986            .condition(|store| {
1987                let worktree = store
1988                    .project(project_id)
1989                    .unwrap()
1990                    .share
1991                    .as_ref()
1992                    .unwrap()
1993                    .worktrees
1994                    .get(&worktree_id.to_proto())
1995                    .unwrap();
1996
1997                !worktree.diagnostic_summaries.is_empty()
1998            })
1999            .await;
2000
2001        // Join the worktree as client B.
2002        let project_b = Project::remote(
2003            project_id,
2004            client_b.clone(),
2005            client_b.user_store.clone(),
2006            lang_registry.clone(),
2007            fs.clone(),
2008            &mut cx_b.to_async(),
2009        )
2010        .await
2011        .unwrap();
2012
2013        project_b.read_with(cx_b, |project, cx| {
2014            assert_eq!(
2015                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2016                &[(
2017                    ProjectPath {
2018                        worktree_id,
2019                        path: Arc::from(Path::new("a.rs")),
2020                    },
2021                    DiagnosticSummary {
2022                        error_count: 1,
2023                        warning_count: 0,
2024                        ..Default::default()
2025                    },
2026                )]
2027            )
2028        });
2029
2030        // Simulate a language server reporting more errors for a file.
2031        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2032            lsp::PublishDiagnosticsParams {
2033                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2034                version: None,
2035                diagnostics: vec![
2036                    lsp::Diagnostic {
2037                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2038                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2039                        message: "message 1".to_string(),
2040                        ..Default::default()
2041                    },
2042                    lsp::Diagnostic {
2043                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2044                        range: lsp::Range::new(
2045                            lsp::Position::new(0, 10),
2046                            lsp::Position::new(0, 13),
2047                        ),
2048                        message: "message 2".to_string(),
2049                        ..Default::default()
2050                    },
2051                ],
2052            },
2053        );
2054
2055        // Client b gets the updated summaries
2056        project_b
2057            .condition(&cx_b, |project, cx| {
2058                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2059                    == &[(
2060                        ProjectPath {
2061                            worktree_id,
2062                            path: Arc::from(Path::new("a.rs")),
2063                        },
2064                        DiagnosticSummary {
2065                            error_count: 1,
2066                            warning_count: 1,
2067                            ..Default::default()
2068                        },
2069                    )]
2070            })
2071            .await;
2072
2073        // Open the file with the errors on client B. They should be present.
2074        let buffer_b = cx_b
2075            .background()
2076            .spawn(project_b.update(cx_b, |p, cx| {
2077                p.open_buffer_for_path((worktree_id, "a.rs"), cx)
2078            }))
2079            .await
2080            .unwrap();
2081
2082        buffer_b.read_with(cx_b, |buffer, _| {
2083            assert_eq!(
2084                buffer
2085                    .snapshot()
2086                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2087                    .map(|entry| entry)
2088                    .collect::<Vec<_>>(),
2089                &[
2090                    DiagnosticEntry {
2091                        range: Point::new(0, 4)..Point::new(0, 7),
2092                        diagnostic: Diagnostic {
2093                            group_id: 0,
2094                            message: "message 1".to_string(),
2095                            severity: lsp::DiagnosticSeverity::ERROR,
2096                            is_primary: true,
2097                            ..Default::default()
2098                        }
2099                    },
2100                    DiagnosticEntry {
2101                        range: Point::new(0, 10)..Point::new(0, 13),
2102                        diagnostic: Diagnostic {
2103                            group_id: 1,
2104                            severity: lsp::DiagnosticSeverity::WARNING,
2105                            message: "message 2".to_string(),
2106                            is_primary: true,
2107                            ..Default::default()
2108                        }
2109                    }
2110                ]
2111            );
2112        });
2113    }
2114
2115    #[gpui::test(iterations = 10)]
2116    async fn test_collaborating_with_completion(
2117        cx_a: &mut TestAppContext,
2118        cx_b: &mut TestAppContext,
2119    ) {
2120        cx_a.foreground().forbid_parking();
2121        let mut lang_registry = Arc::new(LanguageRegistry::test());
2122        let fs = FakeFs::new(cx_a.background());
2123
2124        // Set up a fake language server.
2125        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2126        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2127            completion_provider: Some(lsp::CompletionOptions {
2128                trigger_characters: Some(vec![".".to_string()]),
2129                ..Default::default()
2130            }),
2131            ..Default::default()
2132        });
2133        Arc::get_mut(&mut lang_registry)
2134            .unwrap()
2135            .add(Arc::new(Language::new(
2136                LanguageConfig {
2137                    name: "Rust".into(),
2138                    path_suffixes: vec!["rs".to_string()],
2139                    language_server: Some(language_server_config),
2140                    ..Default::default()
2141                },
2142                Some(tree_sitter_rust::language()),
2143            )));
2144
2145        // Connect to a server as 2 clients.
2146        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2147        let client_a = server.create_client(cx_a, "user_a").await;
2148        let client_b = server.create_client(cx_b, "user_b").await;
2149
2150        // Share a project as client A
2151        fs.insert_tree(
2152            "/a",
2153            json!({
2154                ".zed.toml": r#"collaborators = ["user_b"]"#,
2155                "main.rs": "fn main() { a }",
2156                "other.rs": "",
2157            }),
2158        )
2159        .await;
2160        let project_a = cx_a.update(|cx| {
2161            Project::local(
2162                client_a.clone(),
2163                client_a.user_store.clone(),
2164                lang_registry.clone(),
2165                fs.clone(),
2166                cx,
2167            )
2168        });
2169        let (worktree_a, _) = project_a
2170            .update(cx_a, |p, cx| {
2171                p.find_or_create_local_worktree("/a", true, cx)
2172            })
2173            .await
2174            .unwrap();
2175        worktree_a
2176            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2177            .await;
2178        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2179        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2180        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2181
2182        // Join the worktree as client B.
2183        let project_b = Project::remote(
2184            project_id,
2185            client_b.clone(),
2186            client_b.user_store.clone(),
2187            lang_registry.clone(),
2188            fs.clone(),
2189            &mut cx_b.to_async(),
2190        )
2191        .await
2192        .unwrap();
2193
2194        // Open a file in an editor as the guest.
2195        let buffer_b = project_b
2196            .update(cx_b, |p, cx| {
2197                p.open_buffer_for_path((worktree_id, "main.rs"), cx)
2198            })
2199            .await
2200            .unwrap();
2201        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2202        let editor_b = cx_b.add_view(window_b, |cx| {
2203            Editor::for_buffer(
2204                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2205                Some(project_b.clone()),
2206                cx,
2207            )
2208        });
2209
2210        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2211        buffer_b
2212            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2213            .await;
2214
2215        // Type a completion trigger character as the guest.
2216        editor_b.update(cx_b, |editor, cx| {
2217            editor.select_ranges([13..13], None, cx);
2218            editor.handle_input(&Input(".".into()), cx);
2219            cx.focus(&editor_b);
2220        });
2221
2222        // Receive a completion request as the host's language server.
2223        // Return some completions from the host's language server.
2224        cx_a.foreground().start_waiting();
2225        fake_language_server
2226            .handle_request::<lsp::request::Completion, _>(|params, _| {
2227                assert_eq!(
2228                    params.text_document_position.text_document.uri,
2229                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2230                );
2231                assert_eq!(
2232                    params.text_document_position.position,
2233                    lsp::Position::new(0, 14),
2234                );
2235
2236                Some(lsp::CompletionResponse::Array(vec![
2237                    lsp::CompletionItem {
2238                        label: "first_method(…)".into(),
2239                        detail: Some("fn(&mut self, B) -> C".into()),
2240                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2241                            new_text: "first_method($1)".to_string(),
2242                            range: lsp::Range::new(
2243                                lsp::Position::new(0, 14),
2244                                lsp::Position::new(0, 14),
2245                            ),
2246                        })),
2247                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2248                        ..Default::default()
2249                    },
2250                    lsp::CompletionItem {
2251                        label: "second_method(…)".into(),
2252                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2253                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2254                            new_text: "second_method()".to_string(),
2255                            range: lsp::Range::new(
2256                                lsp::Position::new(0, 14),
2257                                lsp::Position::new(0, 14),
2258                            ),
2259                        })),
2260                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2261                        ..Default::default()
2262                    },
2263                ]))
2264            })
2265            .next()
2266            .await
2267            .unwrap();
2268        cx_a.foreground().finish_waiting();
2269
2270        // Open the buffer on the host.
2271        let buffer_a = project_a
2272            .update(cx_a, |p, cx| {
2273                p.open_buffer_for_path((worktree_id, "main.rs"), cx)
2274            })
2275            .await
2276            .unwrap();
2277        buffer_a
2278            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2279            .await;
2280
2281        // Confirm a completion on the guest.
2282        editor_b
2283            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2284            .await;
2285        editor_b.update(cx_b, |editor, cx| {
2286            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2287            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2288        });
2289
2290        // Return a resolved completion from the host's language server.
2291        // The resolved completion has an additional text edit.
2292        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2293            |params, _| {
2294                assert_eq!(params.label, "first_method(…)");
2295                lsp::CompletionItem {
2296                    label: "first_method(…)".into(),
2297                    detail: Some("fn(&mut self, B) -> C".into()),
2298                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2299                        new_text: "first_method($1)".to_string(),
2300                        range: lsp::Range::new(
2301                            lsp::Position::new(0, 14),
2302                            lsp::Position::new(0, 14),
2303                        ),
2304                    })),
2305                    additional_text_edits: Some(vec![lsp::TextEdit {
2306                        new_text: "use d::SomeTrait;\n".to_string(),
2307                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2308                    }]),
2309                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2310                    ..Default::default()
2311                }
2312            },
2313        );
2314
2315        // The additional edit is applied.
2316        buffer_a
2317            .condition(&cx_a, |buffer, _| {
2318                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2319            })
2320            .await;
2321        buffer_b
2322            .condition(&cx_b, |buffer, _| {
2323                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2324            })
2325            .await;
2326    }
2327
2328    #[gpui::test(iterations = 10)]
2329    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2330        cx_a.foreground().forbid_parking();
2331        let mut lang_registry = Arc::new(LanguageRegistry::test());
2332        let fs = FakeFs::new(cx_a.background());
2333
2334        // Set up a fake language server.
2335        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2336        Arc::get_mut(&mut lang_registry)
2337            .unwrap()
2338            .add(Arc::new(Language::new(
2339                LanguageConfig {
2340                    name: "Rust".into(),
2341                    path_suffixes: vec!["rs".to_string()],
2342                    language_server: Some(language_server_config),
2343                    ..Default::default()
2344                },
2345                Some(tree_sitter_rust::language()),
2346            )));
2347
2348        // Connect to a server as 2 clients.
2349        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2350        let client_a = server.create_client(cx_a, "user_a").await;
2351        let client_b = server.create_client(cx_b, "user_b").await;
2352
2353        // Share a project as client A
2354        fs.insert_tree(
2355            "/a",
2356            json!({
2357                ".zed.toml": r#"collaborators = ["user_b"]"#,
2358                "a.rs": "let one = two",
2359            }),
2360        )
2361        .await;
2362        let project_a = cx_a.update(|cx| {
2363            Project::local(
2364                client_a.clone(),
2365                client_a.user_store.clone(),
2366                lang_registry.clone(),
2367                fs.clone(),
2368                cx,
2369            )
2370        });
2371        let (worktree_a, _) = project_a
2372            .update(cx_a, |p, cx| {
2373                p.find_or_create_local_worktree("/a", true, cx)
2374            })
2375            .await
2376            .unwrap();
2377        worktree_a
2378            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2379            .await;
2380        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2381        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2382        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2383
2384        // Join the worktree as client B.
2385        let project_b = Project::remote(
2386            project_id,
2387            client_b.clone(),
2388            client_b.user_store.clone(),
2389            lang_registry.clone(),
2390            fs.clone(),
2391            &mut cx_b.to_async(),
2392        )
2393        .await
2394        .unwrap();
2395
2396        let buffer_b = cx_b
2397            .background()
2398            .spawn(project_b.update(cx_b, |p, cx| {
2399                p.open_buffer_for_path((worktree_id, "a.rs"), cx)
2400            }))
2401            .await
2402            .unwrap();
2403
2404        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2405        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2406            Some(vec![
2407                lsp::TextEdit {
2408                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2409                    new_text: "h".to_string(),
2410                },
2411                lsp::TextEdit {
2412                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2413                    new_text: "y".to_string(),
2414                },
2415            ])
2416        });
2417
2418        project_b
2419            .update(cx_b, |project, cx| {
2420                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2421            })
2422            .await
2423            .unwrap();
2424        assert_eq!(
2425            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2426            "let honey = two"
2427        );
2428    }
2429
2430    #[gpui::test(iterations = 10)]
2431    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2432        cx_a.foreground().forbid_parking();
2433        let mut lang_registry = Arc::new(LanguageRegistry::test());
2434        let fs = FakeFs::new(cx_a.background());
2435        fs.insert_tree(
2436            "/root-1",
2437            json!({
2438                ".zed.toml": r#"collaborators = ["user_b"]"#,
2439                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2440            }),
2441        )
2442        .await;
2443        fs.insert_tree(
2444            "/root-2",
2445            json!({
2446                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2447            }),
2448        )
2449        .await;
2450
2451        // Set up a fake language server.
2452        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2453        Arc::get_mut(&mut lang_registry)
2454            .unwrap()
2455            .add(Arc::new(Language::new(
2456                LanguageConfig {
2457                    name: "Rust".into(),
2458                    path_suffixes: vec!["rs".to_string()],
2459                    language_server: Some(language_server_config),
2460                    ..Default::default()
2461                },
2462                Some(tree_sitter_rust::language()),
2463            )));
2464
2465        // Connect to a server as 2 clients.
2466        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2467        let client_a = server.create_client(cx_a, "user_a").await;
2468        let client_b = server.create_client(cx_b, "user_b").await;
2469
2470        // Share a project as client A
2471        let project_a = cx_a.update(|cx| {
2472            Project::local(
2473                client_a.clone(),
2474                client_a.user_store.clone(),
2475                lang_registry.clone(),
2476                fs.clone(),
2477                cx,
2478            )
2479        });
2480        let (worktree_a, _) = project_a
2481            .update(cx_a, |p, cx| {
2482                p.find_or_create_local_worktree("/root-1", true, cx)
2483            })
2484            .await
2485            .unwrap();
2486        worktree_a
2487            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2488            .await;
2489        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2490        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2491        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2492
2493        // Join the worktree as client B.
2494        let project_b = Project::remote(
2495            project_id,
2496            client_b.clone(),
2497            client_b.user_store.clone(),
2498            lang_registry.clone(),
2499            fs.clone(),
2500            &mut cx_b.to_async(),
2501        )
2502        .await
2503        .unwrap();
2504
2505        // Open the file on client B.
2506        let buffer_b = cx_b
2507            .background()
2508            .spawn(project_b.update(cx_b, |p, cx| {
2509                p.open_buffer_for_path((worktree_id, "a.rs"), cx)
2510            }))
2511            .await
2512            .unwrap();
2513
2514        // Request the definition of a symbol as the guest.
2515        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2516        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2517            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2518                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2519                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2520            )))
2521        });
2522
2523        let definitions_1 = project_b
2524            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2525            .await
2526            .unwrap();
2527        cx_b.read(|cx| {
2528            assert_eq!(definitions_1.len(), 1);
2529            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2530            let target_buffer = definitions_1[0].buffer.read(cx);
2531            assert_eq!(
2532                target_buffer.text(),
2533                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2534            );
2535            assert_eq!(
2536                definitions_1[0].range.to_point(target_buffer),
2537                Point::new(0, 6)..Point::new(0, 9)
2538            );
2539        });
2540
2541        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2542        // the previous call to `definition`.
2543        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2544            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2545                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2546                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2547            )))
2548        });
2549
2550        let definitions_2 = project_b
2551            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2552            .await
2553            .unwrap();
2554        cx_b.read(|cx| {
2555            assert_eq!(definitions_2.len(), 1);
2556            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2557            let target_buffer = definitions_2[0].buffer.read(cx);
2558            assert_eq!(
2559                target_buffer.text(),
2560                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2561            );
2562            assert_eq!(
2563                definitions_2[0].range.to_point(target_buffer),
2564                Point::new(1, 6)..Point::new(1, 11)
2565            );
2566        });
2567        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2568    }
2569
2570    #[gpui::test(iterations = 10)]
2571    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2572        cx_a.foreground().forbid_parking();
2573        let mut lang_registry = Arc::new(LanguageRegistry::test());
2574        let fs = FakeFs::new(cx_a.background());
2575        fs.insert_tree(
2576            "/root-1",
2577            json!({
2578                ".zed.toml": r#"collaborators = ["user_b"]"#,
2579                "one.rs": "const ONE: usize = 1;",
2580                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2581            }),
2582        )
2583        .await;
2584        fs.insert_tree(
2585            "/root-2",
2586            json!({
2587                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2588            }),
2589        )
2590        .await;
2591
2592        // Set up a fake language server.
2593        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2594        Arc::get_mut(&mut lang_registry)
2595            .unwrap()
2596            .add(Arc::new(Language::new(
2597                LanguageConfig {
2598                    name: "Rust".into(),
2599                    path_suffixes: vec!["rs".to_string()],
2600                    language_server: Some(language_server_config),
2601                    ..Default::default()
2602                },
2603                Some(tree_sitter_rust::language()),
2604            )));
2605
2606        // Connect to a server as 2 clients.
2607        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2608        let client_a = server.create_client(cx_a, "user_a").await;
2609        let client_b = server.create_client(cx_b, "user_b").await;
2610
2611        // Share a project as client A
2612        let project_a = cx_a.update(|cx| {
2613            Project::local(
2614                client_a.clone(),
2615                client_a.user_store.clone(),
2616                lang_registry.clone(),
2617                fs.clone(),
2618                cx,
2619            )
2620        });
2621        let (worktree_a, _) = project_a
2622            .update(cx_a, |p, cx| {
2623                p.find_or_create_local_worktree("/root-1", true, cx)
2624            })
2625            .await
2626            .unwrap();
2627        worktree_a
2628            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2629            .await;
2630        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2631        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2632        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2633
2634        // Join the worktree as client B.
2635        let project_b = Project::remote(
2636            project_id,
2637            client_b.clone(),
2638            client_b.user_store.clone(),
2639            lang_registry.clone(),
2640            fs.clone(),
2641            &mut cx_b.to_async(),
2642        )
2643        .await
2644        .unwrap();
2645
2646        // Open the file on client B.
2647        let buffer_b = cx_b
2648            .background()
2649            .spawn(project_b.update(cx_b, |p, cx| {
2650                p.open_buffer_for_path((worktree_id, "one.rs"), cx)
2651            }))
2652            .await
2653            .unwrap();
2654
2655        // Request references to a symbol as the guest.
2656        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2657        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2658            assert_eq!(
2659                params.text_document_position.text_document.uri.as_str(),
2660                "file:///root-1/one.rs"
2661            );
2662            Some(vec![
2663                lsp::Location {
2664                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2665                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2666                },
2667                lsp::Location {
2668                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2669                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2670                },
2671                lsp::Location {
2672                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2673                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2674                },
2675            ])
2676        });
2677
2678        let references = project_b
2679            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2680            .await
2681            .unwrap();
2682        cx_b.read(|cx| {
2683            assert_eq!(references.len(), 3);
2684            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2685
2686            let two_buffer = references[0].buffer.read(cx);
2687            let three_buffer = references[2].buffer.read(cx);
2688            assert_eq!(
2689                two_buffer.file().unwrap().path().as_ref(),
2690                Path::new("two.rs")
2691            );
2692            assert_eq!(references[1].buffer, references[0].buffer);
2693            assert_eq!(
2694                three_buffer.file().unwrap().full_path(cx),
2695                Path::new("three.rs")
2696            );
2697
2698            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2699            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2700            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2701        });
2702    }
2703
2704    #[gpui::test(iterations = 10)]
2705    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2706        cx_a.foreground().forbid_parking();
2707        let lang_registry = Arc::new(LanguageRegistry::test());
2708        let fs = FakeFs::new(cx_a.background());
2709        fs.insert_tree(
2710            "/root-1",
2711            json!({
2712                ".zed.toml": r#"collaborators = ["user_b"]"#,
2713                "a": "hello world",
2714                "b": "goodnight moon",
2715                "c": "a world of goo",
2716                "d": "world champion of clown world",
2717            }),
2718        )
2719        .await;
2720        fs.insert_tree(
2721            "/root-2",
2722            json!({
2723                "e": "disney world is fun",
2724            }),
2725        )
2726        .await;
2727
2728        // Connect to a server as 2 clients.
2729        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2730        let client_a = server.create_client(cx_a, "user_a").await;
2731        let client_b = server.create_client(cx_b, "user_b").await;
2732
2733        // Share a project as client A
2734        let project_a = cx_a.update(|cx| {
2735            Project::local(
2736                client_a.clone(),
2737                client_a.user_store.clone(),
2738                lang_registry.clone(),
2739                fs.clone(),
2740                cx,
2741            )
2742        });
2743        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2744
2745        let (worktree_1, _) = project_a
2746            .update(cx_a, |p, cx| {
2747                p.find_or_create_local_worktree("/root-1", true, cx)
2748            })
2749            .await
2750            .unwrap();
2751        worktree_1
2752            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2753            .await;
2754        let (worktree_2, _) = project_a
2755            .update(cx_a, |p, cx| {
2756                p.find_or_create_local_worktree("/root-2", true, cx)
2757            })
2758            .await
2759            .unwrap();
2760        worktree_2
2761            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2762            .await;
2763
2764        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2765
2766        // Join the worktree as client B.
2767        let project_b = Project::remote(
2768            project_id,
2769            client_b.clone(),
2770            client_b.user_store.clone(),
2771            lang_registry.clone(),
2772            fs.clone(),
2773            &mut cx_b.to_async(),
2774        )
2775        .await
2776        .unwrap();
2777
2778        let results = project_b
2779            .update(cx_b, |project, cx| {
2780                project.search(SearchQuery::text("world", false, false), cx)
2781            })
2782            .await
2783            .unwrap();
2784
2785        let mut ranges_by_path = results
2786            .into_iter()
2787            .map(|(buffer, ranges)| {
2788                buffer.read_with(cx_b, |buffer, cx| {
2789                    let path = buffer.file().unwrap().full_path(cx);
2790                    let offset_ranges = ranges
2791                        .into_iter()
2792                        .map(|range| range.to_offset(buffer))
2793                        .collect::<Vec<_>>();
2794                    (path, offset_ranges)
2795                })
2796            })
2797            .collect::<Vec<_>>();
2798        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2799
2800        assert_eq!(
2801            ranges_by_path,
2802            &[
2803                (PathBuf::from("root-1/a"), vec![6..11]),
2804                (PathBuf::from("root-1/c"), vec![2..7]),
2805                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2806                (PathBuf::from("root-2/e"), vec![7..12]),
2807            ]
2808        );
2809    }
2810
2811    #[gpui::test(iterations = 10)]
2812    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2813        cx_a.foreground().forbid_parking();
2814        let lang_registry = Arc::new(LanguageRegistry::test());
2815        let fs = FakeFs::new(cx_a.background());
2816        fs.insert_tree(
2817            "/root-1",
2818            json!({
2819                ".zed.toml": r#"collaborators = ["user_b"]"#,
2820                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2821            }),
2822        )
2823        .await;
2824
2825        // Set up a fake language server.
2826        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2827        lang_registry.add(Arc::new(Language::new(
2828            LanguageConfig {
2829                name: "Rust".into(),
2830                path_suffixes: vec!["rs".to_string()],
2831                language_server: Some(language_server_config),
2832                ..Default::default()
2833            },
2834            Some(tree_sitter_rust::language()),
2835        )));
2836
2837        // Connect to a server as 2 clients.
2838        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2839        let client_a = server.create_client(cx_a, "user_a").await;
2840        let client_b = server.create_client(cx_b, "user_b").await;
2841
2842        // Share a project as client A
2843        let project_a = cx_a.update(|cx| {
2844            Project::local(
2845                client_a.clone(),
2846                client_a.user_store.clone(),
2847                lang_registry.clone(),
2848                fs.clone(),
2849                cx,
2850            )
2851        });
2852        let (worktree_a, _) = project_a
2853            .update(cx_a, |p, cx| {
2854                p.find_or_create_local_worktree("/root-1", true, cx)
2855            })
2856            .await
2857            .unwrap();
2858        worktree_a
2859            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2860            .await;
2861        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2862        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2863        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2864
2865        // Join the worktree as client B.
2866        let project_b = Project::remote(
2867            project_id,
2868            client_b.clone(),
2869            client_b.user_store.clone(),
2870            lang_registry.clone(),
2871            fs.clone(),
2872            &mut cx_b.to_async(),
2873        )
2874        .await
2875        .unwrap();
2876
2877        // Open the file on client B.
2878        let buffer_b = cx_b
2879            .background()
2880            .spawn(project_b.update(cx_b, |p, cx| {
2881                p.open_buffer_for_path((worktree_id, "main.rs"), cx)
2882            }))
2883            .await
2884            .unwrap();
2885
2886        // Request document highlights as the guest.
2887        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2888        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2889            |params, _| {
2890                assert_eq!(
2891                    params
2892                        .text_document_position_params
2893                        .text_document
2894                        .uri
2895                        .as_str(),
2896                    "file:///root-1/main.rs"
2897                );
2898                assert_eq!(
2899                    params.text_document_position_params.position,
2900                    lsp::Position::new(0, 34)
2901                );
2902                Some(vec![
2903                    lsp::DocumentHighlight {
2904                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2905                        range: lsp::Range::new(
2906                            lsp::Position::new(0, 10),
2907                            lsp::Position::new(0, 16),
2908                        ),
2909                    },
2910                    lsp::DocumentHighlight {
2911                        kind: Some(lsp::DocumentHighlightKind::READ),
2912                        range: lsp::Range::new(
2913                            lsp::Position::new(0, 32),
2914                            lsp::Position::new(0, 38),
2915                        ),
2916                    },
2917                    lsp::DocumentHighlight {
2918                        kind: Some(lsp::DocumentHighlightKind::READ),
2919                        range: lsp::Range::new(
2920                            lsp::Position::new(0, 41),
2921                            lsp::Position::new(0, 47),
2922                        ),
2923                    },
2924                ])
2925            },
2926        );
2927
2928        let highlights = project_b
2929            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2930            .await
2931            .unwrap();
2932        buffer_b.read_with(cx_b, |buffer, _| {
2933            let snapshot = buffer.snapshot();
2934
2935            let highlights = highlights
2936                .into_iter()
2937                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2938                .collect::<Vec<_>>();
2939            assert_eq!(
2940                highlights,
2941                &[
2942                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2943                    (lsp::DocumentHighlightKind::READ, 32..38),
2944                    (lsp::DocumentHighlightKind::READ, 41..47)
2945                ]
2946            )
2947        });
2948    }
2949
2950    #[gpui::test(iterations = 10)]
2951    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2952        cx_a.foreground().forbid_parking();
2953        let mut lang_registry = Arc::new(LanguageRegistry::test());
2954        let fs = FakeFs::new(cx_a.background());
2955        fs.insert_tree(
2956            "/code",
2957            json!({
2958                "crate-1": {
2959                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2960                    "one.rs": "const ONE: usize = 1;",
2961                },
2962                "crate-2": {
2963                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2964                },
2965                "private": {
2966                    "passwords.txt": "the-password",
2967                }
2968            }),
2969        )
2970        .await;
2971
2972        // Set up a fake language server.
2973        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2974        Arc::get_mut(&mut lang_registry)
2975            .unwrap()
2976            .add(Arc::new(Language::new(
2977                LanguageConfig {
2978                    name: "Rust".into(),
2979                    path_suffixes: vec!["rs".to_string()],
2980                    language_server: Some(language_server_config),
2981                    ..Default::default()
2982                },
2983                Some(tree_sitter_rust::language()),
2984            )));
2985
2986        // Connect to a server as 2 clients.
2987        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2988        let client_a = server.create_client(cx_a, "user_a").await;
2989        let client_b = server.create_client(cx_b, "user_b").await;
2990
2991        // Share a project as client A
2992        let project_a = cx_a.update(|cx| {
2993            Project::local(
2994                client_a.clone(),
2995                client_a.user_store.clone(),
2996                lang_registry.clone(),
2997                fs.clone(),
2998                cx,
2999            )
3000        });
3001        let (worktree_a, _) = project_a
3002            .update(cx_a, |p, cx| {
3003                p.find_or_create_local_worktree("/code/crate-1", true, cx)
3004            })
3005            .await
3006            .unwrap();
3007        worktree_a
3008            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3009            .await;
3010        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3011        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3012        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3013
3014        // Join the worktree as client B.
3015        let project_b = Project::remote(
3016            project_id,
3017            client_b.clone(),
3018            client_b.user_store.clone(),
3019            lang_registry.clone(),
3020            fs.clone(),
3021            &mut cx_b.to_async(),
3022        )
3023        .await
3024        .unwrap();
3025
3026        // Cause the language server to start.
3027        let _buffer = cx_b
3028            .background()
3029            .spawn(project_b.update(cx_b, |p, cx| {
3030                p.open_buffer_for_path((worktree_id, "one.rs"), cx)
3031            }))
3032            .await
3033            .unwrap();
3034
3035        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3036        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3037            #[allow(deprecated)]
3038            Some(vec![lsp::SymbolInformation {
3039                name: "TWO".into(),
3040                location: lsp::Location {
3041                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3042                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3043                },
3044                kind: lsp::SymbolKind::CONSTANT,
3045                tags: None,
3046                container_name: None,
3047                deprecated: None,
3048            }])
3049        });
3050
3051        // Request the definition of a symbol as the guest.
3052        let symbols = project_b
3053            .update(cx_b, |p, cx| p.symbols("two", cx))
3054            .await
3055            .unwrap();
3056        assert_eq!(symbols.len(), 1);
3057        assert_eq!(symbols[0].name, "TWO");
3058
3059        // Open one of the returned symbols.
3060        let buffer_b_2 = project_b
3061            .update(cx_b, |project, cx| {
3062                project.open_buffer_for_symbol(&symbols[0], cx)
3063            })
3064            .await
3065            .unwrap();
3066        buffer_b_2.read_with(cx_b, |buffer, _| {
3067            assert_eq!(
3068                buffer.file().unwrap().path().as_ref(),
3069                Path::new("../crate-2/two.rs")
3070            );
3071        });
3072
3073        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3074        let mut fake_symbol = symbols[0].clone();
3075        fake_symbol.path = Path::new("/code/secrets").into();
3076        let error = project_b
3077            .update(cx_b, |project, cx| {
3078                project.open_buffer_for_symbol(&fake_symbol, cx)
3079            })
3080            .await
3081            .unwrap_err();
3082        assert!(error.to_string().contains("invalid symbol signature"));
3083    }
3084
3085    #[gpui::test(iterations = 10)]
3086    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3087        cx_a: &mut TestAppContext,
3088        cx_b: &mut TestAppContext,
3089        mut rng: StdRng,
3090    ) {
3091        cx_a.foreground().forbid_parking();
3092        let mut lang_registry = Arc::new(LanguageRegistry::test());
3093        let fs = FakeFs::new(cx_a.background());
3094        fs.insert_tree(
3095            "/root",
3096            json!({
3097                ".zed.toml": r#"collaborators = ["user_b"]"#,
3098                "a.rs": "const ONE: usize = b::TWO;",
3099                "b.rs": "const TWO: usize = 2",
3100            }),
3101        )
3102        .await;
3103
3104        // Set up a fake language server.
3105        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3106
3107        Arc::get_mut(&mut lang_registry)
3108            .unwrap()
3109            .add(Arc::new(Language::new(
3110                LanguageConfig {
3111                    name: "Rust".into(),
3112                    path_suffixes: vec!["rs".to_string()],
3113                    language_server: Some(language_server_config),
3114                    ..Default::default()
3115                },
3116                Some(tree_sitter_rust::language()),
3117            )));
3118
3119        // Connect to a server as 2 clients.
3120        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3121        let client_a = server.create_client(cx_a, "user_a").await;
3122        let client_b = server.create_client(cx_b, "user_b").await;
3123
3124        // Share a project as client A
3125        let project_a = cx_a.update(|cx| {
3126            Project::local(
3127                client_a.clone(),
3128                client_a.user_store.clone(),
3129                lang_registry.clone(),
3130                fs.clone(),
3131                cx,
3132            )
3133        });
3134
3135        let (worktree_a, _) = project_a
3136            .update(cx_a, |p, cx| {
3137                p.find_or_create_local_worktree("/root", true, cx)
3138            })
3139            .await
3140            .unwrap();
3141        worktree_a
3142            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3143            .await;
3144        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3145        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3146        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3147
3148        // Join the worktree as client B.
3149        let project_b = Project::remote(
3150            project_id,
3151            client_b.clone(),
3152            client_b.user_store.clone(),
3153            lang_registry.clone(),
3154            fs.clone(),
3155            &mut cx_b.to_async(),
3156        )
3157        .await
3158        .unwrap();
3159
3160        let buffer_b1 = cx_b
3161            .background()
3162            .spawn(project_b.update(cx_b, |p, cx| {
3163                p.open_buffer_for_path((worktree_id, "a.rs"), cx)
3164            }))
3165            .await
3166            .unwrap();
3167
3168        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3169        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3170            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3171                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3172                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3173            )))
3174        });
3175
3176        let definitions;
3177        let buffer_b2;
3178        if rng.gen() {
3179            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3180            buffer_b2 = project_b.update(cx_b, |p, cx| {
3181                p.open_buffer_for_path((worktree_id, "b.rs"), cx)
3182            });
3183        } else {
3184            buffer_b2 = project_b.update(cx_b, |p, cx| {
3185                p.open_buffer_for_path((worktree_id, "b.rs"), cx)
3186            });
3187            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3188        }
3189
3190        let buffer_b2 = buffer_b2.await.unwrap();
3191        let definitions = definitions.await.unwrap();
3192        assert_eq!(definitions.len(), 1);
3193        assert_eq!(definitions[0].buffer, buffer_b2);
3194    }
3195
3196    #[gpui::test(iterations = 10)]
3197    async fn test_collaborating_with_code_actions(
3198        cx_a: &mut TestAppContext,
3199        cx_b: &mut TestAppContext,
3200    ) {
3201        cx_a.foreground().forbid_parking();
3202        let mut lang_registry = Arc::new(LanguageRegistry::test());
3203        let fs = FakeFs::new(cx_a.background());
3204        cx_b.update(|cx| editor::init(cx));
3205
3206        // Set up a fake language server.
3207        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3208        Arc::get_mut(&mut lang_registry)
3209            .unwrap()
3210            .add(Arc::new(Language::new(
3211                LanguageConfig {
3212                    name: "Rust".into(),
3213                    path_suffixes: vec!["rs".to_string()],
3214                    language_server: Some(language_server_config),
3215                    ..Default::default()
3216                },
3217                Some(tree_sitter_rust::language()),
3218            )));
3219
3220        // Connect to a server as 2 clients.
3221        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3222        let client_a = server.create_client(cx_a, "user_a").await;
3223        let client_b = server.create_client(cx_b, "user_b").await;
3224
3225        // Share a project as client A
3226        fs.insert_tree(
3227            "/a",
3228            json!({
3229                ".zed.toml": r#"collaborators = ["user_b"]"#,
3230                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3231                "other.rs": "pub fn foo() -> usize { 4 }",
3232            }),
3233        )
3234        .await;
3235        let project_a = cx_a.update(|cx| {
3236            Project::local(
3237                client_a.clone(),
3238                client_a.user_store.clone(),
3239                lang_registry.clone(),
3240                fs.clone(),
3241                cx,
3242            )
3243        });
3244        let (worktree_a, _) = project_a
3245            .update(cx_a, |p, cx| {
3246                p.find_or_create_local_worktree("/a", true, cx)
3247            })
3248            .await
3249            .unwrap();
3250        worktree_a
3251            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3252            .await;
3253        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3254        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3255        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3256
3257        // Join the worktree as client B.
3258        let project_b = Project::remote(
3259            project_id,
3260            client_b.clone(),
3261            client_b.user_store.clone(),
3262            lang_registry.clone(),
3263            fs.clone(),
3264            &mut cx_b.to_async(),
3265        )
3266        .await
3267        .unwrap();
3268        let mut params = cx_b.update(WorkspaceParams::test);
3269        params.languages = lang_registry.clone();
3270        params.client = client_b.client.clone();
3271        params.user_store = client_b.user_store.clone();
3272        params.project = project_b;
3273
3274        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3275        let editor_b = workspace_b
3276            .update(cx_b, |workspace, cx| {
3277                workspace.open_path((worktree_id, "main.rs").into(), cx)
3278            })
3279            .await
3280            .unwrap()
3281            .downcast::<Editor>()
3282            .unwrap();
3283
3284        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3285        fake_language_server
3286            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3287                assert_eq!(
3288                    params.text_document.uri,
3289                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3290                );
3291                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3292                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3293                None
3294            })
3295            .next()
3296            .await;
3297
3298        // Move cursor to a location that contains code actions.
3299        editor_b.update(cx_b, |editor, cx| {
3300            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3301            cx.focus(&editor_b);
3302        });
3303
3304        fake_language_server
3305            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3306                assert_eq!(
3307                    params.text_document.uri,
3308                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3309                );
3310                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3311                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3312
3313                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3314                    lsp::CodeAction {
3315                        title: "Inline into all callers".to_string(),
3316                        edit: Some(lsp::WorkspaceEdit {
3317                            changes: Some(
3318                                [
3319                                    (
3320                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3321                                        vec![lsp::TextEdit::new(
3322                                            lsp::Range::new(
3323                                                lsp::Position::new(1, 22),
3324                                                lsp::Position::new(1, 34),
3325                                            ),
3326                                            "4".to_string(),
3327                                        )],
3328                                    ),
3329                                    (
3330                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3331                                        vec![lsp::TextEdit::new(
3332                                            lsp::Range::new(
3333                                                lsp::Position::new(0, 0),
3334                                                lsp::Position::new(0, 27),
3335                                            ),
3336                                            "".to_string(),
3337                                        )],
3338                                    ),
3339                                ]
3340                                .into_iter()
3341                                .collect(),
3342                            ),
3343                            ..Default::default()
3344                        }),
3345                        data: Some(json!({
3346                            "codeActionParams": {
3347                                "range": {
3348                                    "start": {"line": 1, "column": 31},
3349                                    "end": {"line": 1, "column": 31},
3350                                }
3351                            }
3352                        })),
3353                        ..Default::default()
3354                    },
3355                )])
3356            })
3357            .next()
3358            .await;
3359
3360        // Toggle code actions and wait for them to display.
3361        editor_b.update(cx_b, |editor, cx| {
3362            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3363        });
3364        editor_b
3365            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3366            .await;
3367
3368        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3369
3370        // Confirming the code action will trigger a resolve request.
3371        let confirm_action = workspace_b
3372            .update(cx_b, |workspace, cx| {
3373                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3374            })
3375            .unwrap();
3376        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3377            lsp::CodeAction {
3378                title: "Inline into all callers".to_string(),
3379                edit: Some(lsp::WorkspaceEdit {
3380                    changes: Some(
3381                        [
3382                            (
3383                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3384                                vec![lsp::TextEdit::new(
3385                                    lsp::Range::new(
3386                                        lsp::Position::new(1, 22),
3387                                        lsp::Position::new(1, 34),
3388                                    ),
3389                                    "4".to_string(),
3390                                )],
3391                            ),
3392                            (
3393                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3394                                vec![lsp::TextEdit::new(
3395                                    lsp::Range::new(
3396                                        lsp::Position::new(0, 0),
3397                                        lsp::Position::new(0, 27),
3398                                    ),
3399                                    "".to_string(),
3400                                )],
3401                            ),
3402                        ]
3403                        .into_iter()
3404                        .collect(),
3405                    ),
3406                    ..Default::default()
3407                }),
3408                ..Default::default()
3409            }
3410        });
3411
3412        // After the action is confirmed, an editor containing both modified files is opened.
3413        confirm_action.await.unwrap();
3414        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3415            workspace
3416                .active_item(cx)
3417                .unwrap()
3418                .downcast::<Editor>()
3419                .unwrap()
3420        });
3421        code_action_editor.update(cx_b, |editor, cx| {
3422            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3423            editor.undo(&Undo, cx);
3424            assert_eq!(
3425                editor.text(cx),
3426                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3427            );
3428            editor.redo(&Redo, cx);
3429            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3430        });
3431    }
3432
3433    #[gpui::test(iterations = 10)]
3434    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3435        cx_a.foreground().forbid_parking();
3436        let mut lang_registry = Arc::new(LanguageRegistry::test());
3437        let fs = FakeFs::new(cx_a.background());
3438        cx_b.update(|cx| editor::init(cx));
3439
3440        // Set up a fake language server.
3441        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3442        Arc::get_mut(&mut lang_registry)
3443            .unwrap()
3444            .add(Arc::new(Language::new(
3445                LanguageConfig {
3446                    name: "Rust".into(),
3447                    path_suffixes: vec!["rs".to_string()],
3448                    language_server: Some(language_server_config),
3449                    ..Default::default()
3450                },
3451                Some(tree_sitter_rust::language()),
3452            )));
3453
3454        // Connect to a server as 2 clients.
3455        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3456        let client_a = server.create_client(cx_a, "user_a").await;
3457        let client_b = server.create_client(cx_b, "user_b").await;
3458
3459        // Share a project as client A
3460        fs.insert_tree(
3461            "/dir",
3462            json!({
3463                ".zed.toml": r#"collaborators = ["user_b"]"#,
3464                "one.rs": "const ONE: usize = 1;",
3465                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3466            }),
3467        )
3468        .await;
3469        let project_a = cx_a.update(|cx| {
3470            Project::local(
3471                client_a.clone(),
3472                client_a.user_store.clone(),
3473                lang_registry.clone(),
3474                fs.clone(),
3475                cx,
3476            )
3477        });
3478        let (worktree_a, _) = project_a
3479            .update(cx_a, |p, cx| {
3480                p.find_or_create_local_worktree("/dir", true, cx)
3481            })
3482            .await
3483            .unwrap();
3484        worktree_a
3485            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3486            .await;
3487        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3488        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3489        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3490
3491        // Join the worktree as client B.
3492        let project_b = Project::remote(
3493            project_id,
3494            client_b.clone(),
3495            client_b.user_store.clone(),
3496            lang_registry.clone(),
3497            fs.clone(),
3498            &mut cx_b.to_async(),
3499        )
3500        .await
3501        .unwrap();
3502        let mut params = cx_b.update(WorkspaceParams::test);
3503        params.languages = lang_registry.clone();
3504        params.client = client_b.client.clone();
3505        params.user_store = client_b.user_store.clone();
3506        params.project = project_b;
3507
3508        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3509        let editor_b = workspace_b
3510            .update(cx_b, |workspace, cx| {
3511                workspace.open_path((worktree_id, "one.rs").into(), cx)
3512            })
3513            .await
3514            .unwrap()
3515            .downcast::<Editor>()
3516            .unwrap();
3517        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3518
3519        // Move cursor to a location that can be renamed.
3520        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3521            editor.select_ranges([7..7], None, cx);
3522            editor.rename(&Rename, cx).unwrap()
3523        });
3524
3525        fake_language_server
3526            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3527                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3528                assert_eq!(params.position, lsp::Position::new(0, 7));
3529                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3530                    lsp::Position::new(0, 6),
3531                    lsp::Position::new(0, 9),
3532                )))
3533            })
3534            .next()
3535            .await
3536            .unwrap();
3537        prepare_rename.await.unwrap();
3538        editor_b.update(cx_b, |editor, cx| {
3539            let rename = editor.pending_rename().unwrap();
3540            let buffer = editor.buffer().read(cx).snapshot(cx);
3541            assert_eq!(
3542                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3543                6..9
3544            );
3545            rename.editor.update(cx, |rename_editor, cx| {
3546                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3547                    rename_buffer.edit([0..3], "THREE", cx);
3548                });
3549            });
3550        });
3551
3552        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3553            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3554        });
3555        fake_language_server
3556            .handle_request::<lsp::request::Rename, _>(|params, _| {
3557                assert_eq!(
3558                    params.text_document_position.text_document.uri.as_str(),
3559                    "file:///dir/one.rs"
3560                );
3561                assert_eq!(
3562                    params.text_document_position.position,
3563                    lsp::Position::new(0, 6)
3564                );
3565                assert_eq!(params.new_name, "THREE");
3566                Some(lsp::WorkspaceEdit {
3567                    changes: Some(
3568                        [
3569                            (
3570                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3571                                vec![lsp::TextEdit::new(
3572                                    lsp::Range::new(
3573                                        lsp::Position::new(0, 6),
3574                                        lsp::Position::new(0, 9),
3575                                    ),
3576                                    "THREE".to_string(),
3577                                )],
3578                            ),
3579                            (
3580                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3581                                vec![
3582                                    lsp::TextEdit::new(
3583                                        lsp::Range::new(
3584                                            lsp::Position::new(0, 24),
3585                                            lsp::Position::new(0, 27),
3586                                        ),
3587                                        "THREE".to_string(),
3588                                    ),
3589                                    lsp::TextEdit::new(
3590                                        lsp::Range::new(
3591                                            lsp::Position::new(0, 35),
3592                                            lsp::Position::new(0, 38),
3593                                        ),
3594                                        "THREE".to_string(),
3595                                    ),
3596                                ],
3597                            ),
3598                        ]
3599                        .into_iter()
3600                        .collect(),
3601                    ),
3602                    ..Default::default()
3603                })
3604            })
3605            .next()
3606            .await
3607            .unwrap();
3608        confirm_rename.await.unwrap();
3609
3610        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3611            workspace
3612                .active_item(cx)
3613                .unwrap()
3614                .downcast::<Editor>()
3615                .unwrap()
3616        });
3617        rename_editor.update(cx_b, |editor, cx| {
3618            assert_eq!(
3619                editor.text(cx),
3620                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3621            );
3622            editor.undo(&Undo, cx);
3623            assert_eq!(
3624                editor.text(cx),
3625                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3626            );
3627            editor.redo(&Redo, cx);
3628            assert_eq!(
3629                editor.text(cx),
3630                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3631            );
3632        });
3633
3634        // Ensure temporary rename edits cannot be undone/redone.
3635        editor_b.update(cx_b, |editor, cx| {
3636            editor.undo(&Undo, cx);
3637            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3638            editor.undo(&Undo, cx);
3639            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3640            editor.redo(&Redo, cx);
3641            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3642        })
3643    }
3644
3645    #[gpui::test(iterations = 10)]
3646    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3647        cx_a.foreground().forbid_parking();
3648
3649        // Connect to a server as 2 clients.
3650        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3651        let client_a = server.create_client(cx_a, "user_a").await;
3652        let client_b = server.create_client(cx_b, "user_b").await;
3653
3654        // Create an org that includes these 2 users.
3655        let db = &server.app_state.db;
3656        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3657        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3658            .await
3659            .unwrap();
3660        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3661            .await
3662            .unwrap();
3663
3664        // Create a channel that includes all the users.
3665        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3666        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3667            .await
3668            .unwrap();
3669        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3670            .await
3671            .unwrap();
3672        db.create_channel_message(
3673            channel_id,
3674            client_b.current_user_id(&cx_b),
3675            "hello A, it's B.",
3676            OffsetDateTime::now_utc(),
3677            1,
3678        )
3679        .await
3680        .unwrap();
3681
3682        let channels_a = cx_a
3683            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3684        channels_a
3685            .condition(cx_a, |list, _| list.available_channels().is_some())
3686            .await;
3687        channels_a.read_with(cx_a, |list, _| {
3688            assert_eq!(
3689                list.available_channels().unwrap(),
3690                &[ChannelDetails {
3691                    id: channel_id.to_proto(),
3692                    name: "test-channel".to_string()
3693                }]
3694            )
3695        });
3696        let channel_a = channels_a.update(cx_a, |this, cx| {
3697            this.get_channel(channel_id.to_proto(), cx).unwrap()
3698        });
3699        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3700        channel_a
3701            .condition(&cx_a, |channel, _| {
3702                channel_messages(channel)
3703                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3704            })
3705            .await;
3706
3707        let channels_b = cx_b
3708            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3709        channels_b
3710            .condition(cx_b, |list, _| list.available_channels().is_some())
3711            .await;
3712        channels_b.read_with(cx_b, |list, _| {
3713            assert_eq!(
3714                list.available_channels().unwrap(),
3715                &[ChannelDetails {
3716                    id: channel_id.to_proto(),
3717                    name: "test-channel".to_string()
3718                }]
3719            )
3720        });
3721
3722        let channel_b = channels_b.update(cx_b, |this, cx| {
3723            this.get_channel(channel_id.to_proto(), cx).unwrap()
3724        });
3725        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3726        channel_b
3727            .condition(&cx_b, |channel, _| {
3728                channel_messages(channel)
3729                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3730            })
3731            .await;
3732
3733        channel_a
3734            .update(cx_a, |channel, cx| {
3735                channel
3736                    .send_message("oh, hi B.".to_string(), cx)
3737                    .unwrap()
3738                    .detach();
3739                let task = channel.send_message("sup".to_string(), cx).unwrap();
3740                assert_eq!(
3741                    channel_messages(channel),
3742                    &[
3743                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3744                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3745                        ("user_a".to_string(), "sup".to_string(), true)
3746                    ]
3747                );
3748                task
3749            })
3750            .await
3751            .unwrap();
3752
3753        channel_b
3754            .condition(&cx_b, |channel, _| {
3755                channel_messages(channel)
3756                    == [
3757                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3758                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3759                        ("user_a".to_string(), "sup".to_string(), false),
3760                    ]
3761            })
3762            .await;
3763
3764        assert_eq!(
3765            server
3766                .state()
3767                .await
3768                .channel(channel_id)
3769                .unwrap()
3770                .connection_ids
3771                .len(),
3772            2
3773        );
3774        cx_b.update(|_| drop(channel_b));
3775        server
3776            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3777            .await;
3778
3779        cx_a.update(|_| drop(channel_a));
3780        server
3781            .condition(|state| state.channel(channel_id).is_none())
3782            .await;
3783    }
3784
3785    #[gpui::test(iterations = 10)]
3786    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3787        cx_a.foreground().forbid_parking();
3788
3789        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3790        let client_a = server.create_client(cx_a, "user_a").await;
3791
3792        let db = &server.app_state.db;
3793        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3794        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3795        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3796            .await
3797            .unwrap();
3798        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3799            .await
3800            .unwrap();
3801
3802        let channels_a = cx_a
3803            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3804        channels_a
3805            .condition(cx_a, |list, _| list.available_channels().is_some())
3806            .await;
3807        let channel_a = channels_a.update(cx_a, |this, cx| {
3808            this.get_channel(channel_id.to_proto(), cx).unwrap()
3809        });
3810
3811        // Messages aren't allowed to be too long.
3812        channel_a
3813            .update(cx_a, |channel, cx| {
3814                let long_body = "this is long.\n".repeat(1024);
3815                channel.send_message(long_body, cx).unwrap()
3816            })
3817            .await
3818            .unwrap_err();
3819
3820        // Messages aren't allowed to be blank.
3821        channel_a.update(cx_a, |channel, cx| {
3822            channel.send_message(String::new(), cx).unwrap_err()
3823        });
3824
3825        // Leading and trailing whitespace are trimmed.
3826        channel_a
3827            .update(cx_a, |channel, cx| {
3828                channel
3829                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3830                    .unwrap()
3831            })
3832            .await
3833            .unwrap();
3834        assert_eq!(
3835            db.get_channel_messages(channel_id, 10, None)
3836                .await
3837                .unwrap()
3838                .iter()
3839                .map(|m| &m.body)
3840                .collect::<Vec<_>>(),
3841            &["surrounded by whitespace"]
3842        );
3843    }
3844
3845    #[gpui::test(iterations = 10)]
3846    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3847        cx_a.foreground().forbid_parking();
3848
3849        // Connect to a server as 2 clients.
3850        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3851        let client_a = server.create_client(cx_a, "user_a").await;
3852        let client_b = server.create_client(cx_b, "user_b").await;
3853        let mut status_b = client_b.status();
3854
3855        // Create an org that includes these 2 users.
3856        let db = &server.app_state.db;
3857        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3858        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3859            .await
3860            .unwrap();
3861        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3862            .await
3863            .unwrap();
3864
3865        // Create a channel that includes all the users.
3866        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3867        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3868            .await
3869            .unwrap();
3870        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3871            .await
3872            .unwrap();
3873        db.create_channel_message(
3874            channel_id,
3875            client_b.current_user_id(&cx_b),
3876            "hello A, it's B.",
3877            OffsetDateTime::now_utc(),
3878            2,
3879        )
3880        .await
3881        .unwrap();
3882
3883        let channels_a = cx_a
3884            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3885        channels_a
3886            .condition(cx_a, |list, _| list.available_channels().is_some())
3887            .await;
3888
3889        channels_a.read_with(cx_a, |list, _| {
3890            assert_eq!(
3891                list.available_channels().unwrap(),
3892                &[ChannelDetails {
3893                    id: channel_id.to_proto(),
3894                    name: "test-channel".to_string()
3895                }]
3896            )
3897        });
3898        let channel_a = channels_a.update(cx_a, |this, cx| {
3899            this.get_channel(channel_id.to_proto(), cx).unwrap()
3900        });
3901        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3902        channel_a
3903            .condition(&cx_a, |channel, _| {
3904                channel_messages(channel)
3905                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3906            })
3907            .await;
3908
3909        let channels_b = cx_b
3910            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3911        channels_b
3912            .condition(cx_b, |list, _| list.available_channels().is_some())
3913            .await;
3914        channels_b.read_with(cx_b, |list, _| {
3915            assert_eq!(
3916                list.available_channels().unwrap(),
3917                &[ChannelDetails {
3918                    id: channel_id.to_proto(),
3919                    name: "test-channel".to_string()
3920                }]
3921            )
3922        });
3923
3924        let channel_b = channels_b.update(cx_b, |this, cx| {
3925            this.get_channel(channel_id.to_proto(), cx).unwrap()
3926        });
3927        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3928        channel_b
3929            .condition(&cx_b, |channel, _| {
3930                channel_messages(channel)
3931                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3932            })
3933            .await;
3934
3935        // Disconnect client B, ensuring we can still access its cached channel data.
3936        server.forbid_connections();
3937        server.disconnect_client(client_b.current_user_id(&cx_b));
3938        cx_b.foreground().advance_clock(Duration::from_secs(3));
3939        while !matches!(
3940            status_b.next().await,
3941            Some(client::Status::ReconnectionError { .. })
3942        ) {}
3943
3944        channels_b.read_with(cx_b, |channels, _| {
3945            assert_eq!(
3946                channels.available_channels().unwrap(),
3947                [ChannelDetails {
3948                    id: channel_id.to_proto(),
3949                    name: "test-channel".to_string()
3950                }]
3951            )
3952        });
3953        channel_b.read_with(cx_b, |channel, _| {
3954            assert_eq!(
3955                channel_messages(channel),
3956                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3957            )
3958        });
3959
3960        // Send a message from client B while it is disconnected.
3961        channel_b
3962            .update(cx_b, |channel, cx| {
3963                let task = channel
3964                    .send_message("can you see this?".to_string(), cx)
3965                    .unwrap();
3966                assert_eq!(
3967                    channel_messages(channel),
3968                    &[
3969                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3970                        ("user_b".to_string(), "can you see this?".to_string(), true)
3971                    ]
3972                );
3973                task
3974            })
3975            .await
3976            .unwrap_err();
3977
3978        // Send a message from client A while B is disconnected.
3979        channel_a
3980            .update(cx_a, |channel, cx| {
3981                channel
3982                    .send_message("oh, hi B.".to_string(), cx)
3983                    .unwrap()
3984                    .detach();
3985                let task = channel.send_message("sup".to_string(), cx).unwrap();
3986                assert_eq!(
3987                    channel_messages(channel),
3988                    &[
3989                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3990                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3991                        ("user_a".to_string(), "sup".to_string(), true)
3992                    ]
3993                );
3994                task
3995            })
3996            .await
3997            .unwrap();
3998
3999        // Give client B a chance to reconnect.
4000        server.allow_connections();
4001        cx_b.foreground().advance_clock(Duration::from_secs(10));
4002
4003        // Verify that B sees the new messages upon reconnection, as well as the message client B
4004        // sent while offline.
4005        channel_b
4006            .condition(&cx_b, |channel, _| {
4007                channel_messages(channel)
4008                    == [
4009                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4010                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4011                        ("user_a".to_string(), "sup".to_string(), false),
4012                        ("user_b".to_string(), "can you see this?".to_string(), false),
4013                    ]
4014            })
4015            .await;
4016
4017        // Ensure client A and B can communicate normally after reconnection.
4018        channel_a
4019            .update(cx_a, |channel, cx| {
4020                channel.send_message("you online?".to_string(), cx).unwrap()
4021            })
4022            .await
4023            .unwrap();
4024        channel_b
4025            .condition(&cx_b, |channel, _| {
4026                channel_messages(channel)
4027                    == [
4028                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4029                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4030                        ("user_a".to_string(), "sup".to_string(), false),
4031                        ("user_b".to_string(), "can you see this?".to_string(), false),
4032                        ("user_a".to_string(), "you online?".to_string(), false),
4033                    ]
4034            })
4035            .await;
4036
4037        channel_b
4038            .update(cx_b, |channel, cx| {
4039                channel.send_message("yep".to_string(), cx).unwrap()
4040            })
4041            .await
4042            .unwrap();
4043        channel_a
4044            .condition(&cx_a, |channel, _| {
4045                channel_messages(channel)
4046                    == [
4047                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4048                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4049                        ("user_a".to_string(), "sup".to_string(), false),
4050                        ("user_b".to_string(), "can you see this?".to_string(), false),
4051                        ("user_a".to_string(), "you online?".to_string(), false),
4052                        ("user_b".to_string(), "yep".to_string(), false),
4053                    ]
4054            })
4055            .await;
4056    }
4057
4058    #[gpui::test(iterations = 10)]
4059    async fn test_contacts(
4060        cx_a: &mut TestAppContext,
4061        cx_b: &mut TestAppContext,
4062        cx_c: &mut TestAppContext,
4063    ) {
4064        cx_a.foreground().forbid_parking();
4065        let lang_registry = Arc::new(LanguageRegistry::test());
4066        let fs = FakeFs::new(cx_a.background());
4067
4068        // Connect to a server as 3 clients.
4069        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4070        let client_a = server.create_client(cx_a, "user_a").await;
4071        let client_b = server.create_client(cx_b, "user_b").await;
4072        let client_c = server.create_client(cx_c, "user_c").await;
4073
4074        // Share a worktree as client A.
4075        fs.insert_tree(
4076            "/a",
4077            json!({
4078                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4079            }),
4080        )
4081        .await;
4082
4083        let project_a = cx_a.update(|cx| {
4084            Project::local(
4085                client_a.clone(),
4086                client_a.user_store.clone(),
4087                lang_registry.clone(),
4088                fs.clone(),
4089                cx,
4090            )
4091        });
4092        let (worktree_a, _) = project_a
4093            .update(cx_a, |p, cx| {
4094                p.find_or_create_local_worktree("/a", true, cx)
4095            })
4096            .await
4097            .unwrap();
4098        worktree_a
4099            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4100            .await;
4101
4102        client_a
4103            .user_store
4104            .condition(&cx_a, |user_store, _| {
4105                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4106            })
4107            .await;
4108        client_b
4109            .user_store
4110            .condition(&cx_b, |user_store, _| {
4111                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4112            })
4113            .await;
4114        client_c
4115            .user_store
4116            .condition(&cx_c, |user_store, _| {
4117                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4118            })
4119            .await;
4120
4121        let project_id = project_a
4122            .update(cx_a, |project, _| project.next_remote_id())
4123            .await;
4124        project_a
4125            .update(cx_a, |project, cx| project.share(cx))
4126            .await
4127            .unwrap();
4128
4129        let _project_b = Project::remote(
4130            project_id,
4131            client_b.clone(),
4132            client_b.user_store.clone(),
4133            lang_registry.clone(),
4134            fs.clone(),
4135            &mut cx_b.to_async(),
4136        )
4137        .await
4138        .unwrap();
4139
4140        client_a
4141            .user_store
4142            .condition(&cx_a, |user_store, _| {
4143                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4144            })
4145            .await;
4146        client_b
4147            .user_store
4148            .condition(&cx_b, |user_store, _| {
4149                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4150            })
4151            .await;
4152        client_c
4153            .user_store
4154            .condition(&cx_c, |user_store, _| {
4155                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4156            })
4157            .await;
4158
4159        project_a
4160            .condition(&cx_a, |project, _| {
4161                project.collaborators().contains_key(&client_b.peer_id)
4162            })
4163            .await;
4164
4165        cx_a.update(move |_| drop(project_a));
4166        client_a
4167            .user_store
4168            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4169            .await;
4170        client_b
4171            .user_store
4172            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4173            .await;
4174        client_c
4175            .user_store
4176            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4177            .await;
4178
4179        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4180            user_store
4181                .contacts()
4182                .iter()
4183                .map(|contact| {
4184                    let worktrees = contact
4185                        .projects
4186                        .iter()
4187                        .map(|p| {
4188                            (
4189                                p.worktree_root_names[0].as_str(),
4190                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4191                            )
4192                        })
4193                        .collect();
4194                    (contact.user.github_login.as_str(), worktrees)
4195                })
4196                .collect()
4197        }
4198    }
4199
4200    #[gpui::test(iterations = 100)]
4201    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4202        cx.foreground().forbid_parking();
4203        let max_peers = env::var("MAX_PEERS")
4204            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4205            .unwrap_or(5);
4206        let max_operations = env::var("OPERATIONS")
4207            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4208            .unwrap_or(10);
4209
4210        let rng = Arc::new(Mutex::new(rng));
4211
4212        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4213        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4214
4215        let fs = FakeFs::new(cx.background());
4216        fs.insert_tree(
4217            "/_collab",
4218            json!({
4219                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4220            }),
4221        )
4222        .await;
4223
4224        let operations = Rc::new(Cell::new(0));
4225        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4226        let mut clients = Vec::new();
4227
4228        let mut next_entity_id = 100000;
4229        let mut host_cx = TestAppContext::new(
4230            cx.foreground_platform(),
4231            cx.platform(),
4232            cx.foreground(),
4233            cx.background(),
4234            cx.font_cache(),
4235            cx.leak_detector(),
4236            next_entity_id,
4237        );
4238        let host = server.create_client(&mut host_cx, "host").await;
4239        let host_project = host_cx.update(|cx| {
4240            Project::local(
4241                host.client.clone(),
4242                host.user_store.clone(),
4243                Arc::new(LanguageRegistry::test()),
4244                fs.clone(),
4245                cx,
4246            )
4247        });
4248        let host_project_id = host_project
4249            .update(&mut host_cx, |p, _| p.next_remote_id())
4250            .await;
4251
4252        let (collab_worktree, _) = host_project
4253            .update(&mut host_cx, |project, cx| {
4254                project.find_or_create_local_worktree("/_collab", true, cx)
4255            })
4256            .await
4257            .unwrap();
4258        collab_worktree
4259            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4260            .await;
4261        host_project
4262            .update(&mut host_cx, |project, cx| project.share(cx))
4263            .await
4264            .unwrap();
4265
4266        clients.push(cx.foreground().spawn(host.simulate_host(
4267            host_project,
4268            language_server_config,
4269            operations.clone(),
4270            max_operations,
4271            rng.clone(),
4272            host_cx,
4273        )));
4274
4275        while operations.get() < max_operations {
4276            cx.background().simulate_random_delay().await;
4277            if clients.len() >= max_peers {
4278                break;
4279            } else if rng.lock().gen_bool(0.05) {
4280                operations.set(operations.get() + 1);
4281
4282                let guest_id = clients.len();
4283                log::info!("Adding guest {}", guest_id);
4284                next_entity_id += 100000;
4285                let mut guest_cx = TestAppContext::new(
4286                    cx.foreground_platform(),
4287                    cx.platform(),
4288                    cx.foreground(),
4289                    cx.background(),
4290                    cx.font_cache(),
4291                    cx.leak_detector(),
4292                    next_entity_id,
4293                );
4294                let guest = server
4295                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4296                    .await;
4297                let guest_project = Project::remote(
4298                    host_project_id,
4299                    guest.client.clone(),
4300                    guest.user_store.clone(),
4301                    guest_lang_registry.clone(),
4302                    FakeFs::new(cx.background()),
4303                    &mut guest_cx.to_async(),
4304                )
4305                .await
4306                .unwrap();
4307                clients.push(cx.foreground().spawn(guest.simulate_guest(
4308                    guest_id,
4309                    guest_project,
4310                    operations.clone(),
4311                    max_operations,
4312                    rng.clone(),
4313                    guest_cx,
4314                )));
4315
4316                log::info!("Guest {} added", guest_id);
4317            }
4318        }
4319
4320        let mut clients = futures::future::join_all(clients).await;
4321        cx.foreground().run_until_parked();
4322
4323        let (host_client, mut host_cx) = clients.remove(0);
4324        let host_project = host_client.project.as_ref().unwrap();
4325        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4326            project
4327                .worktrees(cx)
4328                .map(|worktree| {
4329                    let snapshot = worktree.read(cx).snapshot();
4330                    (snapshot.id(), snapshot)
4331                })
4332                .collect::<BTreeMap<_, _>>()
4333        });
4334
4335        host_client
4336            .project
4337            .as_ref()
4338            .unwrap()
4339            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4340
4341        for (guest_client, mut guest_cx) in clients.into_iter() {
4342            let guest_id = guest_client.client.id();
4343            let worktree_snapshots =
4344                guest_client
4345                    .project
4346                    .as_ref()
4347                    .unwrap()
4348                    .read_with(&guest_cx, |project, cx| {
4349                        project
4350                            .worktrees(cx)
4351                            .map(|worktree| {
4352                                let worktree = worktree.read(cx);
4353                                (worktree.id(), worktree.snapshot())
4354                            })
4355                            .collect::<BTreeMap<_, _>>()
4356                    });
4357
4358            assert_eq!(
4359                worktree_snapshots.keys().collect::<Vec<_>>(),
4360                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4361                "guest {} has different worktrees than the host",
4362                guest_id
4363            );
4364            for (id, host_snapshot) in &host_worktree_snapshots {
4365                let guest_snapshot = &worktree_snapshots[id];
4366                assert_eq!(
4367                    guest_snapshot.root_name(),
4368                    host_snapshot.root_name(),
4369                    "guest {} has different root name than the host for worktree {}",
4370                    guest_id,
4371                    id
4372                );
4373                assert_eq!(
4374                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4375                    host_snapshot.entries(false).collect::<Vec<_>>(),
4376                    "guest {} has different snapshot than the host for worktree {}",
4377                    guest_id,
4378                    id
4379                );
4380            }
4381
4382            guest_client
4383                .project
4384                .as_ref()
4385                .unwrap()
4386                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4387
4388            for guest_buffer in &guest_client.buffers {
4389                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4390                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4391                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4392                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4393                        guest_id, guest_client.peer_id, buffer_id
4394                    ))
4395                });
4396                let path = host_buffer
4397                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4398
4399                assert_eq!(
4400                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4401                    0,
4402                    "guest {}, buffer {}, path {:?} has deferred operations",
4403                    guest_id,
4404                    buffer_id,
4405                    path,
4406                );
4407                assert_eq!(
4408                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4409                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4410                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4411                    guest_id,
4412                    buffer_id,
4413                    path
4414                );
4415            }
4416
4417            guest_cx.update(|_| drop(guest_client));
4418        }
4419
4420        host_cx.update(|_| drop(host_client));
4421    }
4422
4423    struct TestServer {
4424        peer: Arc<Peer>,
4425        app_state: Arc<AppState>,
4426        server: Arc<Server>,
4427        foreground: Rc<executor::Foreground>,
4428        notifications: mpsc::UnboundedReceiver<()>,
4429        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4430        forbid_connections: Arc<AtomicBool>,
4431        _test_db: TestDb,
4432    }
4433
4434    impl TestServer {
4435        async fn start(
4436            foreground: Rc<executor::Foreground>,
4437            background: Arc<executor::Background>,
4438        ) -> Self {
4439            let test_db = TestDb::fake(background);
4440            let app_state = Self::build_app_state(&test_db).await;
4441            let peer = Peer::new();
4442            let notifications = mpsc::unbounded();
4443            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4444            Self {
4445                peer,
4446                app_state,
4447                server,
4448                foreground,
4449                notifications: notifications.1,
4450                connection_killers: Default::default(),
4451                forbid_connections: Default::default(),
4452                _test_db: test_db,
4453            }
4454        }
4455
4456        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4457            cx.update(|cx| {
4458                let settings = Settings::test(cx);
4459                cx.add_app_state(settings);
4460            });
4461
4462            let http = FakeHttpClient::with_404_response();
4463            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4464            let client_name = name.to_string();
4465            let mut client = Client::new(http.clone());
4466            let server = self.server.clone();
4467            let connection_killers = self.connection_killers.clone();
4468            let forbid_connections = self.forbid_connections.clone();
4469            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4470
4471            Arc::get_mut(&mut client)
4472                .unwrap()
4473                .override_authenticate(move |cx| {
4474                    cx.spawn(|_| async move {
4475                        let access_token = "the-token".to_string();
4476                        Ok(Credentials {
4477                            user_id: user_id.0 as u64,
4478                            access_token,
4479                        })
4480                    })
4481                })
4482                .override_establish_connection(move |credentials, cx| {
4483                    assert_eq!(credentials.user_id, user_id.0 as u64);
4484                    assert_eq!(credentials.access_token, "the-token");
4485
4486                    let server = server.clone();
4487                    let connection_killers = connection_killers.clone();
4488                    let forbid_connections = forbid_connections.clone();
4489                    let client_name = client_name.clone();
4490                    let connection_id_tx = connection_id_tx.clone();
4491                    cx.spawn(move |cx| async move {
4492                        if forbid_connections.load(SeqCst) {
4493                            Err(EstablishConnectionError::other(anyhow!(
4494                                "server is forbidding connections"
4495                            )))
4496                        } else {
4497                            let (client_conn, server_conn, kill_conn) =
4498                                Connection::in_memory(cx.background());
4499                            connection_killers.lock().insert(user_id, kill_conn);
4500                            cx.background()
4501                                .spawn(server.handle_connection(
4502                                    server_conn,
4503                                    client_name,
4504                                    user_id,
4505                                    Some(connection_id_tx),
4506                                    cx.background(),
4507                                ))
4508                                .detach();
4509                            Ok(client_conn)
4510                        }
4511                    })
4512                });
4513
4514            client
4515                .authenticate_and_connect(&cx.to_async())
4516                .await
4517                .unwrap();
4518
4519            Channel::init(&client);
4520            Project::init(&client);
4521
4522            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4523            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4524
4525            let client = TestClient {
4526                client,
4527                peer_id,
4528                user_store,
4529                project: Default::default(),
4530                buffers: Default::default(),
4531            };
4532            client.wait_for_current_user(cx).await;
4533            client
4534        }
4535
4536        fn disconnect_client(&self, user_id: UserId) {
4537            self.connection_killers.lock().remove(&user_id);
4538        }
4539
4540        fn forbid_connections(&self) {
4541            self.forbid_connections.store(true, SeqCst);
4542        }
4543
4544        fn allow_connections(&self) {
4545            self.forbid_connections.store(false, SeqCst);
4546        }
4547
4548        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4549            let mut config = Config::default();
4550            config.session_secret = "a".repeat(32);
4551            config.database_url = test_db.url.clone();
4552            let github_client = github::AppClient::test();
4553            Arc::new(AppState {
4554                db: test_db.db().clone(),
4555                handlebars: Default::default(),
4556                auth_client: auth::build_client("", ""),
4557                repo_client: github::RepoClient::test(&github_client),
4558                github_client,
4559                config,
4560            })
4561        }
4562
4563        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4564            self.server.store.read()
4565        }
4566
4567        async fn condition<F>(&mut self, mut predicate: F)
4568        where
4569            F: FnMut(&Store) -> bool,
4570        {
4571            async_std::future::timeout(Duration::from_millis(500), async {
4572                while !(predicate)(&*self.server.store.read()) {
4573                    self.foreground.start_waiting();
4574                    self.notifications.next().await;
4575                    self.foreground.finish_waiting();
4576                }
4577            })
4578            .await
4579            .expect("condition timed out");
4580        }
4581    }
4582
4583    impl Drop for TestServer {
4584        fn drop(&mut self) {
4585            self.peer.reset();
4586        }
4587    }
4588
4589    struct TestClient {
4590        client: Arc<Client>,
4591        pub peer_id: PeerId,
4592        pub user_store: ModelHandle<UserStore>,
4593        project: Option<ModelHandle<Project>>,
4594        buffers: HashSet<ModelHandle<language::Buffer>>,
4595    }
4596
4597    impl Deref for TestClient {
4598        type Target = Arc<Client>;
4599
4600        fn deref(&self) -> &Self::Target {
4601            &self.client
4602        }
4603    }
4604
4605    impl TestClient {
4606        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4607            UserId::from_proto(
4608                self.user_store
4609                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4610            )
4611        }
4612
4613        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4614            let mut authed_user = self
4615                .user_store
4616                .read_with(cx, |user_store, _| user_store.watch_current_user());
4617            while authed_user.next().await.unwrap().is_none() {}
4618        }
4619
4620        fn simulate_host(
4621            mut self,
4622            project: ModelHandle<Project>,
4623            mut language_server_config: LanguageServerConfig,
4624            operations: Rc<Cell<usize>>,
4625            max_operations: usize,
4626            rng: Arc<Mutex<StdRng>>,
4627            mut cx: TestAppContext,
4628        ) -> impl Future<Output = (Self, TestAppContext)> {
4629            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4630
4631            // Set up a fake language server.
4632            language_server_config.set_fake_initializer({
4633                let rng = rng.clone();
4634                let files = files.clone();
4635                let project = project.downgrade();
4636                move |fake_server| {
4637                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4638                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4639                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4640                                range: lsp::Range::new(
4641                                    lsp::Position::new(0, 0),
4642                                    lsp::Position::new(0, 0),
4643                                ),
4644                                new_text: "the-new-text".to_string(),
4645                            })),
4646                            ..Default::default()
4647                        }]))
4648                    });
4649
4650                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4651                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4652                            lsp::CodeAction {
4653                                title: "the-code-action".to_string(),
4654                                ..Default::default()
4655                            },
4656                        )])
4657                    });
4658
4659                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4660                        |params, _| {
4661                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4662                                params.position,
4663                                params.position,
4664                            )))
4665                        },
4666                    );
4667
4668                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4669                        let files = files.clone();
4670                        let rng = rng.clone();
4671                        move |_, _| {
4672                            let files = files.lock();
4673                            let mut rng = rng.lock();
4674                            let count = rng.gen_range::<usize, _>(1..3);
4675                            let files = (0..count)
4676                                .map(|_| files.choose(&mut *rng).unwrap())
4677                                .collect::<Vec<_>>();
4678                            log::info!("LSP: Returning definitions in files {:?}", &files);
4679                            Some(lsp::GotoDefinitionResponse::Array(
4680                                files
4681                                    .into_iter()
4682                                    .map(|file| lsp::Location {
4683                                        uri: lsp::Url::from_file_path(file).unwrap(),
4684                                        range: Default::default(),
4685                                    })
4686                                    .collect(),
4687                            ))
4688                        }
4689                    });
4690
4691                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4692                        let rng = rng.clone();
4693                        let project = project.clone();
4694                        move |params, mut cx| {
4695                            if let Some(project) = project.upgrade(&cx) {
4696                                project.update(&mut cx, |project, cx| {
4697                                    let path = params
4698                                        .text_document_position_params
4699                                        .text_document
4700                                        .uri
4701                                        .to_file_path()
4702                                        .unwrap();
4703                                    let (worktree, relative_path) =
4704                                        project.find_local_worktree(&path, cx)?;
4705                                    let project_path =
4706                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4707                                    let buffer =
4708                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4709
4710                                    let mut highlights = Vec::new();
4711                                    let highlight_count = rng.lock().gen_range(1..=5);
4712                                    let mut prev_end = 0;
4713                                    for _ in 0..highlight_count {
4714                                        let range =
4715                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4716                                        let start = buffer
4717                                            .offset_to_point_utf16(range.start)
4718                                            .to_lsp_position();
4719                                        let end = buffer
4720                                            .offset_to_point_utf16(range.end)
4721                                            .to_lsp_position();
4722                                        highlights.push(lsp::DocumentHighlight {
4723                                            range: lsp::Range::new(start, end),
4724                                            kind: Some(lsp::DocumentHighlightKind::READ),
4725                                        });
4726                                        prev_end = range.end;
4727                                    }
4728                                    Some(highlights)
4729                                })
4730                            } else {
4731                                None
4732                            }
4733                        }
4734                    });
4735                }
4736            });
4737
4738            project.update(&mut cx, |project, _| {
4739                project.languages().add(Arc::new(Language::new(
4740                    LanguageConfig {
4741                        name: "Rust".into(),
4742                        path_suffixes: vec!["rs".to_string()],
4743                        language_server: Some(language_server_config),
4744                        ..Default::default()
4745                    },
4746                    None,
4747                )));
4748            });
4749
4750            async move {
4751                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4752                while operations.get() < max_operations {
4753                    operations.set(operations.get() + 1);
4754
4755                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4756                    match distribution {
4757                        0..=20 if !files.lock().is_empty() => {
4758                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4759                            let mut path = path.as_path();
4760                            while let Some(parent_path) = path.parent() {
4761                                path = parent_path;
4762                                if rng.lock().gen() {
4763                                    break;
4764                                }
4765                            }
4766
4767                            log::info!("Host: find/create local worktree {:?}", path);
4768                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4769                                project.find_or_create_local_worktree(path, true, cx)
4770                            });
4771                            let find_or_create_worktree = async move {
4772                                find_or_create_worktree.await.unwrap();
4773                            };
4774                            if rng.lock().gen() {
4775                                cx.background().spawn(find_or_create_worktree).detach();
4776                            } else {
4777                                find_or_create_worktree.await;
4778                            }
4779                        }
4780                        10..=80 if !files.lock().is_empty() => {
4781                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4782                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4783                                let (worktree, path) = project
4784                                    .update(&mut cx, |project, cx| {
4785                                        project.find_or_create_local_worktree(
4786                                            file.clone(),
4787                                            true,
4788                                            cx,
4789                                        )
4790                                    })
4791                                    .await
4792                                    .unwrap();
4793                                let project_path =
4794                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4795                                log::info!(
4796                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4797                                    file,
4798                                    project_path.0,
4799                                    project_path.1
4800                                );
4801                                let buffer = project
4802                                    .update(&mut cx, |project, cx| {
4803                                        project.open_buffer_for_path(project_path, cx)
4804                                    })
4805                                    .await
4806                                    .unwrap();
4807                                self.buffers.insert(buffer.clone());
4808                                buffer
4809                            } else {
4810                                self.buffers
4811                                    .iter()
4812                                    .choose(&mut *rng.lock())
4813                                    .unwrap()
4814                                    .clone()
4815                            };
4816
4817                            if rng.lock().gen_bool(0.1) {
4818                                cx.update(|cx| {
4819                                    log::info!(
4820                                        "Host: dropping buffer {:?}",
4821                                        buffer.read(cx).file().unwrap().full_path(cx)
4822                                    );
4823                                    self.buffers.remove(&buffer);
4824                                    drop(buffer);
4825                                });
4826                            } else {
4827                                buffer.update(&mut cx, |buffer, cx| {
4828                                    log::info!(
4829                                        "Host: updating buffer {:?} ({})",
4830                                        buffer.file().unwrap().full_path(cx),
4831                                        buffer.remote_id()
4832                                    );
4833                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4834                                });
4835                            }
4836                        }
4837                        _ => loop {
4838                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4839                            let mut path = PathBuf::new();
4840                            path.push("/");
4841                            for _ in 0..path_component_count {
4842                                let letter = rng.lock().gen_range(b'a'..=b'z');
4843                                path.push(std::str::from_utf8(&[letter]).unwrap());
4844                            }
4845                            path.set_extension("rs");
4846                            let parent_path = path.parent().unwrap();
4847
4848                            log::info!("Host: creating file {:?}", path,);
4849
4850                            if fs.create_dir(&parent_path).await.is_ok()
4851                                && fs.create_file(&path, Default::default()).await.is_ok()
4852                            {
4853                                files.lock().push(path);
4854                                break;
4855                            } else {
4856                                log::info!("Host: cannot create file");
4857                            }
4858                        },
4859                    }
4860
4861                    cx.background().simulate_random_delay().await;
4862                }
4863
4864                log::info!("Host done");
4865
4866                self.project = Some(project);
4867                (self, cx)
4868            }
4869        }
4870
4871        pub async fn simulate_guest(
4872            mut self,
4873            guest_id: usize,
4874            project: ModelHandle<Project>,
4875            operations: Rc<Cell<usize>>,
4876            max_operations: usize,
4877            rng: Arc<Mutex<StdRng>>,
4878            mut cx: TestAppContext,
4879        ) -> (Self, TestAppContext) {
4880            while operations.get() < max_operations {
4881                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4882                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4883                        project
4884                            .worktrees(&cx)
4885                            .filter(|worktree| {
4886                                let worktree = worktree.read(cx);
4887                                worktree.is_visible()
4888                                    && worktree.entries(false).any(|e| e.is_file())
4889                            })
4890                            .choose(&mut *rng.lock())
4891                    }) {
4892                        worktree
4893                    } else {
4894                        cx.background().simulate_random_delay().await;
4895                        continue;
4896                    };
4897
4898                    operations.set(operations.get() + 1);
4899                    let (worktree_root_name, project_path) =
4900                        worktree.read_with(&cx, |worktree, _| {
4901                            let entry = worktree
4902                                .entries(false)
4903                                .filter(|e| e.is_file())
4904                                .choose(&mut *rng.lock())
4905                                .unwrap();
4906                            (
4907                                worktree.root_name().to_string(),
4908                                (worktree.id(), entry.path.clone()),
4909                            )
4910                        });
4911                    log::info!(
4912                        "Guest {}: opening path {:?} in worktree {} ({})",
4913                        guest_id,
4914                        project_path.1,
4915                        project_path.0,
4916                        worktree_root_name,
4917                    );
4918                    let buffer = project
4919                        .update(&mut cx, |project, cx| {
4920                            project.open_buffer_for_path(project_path.clone(), cx)
4921                        })
4922                        .await
4923                        .unwrap();
4924                    log::info!(
4925                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4926                        guest_id,
4927                        project_path.1,
4928                        project_path.0,
4929                        worktree_root_name,
4930                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4931                    );
4932                    self.buffers.insert(buffer.clone());
4933                    buffer
4934                } else {
4935                    operations.set(operations.get() + 1);
4936
4937                    self.buffers
4938                        .iter()
4939                        .choose(&mut *rng.lock())
4940                        .unwrap()
4941                        .clone()
4942                };
4943
4944                let choice = rng.lock().gen_range(0..100);
4945                match choice {
4946                    0..=9 => {
4947                        cx.update(|cx| {
4948                            log::info!(
4949                                "Guest {}: dropping buffer {:?}",
4950                                guest_id,
4951                                buffer.read(cx).file().unwrap().full_path(cx)
4952                            );
4953                            self.buffers.remove(&buffer);
4954                            drop(buffer);
4955                        });
4956                    }
4957                    10..=19 => {
4958                        let completions = project.update(&mut cx, |project, cx| {
4959                            log::info!(
4960                                "Guest {}: requesting completions for buffer {} ({:?})",
4961                                guest_id,
4962                                buffer.read(cx).remote_id(),
4963                                buffer.read(cx).file().unwrap().full_path(cx)
4964                            );
4965                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4966                            project.completions(&buffer, offset, cx)
4967                        });
4968                        let completions = cx.background().spawn(async move {
4969                            completions.await.expect("completions request failed");
4970                        });
4971                        if rng.lock().gen_bool(0.3) {
4972                            log::info!("Guest {}: detaching completions request", guest_id);
4973                            completions.detach();
4974                        } else {
4975                            completions.await;
4976                        }
4977                    }
4978                    20..=29 => {
4979                        let code_actions = project.update(&mut cx, |project, cx| {
4980                            log::info!(
4981                                "Guest {}: requesting code actions for buffer {} ({:?})",
4982                                guest_id,
4983                                buffer.read(cx).remote_id(),
4984                                buffer.read(cx).file().unwrap().full_path(cx)
4985                            );
4986                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4987                            project.code_actions(&buffer, range, cx)
4988                        });
4989                        let code_actions = cx.background().spawn(async move {
4990                            code_actions.await.expect("code actions request failed");
4991                        });
4992                        if rng.lock().gen_bool(0.3) {
4993                            log::info!("Guest {}: detaching code actions request", guest_id);
4994                            code_actions.detach();
4995                        } else {
4996                            code_actions.await;
4997                        }
4998                    }
4999                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5000                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5001                            log::info!(
5002                                "Guest {}: saving buffer {} ({:?})",
5003                                guest_id,
5004                                buffer.remote_id(),
5005                                buffer.file().unwrap().full_path(cx)
5006                            );
5007                            (buffer.version(), buffer.save(cx))
5008                        });
5009                        let save = cx.background().spawn(async move {
5010                            let (saved_version, _) = save.await.expect("save request failed");
5011                            assert!(saved_version.observed_all(&requested_version));
5012                        });
5013                        if rng.lock().gen_bool(0.3) {
5014                            log::info!("Guest {}: detaching save request", guest_id);
5015                            save.detach();
5016                        } else {
5017                            save.await;
5018                        }
5019                    }
5020                    40..=44 => {
5021                        let prepare_rename = project.update(&mut cx, |project, cx| {
5022                            log::info!(
5023                                "Guest {}: preparing rename for buffer {} ({:?})",
5024                                guest_id,
5025                                buffer.read(cx).remote_id(),
5026                                buffer.read(cx).file().unwrap().full_path(cx)
5027                            );
5028                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5029                            project.prepare_rename(buffer, offset, cx)
5030                        });
5031                        let prepare_rename = cx.background().spawn(async move {
5032                            prepare_rename.await.expect("prepare rename request failed");
5033                        });
5034                        if rng.lock().gen_bool(0.3) {
5035                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5036                            prepare_rename.detach();
5037                        } else {
5038                            prepare_rename.await;
5039                        }
5040                    }
5041                    45..=49 => {
5042                        let definitions = project.update(&mut cx, |project, cx| {
5043                            log::info!(
5044                                "Guest {}: requesting definitions for buffer {} ({:?})",
5045                                guest_id,
5046                                buffer.read(cx).remote_id(),
5047                                buffer.read(cx).file().unwrap().full_path(cx)
5048                            );
5049                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5050                            project.definition(&buffer, offset, cx)
5051                        });
5052                        let definitions = cx.background().spawn(async move {
5053                            definitions.await.expect("definitions request failed")
5054                        });
5055                        if rng.lock().gen_bool(0.3) {
5056                            log::info!("Guest {}: detaching definitions request", guest_id);
5057                            definitions.detach();
5058                        } else {
5059                            self.buffers
5060                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5061                        }
5062                    }
5063                    50..=54 => {
5064                        let highlights = project.update(&mut cx, |project, cx| {
5065                            log::info!(
5066                                "Guest {}: requesting highlights for buffer {} ({:?})",
5067                                guest_id,
5068                                buffer.read(cx).remote_id(),
5069                                buffer.read(cx).file().unwrap().full_path(cx)
5070                            );
5071                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5072                            project.document_highlights(&buffer, offset, cx)
5073                        });
5074                        let highlights = cx.background().spawn(async move {
5075                            highlights.await.expect("highlights request failed");
5076                        });
5077                        if rng.lock().gen_bool(0.3) {
5078                            log::info!("Guest {}: detaching highlights request", guest_id);
5079                            highlights.detach();
5080                        } else {
5081                            highlights.await;
5082                        }
5083                    }
5084                    55..=59 => {
5085                        let search = project.update(&mut cx, |project, cx| {
5086                            let query = rng.lock().gen_range('a'..='z');
5087                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5088                            project.search(SearchQuery::text(query, false, false), cx)
5089                        });
5090                        let search = cx
5091                            .background()
5092                            .spawn(async move { search.await.expect("search request failed") });
5093                        if rng.lock().gen_bool(0.3) {
5094                            log::info!("Guest {}: detaching search request", guest_id);
5095                            search.detach();
5096                        } else {
5097                            self.buffers.extend(search.await.into_keys());
5098                        }
5099                    }
5100                    _ => {
5101                        buffer.update(&mut cx, |buffer, cx| {
5102                            log::info!(
5103                                "Guest {}: updating buffer {} ({:?})",
5104                                guest_id,
5105                                buffer.remote_id(),
5106                                buffer.file().unwrap().full_path(cx)
5107                            );
5108                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5109                        });
5110                    }
5111                }
5112                cx.background().simulate_random_delay().await;
5113            }
5114
5115            log::info!("Guest {} done", guest_id);
5116
5117            self.project = Some(project);
5118            (self, cx)
5119        }
5120    }
5121
5122    impl Drop for TestClient {
5123        fn drop(&mut self) {
5124            self.client.tear_down();
5125        }
5126    }
5127
5128    impl Executor for Arc<gpui::executor::Background> {
5129        type Timer = gpui::executor::Timer;
5130
5131        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5132            self.spawn(future).detach();
5133        }
5134
5135        fn timer(&self, duration: Duration) -> Self::Timer {
5136            self.as_ref().timer(duration)
5137        }
5138    }
5139
5140    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5141        channel
5142            .messages()
5143            .cursor::<()>()
5144            .map(|m| {
5145                (
5146                    m.sender.github_login.clone(),
5147                    m.body.clone(),
5148                    m.is_pending(),
5149                )
5150            })
5151            .collect()
5152    }
5153
5154    struct EmptyView;
5155
5156    impl gpui::Entity for EmptyView {
5157        type Event = ();
5158    }
5159
5160    impl gpui::View for EmptyView {
5161        fn ui_name() -> &'static str {
5162            "empty view"
5163        }
5164
5165        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5166            gpui::Element::boxed(gpui::elements::Empty)
5167        }
5168    }
5169}