rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::get_channel_messages);
 116
 117        Arc::new(server)
 118    }
 119
 120    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 121    where
 122        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 123        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 124        M: EnvelopedMessage,
 125    {
 126        let prev_handler = self.handlers.insert(
 127            TypeId::of::<M>(),
 128            Box::new(move |server, envelope| {
 129                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 130                (handler)(server, *envelope).boxed()
 131            }),
 132        );
 133        if prev_handler.is_some() {
 134            panic!("registered a handler for the same message twice");
 135        }
 136        self
 137    }
 138
 139    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 140    where
 141        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 142        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 143        M: RequestMessage,
 144    {
 145        self.add_message_handler(move |server, envelope| {
 146            let receipt = envelope.receipt();
 147            let response = (handler)(server.clone(), envelope);
 148            async move {
 149                match response.await {
 150                    Ok(response) => {
 151                        server.peer.respond(receipt, response)?;
 152                        Ok(())
 153                    }
 154                    Err(error) => {
 155                        server.peer.respond_with_error(
 156                            receipt,
 157                            proto::Error {
 158                                message: error.to_string(),
 159                            },
 160                        )?;
 161                        Err(error)
 162                    }
 163                }
 164            }
 165        })
 166    }
 167
 168    pub fn handle_connection<E: Executor>(
 169        self: &Arc<Self>,
 170        connection: Connection,
 171        addr: String,
 172        user_id: UserId,
 173        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 174        executor: E,
 175    ) -> impl Future<Output = ()> {
 176        let mut this = self.clone();
 177        async move {
 178            let (connection_id, handle_io, mut incoming_rx) = this
 179                .peer
 180                .add_connection(connection, {
 181                    let executor = executor.clone();
 182                    move |duration| {
 183                        let timer = executor.timer(duration);
 184                        async move {
 185                            timer.await;
 186                        }
 187                    }
 188                })
 189                .await;
 190
 191            if let Some(send_connection_id) = send_connection_id.as_mut() {
 192                let _ = send_connection_id.send(connection_id).await;
 193            }
 194
 195            this.state_mut().add_connection(connection_id, user_id);
 196            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 197                log::error!("error updating contacts for {:?}: {}", user_id, err);
 198            }
 199
 200            let handle_io = handle_io.fuse();
 201            futures::pin_mut!(handle_io);
 202            loop {
 203                let next_message = incoming_rx.next().fuse();
 204                futures::pin_mut!(next_message);
 205                futures::select_biased! {
 206                    result = handle_io => {
 207                        if let Err(err) = result {
 208                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 209                        }
 210                        break;
 211                    }
 212                    message = next_message => {
 213                        if let Some(message) = message {
 214                            let start_time = Instant::now();
 215                            let type_name = message.payload_type_name();
 216                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 217                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 218                                let notifications = this.notifications.clone();
 219                                let is_background = message.is_background();
 220                                let handle_message = (handler)(this.clone(), message);
 221                                let handle_message = async move {
 222                                    if let Err(err) = handle_message.await {
 223                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 224                                    } else {
 225                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 226                                    }
 227                                    if let Some(mut notifications) = notifications {
 228                                        let _ = notifications.send(()).await;
 229                                    }
 230                                };
 231                                if is_background {
 232                                    executor.spawn_detached(handle_message);
 233                                } else {
 234                                    handle_message.await;
 235                                }
 236                            } else {
 237                                log::warn!("unhandled message: {}", type_name);
 238                            }
 239                        } else {
 240                            log::info!("rpc connection closed {:?}", addr);
 241                            break;
 242                        }
 243                    }
 244                }
 245            }
 246
 247            if let Err(err) = this.sign_out(connection_id).await {
 248                log::error!("error signing out connection {:?} - {:?}", addr, err);
 249            }
 250        }
 251    }
 252
 253    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 254        self.peer.disconnect(connection_id);
 255        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 256
 257        for (project_id, project) in removed_connection.hosted_projects {
 258            if let Some(share) = project.share {
 259                broadcast(
 260                    connection_id,
 261                    share.guests.keys().copied().collect(),
 262                    |conn_id| {
 263                        self.peer
 264                            .send(conn_id, proto::UnshareProject { project_id })
 265                    },
 266                )?;
 267            }
 268        }
 269
 270        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 271            broadcast(connection_id, peer_ids, |conn_id| {
 272                self.peer.send(
 273                    conn_id,
 274                    proto::RemoveProjectCollaborator {
 275                        project_id,
 276                        peer_id: connection_id.0,
 277                    },
 278                )
 279            })?;
 280        }
 281
 282        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 283        Ok(())
 284    }
 285
 286    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn register_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::RegisterProject>,
 293    ) -> tide::Result<proto::RegisterProjectResponse> {
 294        let project_id = {
 295            let mut state = self.state_mut();
 296            let user_id = state.user_id_for_connection(request.sender_id)?;
 297            state.register_project(request.sender_id, user_id)
 298        };
 299        Ok(proto::RegisterProjectResponse { project_id })
 300    }
 301
 302    async fn unregister_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnregisterProject>,
 305    ) -> tide::Result<()> {
 306        let project = self
 307            .state_mut()
 308            .unregister_project(request.payload.project_id, request.sender_id)?;
 309        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 310        Ok(())
 311    }
 312
 313    async fn share_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::ShareProject>,
 316    ) -> tide::Result<proto::Ack> {
 317        self.state_mut()
 318            .share_project(request.payload.project_id, request.sender_id);
 319        Ok(proto::Ack {})
 320    }
 321
 322    async fn unshare_project(
 323        mut self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareProject>,
 325    ) -> tide::Result<()> {
 326        let project_id = request.payload.project_id;
 327        let project = self
 328            .state_mut()
 329            .unshare_project(project_id, request.sender_id)?;
 330
 331        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 332            self.peer
 333                .send(conn_id, proto::UnshareProject { project_id })
 334        })?;
 335        self.update_contacts_for_users(&project.authorized_user_ids)?;
 336        Ok(())
 337    }
 338
 339    async fn join_project(
 340        mut self: Arc<Server>,
 341        request: TypedEnvelope<proto::JoinProject>,
 342    ) -> tide::Result<proto::JoinProjectResponse> {
 343        let project_id = request.payload.project_id;
 344
 345        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 346        let (response, connection_ids, contact_user_ids) = self
 347            .state_mut()
 348            .join_project(request.sender_id, user_id, project_id)
 349            .and_then(|joined| {
 350                let share = joined.project.share()?;
 351                let peer_count = share.guests.len();
 352                let mut collaborators = Vec::with_capacity(peer_count);
 353                collaborators.push(proto::Collaborator {
 354                    peer_id: joined.project.host_connection_id.0,
 355                    replica_id: 0,
 356                    user_id: joined.project.host_user_id.to_proto(),
 357                });
 358                let worktrees = share
 359                    .worktrees
 360                    .iter()
 361                    .filter_map(|(id, shared_worktree)| {
 362                        let worktree = joined.project.worktrees.get(&id)?;
 363                        Some(proto::Worktree {
 364                            id: *id,
 365                            root_name: worktree.root_name.clone(),
 366                            entries: shared_worktree.entries.values().cloned().collect(),
 367                            diagnostic_summaries: shared_worktree
 368                                .diagnostic_summaries
 369                                .values()
 370                                .cloned()
 371                                .collect(),
 372                            visible: worktree.visible,
 373                        })
 374                    })
 375                    .collect();
 376                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 377                    if *peer_conn_id != request.sender_id {
 378                        collaborators.push(proto::Collaborator {
 379                            peer_id: peer_conn_id.0,
 380                            replica_id: *peer_replica_id as u32,
 381                            user_id: peer_user_id.to_proto(),
 382                        });
 383                    }
 384                }
 385                let response = proto::JoinProjectResponse {
 386                    worktrees,
 387                    replica_id: joined.replica_id as u32,
 388                    collaborators,
 389                    language_servers: joined.project.language_servers.clone(),
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn start_language_server(
 540        mut self: Arc<Server>,
 541        request: TypedEnvelope<proto::StartLanguageServer>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self.state_mut().start_language_server(
 544            request.payload.project_id,
 545            request.sender_id,
 546            request
 547                .payload
 548                .server
 549                .clone()
 550                .ok_or_else(|| anyhow!("invalid language server"))?,
 551        )?;
 552        broadcast(request.sender_id, receiver_ids, |connection_id| {
 553            self.peer
 554                .forward_send(request.sender_id, connection_id, request.payload.clone())
 555        })?;
 556        Ok(())
 557    }
 558
 559    async fn update_language_server(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::UpdateLanguageServer>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn forward_project_request<T>(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<T>,
 576    ) -> tide::Result<T::Response>
 577    where
 578        T: EntityMessage + RequestMessage,
 579    {
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 583            .host_connection_id;
 584        Ok(self
 585            .peer
 586            .forward_request(request.sender_id, host_connection_id, request.payload)
 587            .await?)
 588    }
 589
 590    async fn save_buffer(
 591        self: Arc<Server>,
 592        request: TypedEnvelope<proto::SaveBuffer>,
 593    ) -> tide::Result<proto::BufferSaved> {
 594        let host;
 595        let mut guests;
 596        {
 597            let state = self.state();
 598            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 599            host = project.host_connection_id;
 600            guests = project.guest_connection_ids()
 601        }
 602
 603        let response = self
 604            .peer
 605            .forward_request(request.sender_id, host, request.payload.clone())
 606            .await?;
 607
 608        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 609        broadcast(host, guests, |conn_id| {
 610            self.peer.forward_send(host, conn_id, response.clone())
 611        })?;
 612
 613        Ok(response)
 614    }
 615
 616    async fn update_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::UpdateBuffer>,
 619    ) -> tide::Result<proto::Ack> {
 620        let receiver_ids = self
 621            .state()
 622            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        })?;
 627        Ok(proto::Ack {})
 628    }
 629
 630    async fn update_buffer_file(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateBufferFile>,
 633    ) -> tide::Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 637        broadcast(request.sender_id, receiver_ids, |connection_id| {
 638            self.peer
 639                .forward_send(request.sender_id, connection_id, request.payload.clone())
 640        })?;
 641        Ok(())
 642    }
 643
 644    async fn buffer_reloaded(
 645        self: Arc<Server>,
 646        request: TypedEnvelope<proto::BufferReloaded>,
 647    ) -> tide::Result<()> {
 648        let receiver_ids = self
 649            .state()
 650            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 651        broadcast(request.sender_id, receiver_ids, |connection_id| {
 652            self.peer
 653                .forward_send(request.sender_id, connection_id, request.payload.clone())
 654        })?;
 655        Ok(())
 656    }
 657
 658    async fn buffer_saved(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::BufferSaved>,
 661    ) -> tide::Result<()> {
 662        let receiver_ids = self
 663            .state()
 664            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 665        broadcast(request.sender_id, receiver_ids, |connection_id| {
 666            self.peer
 667                .forward_send(request.sender_id, connection_id, request.payload.clone())
 668        })?;
 669        Ok(())
 670    }
 671
 672    async fn get_channels(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::GetChannels>,
 675    ) -> tide::Result<proto::GetChannelsResponse> {
 676        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 677        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 678        Ok(proto::GetChannelsResponse {
 679            channels: channels
 680                .into_iter()
 681                .map(|chan| proto::Channel {
 682                    id: chan.id.to_proto(),
 683                    name: chan.name,
 684                })
 685                .collect(),
 686        })
 687    }
 688
 689    async fn get_users(
 690        self: Arc<Server>,
 691        request: TypedEnvelope<proto::GetUsers>,
 692    ) -> tide::Result<proto::GetUsersResponse> {
 693        let user_ids = request
 694            .payload
 695            .user_ids
 696            .into_iter()
 697            .map(UserId::from_proto)
 698            .collect();
 699        let users = self
 700            .app_state
 701            .db
 702            .get_users_by_ids(user_ids)
 703            .await?
 704            .into_iter()
 705            .map(|user| proto::User {
 706                id: user.id.to_proto(),
 707                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 708                github_login: user.github_login,
 709            })
 710            .collect();
 711        Ok(proto::GetUsersResponse { users })
 712    }
 713
 714    fn update_contacts_for_users<'a>(
 715        self: &Arc<Server>,
 716        user_ids: impl IntoIterator<Item = &'a UserId>,
 717    ) -> anyhow::Result<()> {
 718        let mut result = Ok(());
 719        let state = self.state();
 720        for user_id in user_ids {
 721            let contacts = state.contacts_for_user(*user_id);
 722            for connection_id in state.connection_ids_for_user(*user_id) {
 723                if let Err(error) = self.peer.send(
 724                    connection_id,
 725                    proto::UpdateContacts {
 726                        contacts: contacts.clone(),
 727                    },
 728                ) {
 729                    result = Err(error);
 730                }
 731            }
 732        }
 733        result
 734    }
 735
 736    async fn join_channel(
 737        mut self: Arc<Self>,
 738        request: TypedEnvelope<proto::JoinChannel>,
 739    ) -> tide::Result<proto::JoinChannelResponse> {
 740        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 741        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 742        if !self
 743            .app_state
 744            .db
 745            .can_user_access_channel(user_id, channel_id)
 746            .await?
 747        {
 748            Err(anyhow!("access denied"))?;
 749        }
 750
 751        self.state_mut().join_channel(request.sender_id, channel_id);
 752        let messages = self
 753            .app_state
 754            .db
 755            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 756            .await?
 757            .into_iter()
 758            .map(|msg| proto::ChannelMessage {
 759                id: msg.id.to_proto(),
 760                body: msg.body,
 761                timestamp: msg.sent_at.unix_timestamp() as u64,
 762                sender_id: msg.sender_id.to_proto(),
 763                nonce: Some(msg.nonce.as_u128().into()),
 764            })
 765            .collect::<Vec<_>>();
 766        Ok(proto::JoinChannelResponse {
 767            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 768            messages,
 769        })
 770    }
 771
 772    async fn leave_channel(
 773        mut self: Arc<Self>,
 774        request: TypedEnvelope<proto::LeaveChannel>,
 775    ) -> tide::Result<()> {
 776        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 777        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 778        if !self
 779            .app_state
 780            .db
 781            .can_user_access_channel(user_id, channel_id)
 782            .await?
 783        {
 784            Err(anyhow!("access denied"))?;
 785        }
 786
 787        self.state_mut()
 788            .leave_channel(request.sender_id, channel_id);
 789
 790        Ok(())
 791    }
 792
 793    async fn send_channel_message(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::SendChannelMessage>,
 796    ) -> tide::Result<proto::SendChannelMessageResponse> {
 797        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 798        let user_id;
 799        let connection_ids;
 800        {
 801            let state = self.state();
 802            user_id = state.user_id_for_connection(request.sender_id)?;
 803            connection_ids = state.channel_connection_ids(channel_id)?;
 804        }
 805
 806        // Validate the message body.
 807        let body = request.payload.body.trim().to_string();
 808        if body.len() > MAX_MESSAGE_LEN {
 809            return Err(anyhow!("message is too long"))?;
 810        }
 811        if body.is_empty() {
 812            return Err(anyhow!("message can't be blank"))?;
 813        }
 814
 815        let timestamp = OffsetDateTime::now_utc();
 816        let nonce = request
 817            .payload
 818            .nonce
 819            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 820
 821        let message_id = self
 822            .app_state
 823            .db
 824            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 825            .await?
 826            .to_proto();
 827        let message = proto::ChannelMessage {
 828            sender_id: user_id.to_proto(),
 829            id: message_id,
 830            body,
 831            timestamp: timestamp.unix_timestamp() as u64,
 832            nonce: Some(nonce),
 833        };
 834        broadcast(request.sender_id, connection_ids, |conn_id| {
 835            self.peer.send(
 836                conn_id,
 837                proto::ChannelMessageSent {
 838                    channel_id: channel_id.to_proto(),
 839                    message: Some(message.clone()),
 840                },
 841            )
 842        })?;
 843        Ok(proto::SendChannelMessageResponse {
 844            message: Some(message),
 845        })
 846    }
 847
 848    async fn get_channel_messages(
 849        self: Arc<Self>,
 850        request: TypedEnvelope<proto::GetChannelMessages>,
 851    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 852        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 853        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 854        if !self
 855            .app_state
 856            .db
 857            .can_user_access_channel(user_id, channel_id)
 858            .await?
 859        {
 860            Err(anyhow!("access denied"))?;
 861        }
 862
 863        let messages = self
 864            .app_state
 865            .db
 866            .get_channel_messages(
 867                channel_id,
 868                MESSAGE_COUNT_PER_PAGE,
 869                Some(MessageId::from_proto(request.payload.before_message_id)),
 870            )
 871            .await?
 872            .into_iter()
 873            .map(|msg| proto::ChannelMessage {
 874                id: msg.id.to_proto(),
 875                body: msg.body,
 876                timestamp: msg.sent_at.unix_timestamp() as u64,
 877                sender_id: msg.sender_id.to_proto(),
 878                nonce: Some(msg.nonce.as_u128().into()),
 879            })
 880            .collect::<Vec<_>>();
 881
 882        Ok(proto::GetChannelMessagesResponse {
 883            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 884            messages,
 885        })
 886    }
 887
 888    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 889        self.store.read()
 890    }
 891
 892    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 893        self.store.write()
 894    }
 895}
 896
 897impl Executor for RealExecutor {
 898    type Timer = Timer;
 899
 900    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 901        task::spawn(future);
 902    }
 903
 904    fn timer(&self, duration: Duration) -> Self::Timer {
 905        Timer::after(duration)
 906    }
 907}
 908
 909fn broadcast<F>(
 910    sender_id: ConnectionId,
 911    receiver_ids: Vec<ConnectionId>,
 912    mut f: F,
 913) -> anyhow::Result<()>
 914where
 915    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 916{
 917    let mut result = Ok(());
 918    for receiver_id in receiver_ids {
 919        if receiver_id != sender_id {
 920            if let Err(error) = f(receiver_id) {
 921                if result.is_ok() {
 922                    result = Err(error);
 923                }
 924            }
 925        }
 926    }
 927    result
 928}
 929
 930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 931    let server = Server::new(app.state().clone(), rpc.clone(), None);
 932    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 933        let server = server.clone();
 934        async move {
 935            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 936
 937            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 938            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 939            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 940            let client_protocol_version: Option<u32> = request
 941                .header("X-Zed-Protocol-Version")
 942                .and_then(|v| v.as_str().parse().ok());
 943
 944            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 945                return Ok(Response::new(StatusCode::UpgradeRequired));
 946            }
 947
 948            let header = match request.header("Sec-Websocket-Key") {
 949                Some(h) => h.as_str(),
 950                None => return Err(anyhow!("expected sec-websocket-key"))?,
 951            };
 952
 953            let user_id = process_auth_header(&request).await?;
 954
 955            let mut response = Response::new(StatusCode::SwitchingProtocols);
 956            response.insert_header(UPGRADE, "websocket");
 957            response.insert_header(CONNECTION, "Upgrade");
 958            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 959            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 960            response.insert_header("Sec-Websocket-Version", "13");
 961
 962            let http_res: &mut tide::http::Response = response.as_mut();
 963            let upgrade_receiver = http_res.recv_upgrade().await;
 964            let addr = request.remote().unwrap_or("unknown").to_string();
 965            task::spawn(async move {
 966                if let Some(stream) = upgrade_receiver.await {
 967                    server
 968                        .handle_connection(
 969                            Connection::new(
 970                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 971                            ),
 972                            addr,
 973                            user_id,
 974                            None,
 975                            RealExecutor,
 976                        )
 977                        .await;
 978                }
 979            });
 980
 981            Ok(response)
 982        }
 983    });
 984}
 985
 986fn header_contains_ignore_case<T>(
 987    request: &tide::Request<T>,
 988    header_name: HeaderName,
 989    value: &str,
 990) -> bool {
 991    request
 992        .header(header_name)
 993        .map(|h| {
 994            h.as_str()
 995                .split(',')
 996                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 997        })
 998        .unwrap_or(false)
 999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use crate::{
1005        auth,
1006        db::{tests::TestDb, UserId},
1007        github, AppState, Config,
1008    };
1009    use ::rpc::Peer;
1010    use client::{
1011        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012        EstablishConnectionError, UserStore,
1013    };
1014    use collections::BTreeMap;
1015    use editor::{
1016        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1017        ToOffset, ToggleCodeActions, Undo,
1018    };
1019    use gpui::{executor, ModelHandle, TestAppContext};
1020    use language::{
1021        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023    };
1024    use lsp;
1025    use parking_lot::Mutex;
1026    use postage::barrier;
1027    use project::{
1028        fs::{FakeFs, Fs as _},
1029        search::SearchQuery,
1030        worktree::WorktreeHandle,
1031        DiagnosticSummary, Project, ProjectPath,
1032    };
1033    use rand::prelude::*;
1034    use rpc::PeerId;
1035    use serde_json::json;
1036    use sqlx::types::time::OffsetDateTime;
1037    use std::{
1038        cell::Cell,
1039        env,
1040        ops::Deref,
1041        path::{Path, PathBuf},
1042        rc::Rc,
1043        sync::{
1044            atomic::{AtomicBool, Ordering::SeqCst},
1045            Arc,
1046        },
1047        time::Duration,
1048    };
1049    use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051    #[cfg(test)]
1052    #[ctor::ctor]
1053    fn init_logger() {
1054        if std::env::var("RUST_LOG").is_ok() {
1055            env_logger::init();
1056        }
1057    }
1058
1059    #[gpui::test(iterations = 10)]
1060    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1061        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1062        let lang_registry = Arc::new(LanguageRegistry::test());
1063        let fs = FakeFs::new(cx_a.background());
1064        cx_a.foreground().forbid_parking();
1065
1066        // Connect to a server as 2 clients.
1067        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1068        let client_a = server.create_client(cx_a, "user_a").await;
1069        let client_b = server.create_client(cx_b, "user_b").await;
1070
1071        // Share a project as client A
1072        fs.insert_tree(
1073            "/a",
1074            json!({
1075                ".zed.toml": r#"collaborators = ["user_b"]"#,
1076                "a.txt": "a-contents",
1077                "b.txt": "b-contents",
1078            }),
1079        )
1080        .await;
1081        let project_a = cx_a.update(|cx| {
1082            Project::local(
1083                client_a.clone(),
1084                client_a.user_store.clone(),
1085                lang_registry.clone(),
1086                fs.clone(),
1087                cx,
1088            )
1089        });
1090        let (worktree_a, _) = project_a
1091            .update(cx_a, |p, cx| {
1092                p.find_or_create_local_worktree("/a", true, cx)
1093            })
1094            .await
1095            .unwrap();
1096        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1097        worktree_a
1098            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099            .await;
1100        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1101        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1102
1103        // Join that project as client B
1104        let project_b = Project::remote(
1105            project_id,
1106            client_b.clone(),
1107            client_b.user_store.clone(),
1108            lang_registry.clone(),
1109            fs.clone(),
1110            &mut cx_b.to_async(),
1111        )
1112        .await
1113        .unwrap();
1114
1115        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1116            assert_eq!(
1117                project
1118                    .collaborators()
1119                    .get(&client_a.peer_id)
1120                    .unwrap()
1121                    .user
1122                    .github_login,
1123                "user_a"
1124            );
1125            project.replica_id()
1126        });
1127        project_a
1128            .condition(&cx_a, |tree, _| {
1129                tree.collaborators()
1130                    .get(&client_b.peer_id)
1131                    .map_or(false, |collaborator| {
1132                        collaborator.replica_id == replica_id_b
1133                            && collaborator.user.github_login == "user_b"
1134                    })
1135            })
1136            .await;
1137
1138        // Open the same file as client B and client A.
1139        let buffer_b = project_b
1140            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1141            .await
1142            .unwrap();
1143        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1144        project_a.read_with(cx_a, |project, cx| {
1145            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1146        });
1147        let buffer_a = project_a
1148            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1149            .await
1150            .unwrap();
1151
1152        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1153
1154        // TODO
1155        // // Create a selection set as client B and see that selection set as client A.
1156        // buffer_a
1157        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1158        //     .await;
1159
1160        // Edit the buffer as client B and see that edit as client A.
1161        editor_b.update(cx_b, |editor, cx| {
1162            editor.handle_input(&Input("ok, ".into()), cx)
1163        });
1164        buffer_a
1165            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1166            .await;
1167
1168        // TODO
1169        // // Remove the selection set as client B, see those selections disappear as client A.
1170        cx_b.update(move |_| drop(editor_b));
1171        // buffer_a
1172        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1173        //     .await;
1174
1175        // Dropping the client B's project removes client B from client A's collaborators.
1176        cx_b.update(move |_| drop(project_b));
1177        project_a
1178            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1179            .await;
1180    }
1181
1182    #[gpui::test(iterations = 10)]
1183    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1184        let lang_registry = Arc::new(LanguageRegistry::test());
1185        let fs = FakeFs::new(cx_a.background());
1186        cx_a.foreground().forbid_parking();
1187
1188        // Connect to a server as 2 clients.
1189        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1190        let client_a = server.create_client(cx_a, "user_a").await;
1191        let client_b = server.create_client(cx_b, "user_b").await;
1192
1193        // Share a project as client A
1194        fs.insert_tree(
1195            "/a",
1196            json!({
1197                ".zed.toml": r#"collaborators = ["user_b"]"#,
1198                "a.txt": "a-contents",
1199                "b.txt": "b-contents",
1200            }),
1201        )
1202        .await;
1203        let project_a = cx_a.update(|cx| {
1204            Project::local(
1205                client_a.clone(),
1206                client_a.user_store.clone(),
1207                lang_registry.clone(),
1208                fs.clone(),
1209                cx,
1210            )
1211        });
1212        let (worktree_a, _) = project_a
1213            .update(cx_a, |p, cx| {
1214                p.find_or_create_local_worktree("/a", true, cx)
1215            })
1216            .await
1217            .unwrap();
1218        worktree_a
1219            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1220            .await;
1221        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1222        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1223        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1224        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1225
1226        // Join that project as client B
1227        let project_b = Project::remote(
1228            project_id,
1229            client_b.clone(),
1230            client_b.user_store.clone(),
1231            lang_registry.clone(),
1232            fs.clone(),
1233            &mut cx_b.to_async(),
1234        )
1235        .await
1236        .unwrap();
1237        project_b
1238            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1239            .await
1240            .unwrap();
1241
1242        // Unshare the project as client A
1243        project_a
1244            .update(cx_a, |project, cx| project.unshare(cx))
1245            .await
1246            .unwrap();
1247        project_b
1248            .condition(cx_b, |project, _| project.is_read_only())
1249            .await;
1250        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1251        cx_b.update(|_| {
1252            drop(project_b);
1253        });
1254
1255        // Share the project again and ensure guests can still join.
1256        project_a
1257            .update(cx_a, |project, cx| project.share(cx))
1258            .await
1259            .unwrap();
1260        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1261
1262        let project_b2 = Project::remote(
1263            project_id,
1264            client_b.clone(),
1265            client_b.user_store.clone(),
1266            lang_registry.clone(),
1267            fs.clone(),
1268            &mut cx_b.to_async(),
1269        )
1270        .await
1271        .unwrap();
1272        project_b2
1273            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1274            .await
1275            .unwrap();
1276    }
1277
1278    #[gpui::test(iterations = 10)]
1279    async fn test_propagate_saves_and_fs_changes(
1280        cx_a: &mut TestAppContext,
1281        cx_b: &mut TestAppContext,
1282        cx_c: &mut TestAppContext,
1283    ) {
1284        let lang_registry = Arc::new(LanguageRegistry::test());
1285        let fs = FakeFs::new(cx_a.background());
1286        cx_a.foreground().forbid_parking();
1287
1288        // Connect to a server as 3 clients.
1289        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1290        let client_a = server.create_client(cx_a, "user_a").await;
1291        let client_b = server.create_client(cx_b, "user_b").await;
1292        let client_c = server.create_client(cx_c, "user_c").await;
1293
1294        // Share a worktree as client A.
1295        fs.insert_tree(
1296            "/a",
1297            json!({
1298                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1299                "file1": "",
1300                "file2": ""
1301            }),
1302        )
1303        .await;
1304        let project_a = cx_a.update(|cx| {
1305            Project::local(
1306                client_a.clone(),
1307                client_a.user_store.clone(),
1308                lang_registry.clone(),
1309                fs.clone(),
1310                cx,
1311            )
1312        });
1313        let (worktree_a, _) = project_a
1314            .update(cx_a, |p, cx| {
1315                p.find_or_create_local_worktree("/a", true, cx)
1316            })
1317            .await
1318            .unwrap();
1319        worktree_a
1320            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1321            .await;
1322        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1323        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1324        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1325
1326        // Join that worktree as clients B and C.
1327        let project_b = Project::remote(
1328            project_id,
1329            client_b.clone(),
1330            client_b.user_store.clone(),
1331            lang_registry.clone(),
1332            fs.clone(),
1333            &mut cx_b.to_async(),
1334        )
1335        .await
1336        .unwrap();
1337        let project_c = Project::remote(
1338            project_id,
1339            client_c.clone(),
1340            client_c.user_store.clone(),
1341            lang_registry.clone(),
1342            fs.clone(),
1343            &mut cx_c.to_async(),
1344        )
1345        .await
1346        .unwrap();
1347        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1348        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1349
1350        // Open and edit a buffer as both guests B and C.
1351        let buffer_b = project_b
1352            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1353            .await
1354            .unwrap();
1355        let buffer_c = project_c
1356            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1357            .await
1358            .unwrap();
1359        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1360        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1361
1362        // Open and edit that buffer as the host.
1363        let buffer_a = project_a
1364            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1365            .await
1366            .unwrap();
1367
1368        buffer_a
1369            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1370            .await;
1371        buffer_a.update(cx_a, |buf, cx| {
1372            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1373        });
1374
1375        // Wait for edits to propagate
1376        buffer_a
1377            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1378            .await;
1379        buffer_b
1380            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381            .await;
1382        buffer_c
1383            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384            .await;
1385
1386        // Edit the buffer as the host and concurrently save as guest B.
1387        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1388        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1389        save_b.await.unwrap();
1390        assert_eq!(
1391            fs.load("/a/file1".as_ref()).await.unwrap(),
1392            "hi-a, i-am-c, i-am-b, i-am-a"
1393        );
1394        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1395        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1396        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1397
1398        worktree_a.flush_fs_events(cx_a).await;
1399
1400        // Make changes on host's file system, see those changes on guest worktrees.
1401        fs.rename(
1402            "/a/file1".as_ref(),
1403            "/a/file1-renamed".as_ref(),
1404            Default::default(),
1405        )
1406        .await
1407        .unwrap();
1408
1409        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1410            .await
1411            .unwrap();
1412        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1413
1414        worktree_a
1415            .condition(&cx_a, |tree, _| {
1416                tree.paths()
1417                    .map(|p| p.to_string_lossy())
1418                    .collect::<Vec<_>>()
1419                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1420            })
1421            .await;
1422        worktree_b
1423            .condition(&cx_b, |tree, _| {
1424                tree.paths()
1425                    .map(|p| p.to_string_lossy())
1426                    .collect::<Vec<_>>()
1427                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1428            })
1429            .await;
1430        worktree_c
1431            .condition(&cx_c, |tree, _| {
1432                tree.paths()
1433                    .map(|p| p.to_string_lossy())
1434                    .collect::<Vec<_>>()
1435                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1436            })
1437            .await;
1438
1439        // Ensure buffer files are updated as well.
1440        buffer_a
1441            .condition(&cx_a, |buf, _| {
1442                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1443            })
1444            .await;
1445        buffer_b
1446            .condition(&cx_b, |buf, _| {
1447                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1448            })
1449            .await;
1450        buffer_c
1451            .condition(&cx_c, |buf, _| {
1452                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1453            })
1454            .await;
1455    }
1456
1457    #[gpui::test(iterations = 10)]
1458    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1459        cx_a.foreground().forbid_parking();
1460        let lang_registry = Arc::new(LanguageRegistry::test());
1461        let fs = FakeFs::new(cx_a.background());
1462
1463        // Connect to a server as 2 clients.
1464        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1465        let client_a = server.create_client(cx_a, "user_a").await;
1466        let client_b = server.create_client(cx_b, "user_b").await;
1467
1468        // Share a project as client A
1469        fs.insert_tree(
1470            "/dir",
1471            json!({
1472                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1473                "a.txt": "a-contents",
1474            }),
1475        )
1476        .await;
1477
1478        let project_a = cx_a.update(|cx| {
1479            Project::local(
1480                client_a.clone(),
1481                client_a.user_store.clone(),
1482                lang_registry.clone(),
1483                fs.clone(),
1484                cx,
1485            )
1486        });
1487        let (worktree_a, _) = project_a
1488            .update(cx_a, |p, cx| {
1489                p.find_or_create_local_worktree("/dir", true, cx)
1490            })
1491            .await
1492            .unwrap();
1493        worktree_a
1494            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1495            .await;
1496        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1497        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1498        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1499
1500        // Join that project as client B
1501        let project_b = Project::remote(
1502            project_id,
1503            client_b.clone(),
1504            client_b.user_store.clone(),
1505            lang_registry.clone(),
1506            fs.clone(),
1507            &mut cx_b.to_async(),
1508        )
1509        .await
1510        .unwrap();
1511
1512        // Open a buffer as client B
1513        let buffer_b = project_b
1514            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1515            .await
1516            .unwrap();
1517
1518        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1519        buffer_b.read_with(cx_b, |buf, _| {
1520            assert!(buf.is_dirty());
1521            assert!(!buf.has_conflict());
1522        });
1523
1524        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1525        buffer_b
1526            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1527            .await;
1528        buffer_b.read_with(cx_b, |buf, _| {
1529            assert!(!buf.has_conflict());
1530        });
1531
1532        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1533        buffer_b.read_with(cx_b, |buf, _| {
1534            assert!(buf.is_dirty());
1535            assert!(!buf.has_conflict());
1536        });
1537    }
1538
1539    #[gpui::test(iterations = 10)]
1540    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1541        cx_a.foreground().forbid_parking();
1542        let lang_registry = Arc::new(LanguageRegistry::test());
1543        let fs = FakeFs::new(cx_a.background());
1544
1545        // Connect to a server as 2 clients.
1546        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1547        let client_a = server.create_client(cx_a, "user_a").await;
1548        let client_b = server.create_client(cx_b, "user_b").await;
1549
1550        // Share a project as client A
1551        fs.insert_tree(
1552            "/dir",
1553            json!({
1554                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1555                "a.txt": "a-contents",
1556            }),
1557        )
1558        .await;
1559
1560        let project_a = cx_a.update(|cx| {
1561            Project::local(
1562                client_a.clone(),
1563                client_a.user_store.clone(),
1564                lang_registry.clone(),
1565                fs.clone(),
1566                cx,
1567            )
1568        });
1569        let (worktree_a, _) = project_a
1570            .update(cx_a, |p, cx| {
1571                p.find_or_create_local_worktree("/dir", true, cx)
1572            })
1573            .await
1574            .unwrap();
1575        worktree_a
1576            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1577            .await;
1578        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1579        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1580        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1581
1582        // Join that project as client B
1583        let project_b = Project::remote(
1584            project_id,
1585            client_b.clone(),
1586            client_b.user_store.clone(),
1587            lang_registry.clone(),
1588            fs.clone(),
1589            &mut cx_b.to_async(),
1590        )
1591        .await
1592        .unwrap();
1593        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1594
1595        // Open a buffer as client B
1596        let buffer_b = project_b
1597            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1598            .await
1599            .unwrap();
1600        buffer_b.read_with(cx_b, |buf, _| {
1601            assert!(!buf.is_dirty());
1602            assert!(!buf.has_conflict());
1603        });
1604
1605        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1606            .await
1607            .unwrap();
1608        buffer_b
1609            .condition(&cx_b, |buf, _| {
1610                buf.text() == "new contents" && !buf.is_dirty()
1611            })
1612            .await;
1613        buffer_b.read_with(cx_b, |buf, _| {
1614            assert!(!buf.has_conflict());
1615        });
1616    }
1617
1618    #[gpui::test(iterations = 10)]
1619    async fn test_editing_while_guest_opens_buffer(
1620        cx_a: &mut TestAppContext,
1621        cx_b: &mut TestAppContext,
1622    ) {
1623        cx_a.foreground().forbid_parking();
1624        let lang_registry = Arc::new(LanguageRegistry::test());
1625        let fs = FakeFs::new(cx_a.background());
1626
1627        // Connect to a server as 2 clients.
1628        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1629        let client_a = server.create_client(cx_a, "user_a").await;
1630        let client_b = server.create_client(cx_b, "user_b").await;
1631
1632        // Share a project as client A
1633        fs.insert_tree(
1634            "/dir",
1635            json!({
1636                ".zed.toml": r#"collaborators = ["user_b"]"#,
1637                "a.txt": "a-contents",
1638            }),
1639        )
1640        .await;
1641        let project_a = cx_a.update(|cx| {
1642            Project::local(
1643                client_a.clone(),
1644                client_a.user_store.clone(),
1645                lang_registry.clone(),
1646                fs.clone(),
1647                cx,
1648            )
1649        });
1650        let (worktree_a, _) = project_a
1651            .update(cx_a, |p, cx| {
1652                p.find_or_create_local_worktree("/dir", true, cx)
1653            })
1654            .await
1655            .unwrap();
1656        worktree_a
1657            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1658            .await;
1659        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1660        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1661        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1662
1663        // Join that project as client B
1664        let project_b = Project::remote(
1665            project_id,
1666            client_b.clone(),
1667            client_b.user_store.clone(),
1668            lang_registry.clone(),
1669            fs.clone(),
1670            &mut cx_b.to_async(),
1671        )
1672        .await
1673        .unwrap();
1674
1675        // Open a buffer as client A
1676        let buffer_a = project_a
1677            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1678            .await
1679            .unwrap();
1680
1681        // Start opening the same buffer as client B
1682        let buffer_b = cx_b
1683            .background()
1684            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1685
1686        // Edit the buffer as client A while client B is still opening it.
1687        cx_b.background().simulate_random_delay().await;
1688        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1689        cx_b.background().simulate_random_delay().await;
1690        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1691
1692        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1693        let buffer_b = buffer_b.await.unwrap();
1694        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1695    }
1696
1697    #[gpui::test(iterations = 10)]
1698    async fn test_leaving_worktree_while_opening_buffer(
1699        cx_a: &mut TestAppContext,
1700        cx_b: &mut TestAppContext,
1701    ) {
1702        cx_a.foreground().forbid_parking();
1703        let lang_registry = Arc::new(LanguageRegistry::test());
1704        let fs = FakeFs::new(cx_a.background());
1705
1706        // Connect to a server as 2 clients.
1707        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1708        let client_a = server.create_client(cx_a, "user_a").await;
1709        let client_b = server.create_client(cx_b, "user_b").await;
1710
1711        // Share a project as client A
1712        fs.insert_tree(
1713            "/dir",
1714            json!({
1715                ".zed.toml": r#"collaborators = ["user_b"]"#,
1716                "a.txt": "a-contents",
1717            }),
1718        )
1719        .await;
1720        let project_a = cx_a.update(|cx| {
1721            Project::local(
1722                client_a.clone(),
1723                client_a.user_store.clone(),
1724                lang_registry.clone(),
1725                fs.clone(),
1726                cx,
1727            )
1728        });
1729        let (worktree_a, _) = project_a
1730            .update(cx_a, |p, cx| {
1731                p.find_or_create_local_worktree("/dir", true, cx)
1732            })
1733            .await
1734            .unwrap();
1735        worktree_a
1736            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1737            .await;
1738        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1739        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1740        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1741
1742        // Join that project as client B
1743        let project_b = Project::remote(
1744            project_id,
1745            client_b.clone(),
1746            client_b.user_store.clone(),
1747            lang_registry.clone(),
1748            fs.clone(),
1749            &mut cx_b.to_async(),
1750        )
1751        .await
1752        .unwrap();
1753
1754        // See that a guest has joined as client A.
1755        project_a
1756            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1757            .await;
1758
1759        // Begin opening a buffer as client B, but leave the project before the open completes.
1760        let buffer_b = cx_b
1761            .background()
1762            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1763        cx_b.update(|_| drop(project_b));
1764        drop(buffer_b);
1765
1766        // See that the guest has left.
1767        project_a
1768            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1769            .await;
1770    }
1771
1772    #[gpui::test(iterations = 10)]
1773    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1774        cx_a.foreground().forbid_parking();
1775        let lang_registry = Arc::new(LanguageRegistry::test());
1776        let fs = FakeFs::new(cx_a.background());
1777
1778        // Connect to a server as 2 clients.
1779        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1780        let client_a = server.create_client(cx_a, "user_a").await;
1781        let client_b = server.create_client(cx_b, "user_b").await;
1782
1783        // Share a project as client A
1784        fs.insert_tree(
1785            "/a",
1786            json!({
1787                ".zed.toml": r#"collaborators = ["user_b"]"#,
1788                "a.txt": "a-contents",
1789                "b.txt": "b-contents",
1790            }),
1791        )
1792        .await;
1793        let project_a = cx_a.update(|cx| {
1794            Project::local(
1795                client_a.clone(),
1796                client_a.user_store.clone(),
1797                lang_registry.clone(),
1798                fs.clone(),
1799                cx,
1800            )
1801        });
1802        let (worktree_a, _) = project_a
1803            .update(cx_a, |p, cx| {
1804                p.find_or_create_local_worktree("/a", true, cx)
1805            })
1806            .await
1807            .unwrap();
1808        worktree_a
1809            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1810            .await;
1811        let project_id = project_a
1812            .update(cx_a, |project, _| project.next_remote_id())
1813            .await;
1814        project_a
1815            .update(cx_a, |project, cx| project.share(cx))
1816            .await
1817            .unwrap();
1818
1819        // Join that project as client B
1820        let _project_b = Project::remote(
1821            project_id,
1822            client_b.clone(),
1823            client_b.user_store.clone(),
1824            lang_registry.clone(),
1825            fs.clone(),
1826            &mut cx_b.to_async(),
1827        )
1828        .await
1829        .unwrap();
1830
1831        // Client A sees that a guest has joined.
1832        project_a
1833            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1834            .await;
1835
1836        // Drop client B's connection and ensure client A observes client B leaving the project.
1837        client_b.disconnect(&cx_b.to_async()).unwrap();
1838        project_a
1839            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1840            .await;
1841
1842        // Rejoin the project as client B
1843        let _project_b = Project::remote(
1844            project_id,
1845            client_b.clone(),
1846            client_b.user_store.clone(),
1847            lang_registry.clone(),
1848            fs.clone(),
1849            &mut cx_b.to_async(),
1850        )
1851        .await
1852        .unwrap();
1853
1854        // Client A sees that a guest has re-joined.
1855        project_a
1856            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1857            .await;
1858
1859        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1860        client_b.wait_for_current_user(cx_b).await;
1861        server.disconnect_client(client_b.current_user_id(cx_b));
1862        cx_a.foreground().advance_clock(Duration::from_secs(3));
1863        project_a
1864            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1865            .await;
1866    }
1867
1868    #[gpui::test(iterations = 10)]
1869    async fn test_collaborating_with_diagnostics(
1870        cx_a: &mut TestAppContext,
1871        cx_b: &mut TestAppContext,
1872    ) {
1873        cx_a.foreground().forbid_parking();
1874        let mut lang_registry = Arc::new(LanguageRegistry::test());
1875        let fs = FakeFs::new(cx_a.background());
1876
1877        // Set up a fake language server.
1878        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1879        Arc::get_mut(&mut lang_registry)
1880            .unwrap()
1881            .add(Arc::new(Language::new(
1882                LanguageConfig {
1883                    name: "Rust".into(),
1884                    path_suffixes: vec!["rs".to_string()],
1885                    language_server: Some(language_server_config),
1886                    ..Default::default()
1887                },
1888                Some(tree_sitter_rust::language()),
1889            )));
1890
1891        // Connect to a server as 2 clients.
1892        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1893        let client_a = server.create_client(cx_a, "user_a").await;
1894        let client_b = server.create_client(cx_b, "user_b").await;
1895
1896        // Share a project as client A
1897        fs.insert_tree(
1898            "/a",
1899            json!({
1900                ".zed.toml": r#"collaborators = ["user_b"]"#,
1901                "a.rs": "let one = two",
1902                "other.rs": "",
1903            }),
1904        )
1905        .await;
1906        let project_a = cx_a.update(|cx| {
1907            Project::local(
1908                client_a.clone(),
1909                client_a.user_store.clone(),
1910                lang_registry.clone(),
1911                fs.clone(),
1912                cx,
1913            )
1914        });
1915        let (worktree_a, _) = project_a
1916            .update(cx_a, |p, cx| {
1917                p.find_or_create_local_worktree("/a", true, cx)
1918            })
1919            .await
1920            .unwrap();
1921        worktree_a
1922            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1923            .await;
1924        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1925        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1926        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1927
1928        // Cause the language server to start.
1929        let _ = cx_a
1930            .background()
1931            .spawn(project_a.update(cx_a, |project, cx| {
1932                project.open_buffer(
1933                    ProjectPath {
1934                        worktree_id,
1935                        path: Path::new("other.rs").into(),
1936                    },
1937                    cx,
1938                )
1939            }))
1940            .await
1941            .unwrap();
1942
1943        // Simulate a language server reporting errors for a file.
1944        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1945        fake_language_server
1946            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1947            .await;
1948        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1949            lsp::PublishDiagnosticsParams {
1950                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1951                version: None,
1952                diagnostics: vec![lsp::Diagnostic {
1953                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1954                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1955                    message: "message 1".to_string(),
1956                    ..Default::default()
1957                }],
1958            },
1959        );
1960
1961        // Wait for server to see the diagnostics update.
1962        server
1963            .condition(|store| {
1964                let worktree = store
1965                    .project(project_id)
1966                    .unwrap()
1967                    .share
1968                    .as_ref()
1969                    .unwrap()
1970                    .worktrees
1971                    .get(&worktree_id.to_proto())
1972                    .unwrap();
1973
1974                !worktree.diagnostic_summaries.is_empty()
1975            })
1976            .await;
1977
1978        // Join the worktree as client B.
1979        let project_b = Project::remote(
1980            project_id,
1981            client_b.clone(),
1982            client_b.user_store.clone(),
1983            lang_registry.clone(),
1984            fs.clone(),
1985            &mut cx_b.to_async(),
1986        )
1987        .await
1988        .unwrap();
1989
1990        project_b.read_with(cx_b, |project, cx| {
1991            assert_eq!(
1992                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1993                &[(
1994                    ProjectPath {
1995                        worktree_id,
1996                        path: Arc::from(Path::new("a.rs")),
1997                    },
1998                    DiagnosticSummary {
1999                        error_count: 1,
2000                        warning_count: 0,
2001                        ..Default::default()
2002                    },
2003                )]
2004            )
2005        });
2006
2007        // Simulate a language server reporting more errors for a file.
2008        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2009            lsp::PublishDiagnosticsParams {
2010                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2011                version: None,
2012                diagnostics: vec![
2013                    lsp::Diagnostic {
2014                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2015                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2016                        message: "message 1".to_string(),
2017                        ..Default::default()
2018                    },
2019                    lsp::Diagnostic {
2020                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2021                        range: lsp::Range::new(
2022                            lsp::Position::new(0, 10),
2023                            lsp::Position::new(0, 13),
2024                        ),
2025                        message: "message 2".to_string(),
2026                        ..Default::default()
2027                    },
2028                ],
2029            },
2030        );
2031
2032        // Client b gets the updated summaries
2033        project_b
2034            .condition(&cx_b, |project, cx| {
2035                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2036                    == &[(
2037                        ProjectPath {
2038                            worktree_id,
2039                            path: Arc::from(Path::new("a.rs")),
2040                        },
2041                        DiagnosticSummary {
2042                            error_count: 1,
2043                            warning_count: 1,
2044                            ..Default::default()
2045                        },
2046                    )]
2047            })
2048            .await;
2049
2050        // Open the file with the errors on client B. They should be present.
2051        let buffer_b = cx_b
2052            .background()
2053            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2054            .await
2055            .unwrap();
2056
2057        buffer_b.read_with(cx_b, |buffer, _| {
2058            assert_eq!(
2059                buffer
2060                    .snapshot()
2061                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2062                    .map(|entry| entry)
2063                    .collect::<Vec<_>>(),
2064                &[
2065                    DiagnosticEntry {
2066                        range: Point::new(0, 4)..Point::new(0, 7),
2067                        diagnostic: Diagnostic {
2068                            group_id: 0,
2069                            message: "message 1".to_string(),
2070                            severity: lsp::DiagnosticSeverity::ERROR,
2071                            is_primary: true,
2072                            ..Default::default()
2073                        }
2074                    },
2075                    DiagnosticEntry {
2076                        range: Point::new(0, 10)..Point::new(0, 13),
2077                        diagnostic: Diagnostic {
2078                            group_id: 1,
2079                            severity: lsp::DiagnosticSeverity::WARNING,
2080                            message: "message 2".to_string(),
2081                            is_primary: true,
2082                            ..Default::default()
2083                        }
2084                    }
2085                ]
2086            );
2087        });
2088    }
2089
2090    #[gpui::test(iterations = 10)]
2091    async fn test_collaborating_with_completion(
2092        cx_a: &mut TestAppContext,
2093        cx_b: &mut TestAppContext,
2094    ) {
2095        cx_a.foreground().forbid_parking();
2096        let mut lang_registry = Arc::new(LanguageRegistry::test());
2097        let fs = FakeFs::new(cx_a.background());
2098
2099        // Set up a fake language server.
2100        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2101        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2102            completion_provider: Some(lsp::CompletionOptions {
2103                trigger_characters: Some(vec![".".to_string()]),
2104                ..Default::default()
2105            }),
2106            ..Default::default()
2107        });
2108        Arc::get_mut(&mut lang_registry)
2109            .unwrap()
2110            .add(Arc::new(Language::new(
2111                LanguageConfig {
2112                    name: "Rust".into(),
2113                    path_suffixes: vec!["rs".to_string()],
2114                    language_server: Some(language_server_config),
2115                    ..Default::default()
2116                },
2117                Some(tree_sitter_rust::language()),
2118            )));
2119
2120        // Connect to a server as 2 clients.
2121        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2122        let client_a = server.create_client(cx_a, "user_a").await;
2123        let client_b = server.create_client(cx_b, "user_b").await;
2124
2125        // Share a project as client A
2126        fs.insert_tree(
2127            "/a",
2128            json!({
2129                ".zed.toml": r#"collaborators = ["user_b"]"#,
2130                "main.rs": "fn main() { a }",
2131                "other.rs": "",
2132            }),
2133        )
2134        .await;
2135        let project_a = cx_a.update(|cx| {
2136            Project::local(
2137                client_a.clone(),
2138                client_a.user_store.clone(),
2139                lang_registry.clone(),
2140                fs.clone(),
2141                cx,
2142            )
2143        });
2144        let (worktree_a, _) = project_a
2145            .update(cx_a, |p, cx| {
2146                p.find_or_create_local_worktree("/a", true, cx)
2147            })
2148            .await
2149            .unwrap();
2150        worktree_a
2151            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2152            .await;
2153        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2154        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2155        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2156
2157        // Join the worktree as client B.
2158        let project_b = Project::remote(
2159            project_id,
2160            client_b.clone(),
2161            client_b.user_store.clone(),
2162            lang_registry.clone(),
2163            fs.clone(),
2164            &mut cx_b.to_async(),
2165        )
2166        .await
2167        .unwrap();
2168
2169        // Open a file in an editor as the guest.
2170        let buffer_b = project_b
2171            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2172            .await
2173            .unwrap();
2174        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2175        let editor_b = cx_b.add_view(window_b, |cx| {
2176            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2177        });
2178
2179        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2180        buffer_b
2181            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2182            .await;
2183
2184        // Type a completion trigger character as the guest.
2185        editor_b.update(cx_b, |editor, cx| {
2186            editor.select_ranges([13..13], None, cx);
2187            editor.handle_input(&Input(".".into()), cx);
2188            cx.focus(&editor_b);
2189        });
2190
2191        // Receive a completion request as the host's language server.
2192        // Return some completions from the host's language server.
2193        cx_a.foreground().start_waiting();
2194        fake_language_server
2195            .handle_request::<lsp::request::Completion, _>(|params, _| {
2196                assert_eq!(
2197                    params.text_document_position.text_document.uri,
2198                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2199                );
2200                assert_eq!(
2201                    params.text_document_position.position,
2202                    lsp::Position::new(0, 14),
2203                );
2204
2205                Some(lsp::CompletionResponse::Array(vec![
2206                    lsp::CompletionItem {
2207                        label: "first_method(…)".into(),
2208                        detail: Some("fn(&mut self, B) -> C".into()),
2209                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2210                            new_text: "first_method($1)".to_string(),
2211                            range: lsp::Range::new(
2212                                lsp::Position::new(0, 14),
2213                                lsp::Position::new(0, 14),
2214                            ),
2215                        })),
2216                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2217                        ..Default::default()
2218                    },
2219                    lsp::CompletionItem {
2220                        label: "second_method(…)".into(),
2221                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2222                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2223                            new_text: "second_method()".to_string(),
2224                            range: lsp::Range::new(
2225                                lsp::Position::new(0, 14),
2226                                lsp::Position::new(0, 14),
2227                            ),
2228                        })),
2229                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2230                        ..Default::default()
2231                    },
2232                ]))
2233            })
2234            .next()
2235            .await
2236            .unwrap();
2237        cx_a.foreground().finish_waiting();
2238
2239        // Open the buffer on the host.
2240        let buffer_a = project_a
2241            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2242            .await
2243            .unwrap();
2244        buffer_a
2245            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2246            .await;
2247
2248        // Confirm a completion on the guest.
2249        editor_b
2250            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2251            .await;
2252        editor_b.update(cx_b, |editor, cx| {
2253            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2254            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2255        });
2256
2257        // Return a resolved completion from the host's language server.
2258        // The resolved completion has an additional text edit.
2259        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2260            |params, _| {
2261                assert_eq!(params.label, "first_method(…)");
2262                lsp::CompletionItem {
2263                    label: "first_method(…)".into(),
2264                    detail: Some("fn(&mut self, B) -> C".into()),
2265                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2266                        new_text: "first_method($1)".to_string(),
2267                        range: lsp::Range::new(
2268                            lsp::Position::new(0, 14),
2269                            lsp::Position::new(0, 14),
2270                        ),
2271                    })),
2272                    additional_text_edits: Some(vec![lsp::TextEdit {
2273                        new_text: "use d::SomeTrait;\n".to_string(),
2274                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2275                    }]),
2276                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2277                    ..Default::default()
2278                }
2279            },
2280        );
2281
2282        // The additional edit is applied.
2283        buffer_a
2284            .condition(&cx_a, |buffer, _| {
2285                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2286            })
2287            .await;
2288        buffer_b
2289            .condition(&cx_b, |buffer, _| {
2290                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2291            })
2292            .await;
2293    }
2294
2295    #[gpui::test(iterations = 10)]
2296    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2297        cx_a.foreground().forbid_parking();
2298        let mut lang_registry = Arc::new(LanguageRegistry::test());
2299        let fs = FakeFs::new(cx_a.background());
2300
2301        // Set up a fake language server.
2302        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2303        Arc::get_mut(&mut lang_registry)
2304            .unwrap()
2305            .add(Arc::new(Language::new(
2306                LanguageConfig {
2307                    name: "Rust".into(),
2308                    path_suffixes: vec!["rs".to_string()],
2309                    language_server: Some(language_server_config),
2310                    ..Default::default()
2311                },
2312                Some(tree_sitter_rust::language()),
2313            )));
2314
2315        // Connect to a server as 2 clients.
2316        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2317        let client_a = server.create_client(cx_a, "user_a").await;
2318        let client_b = server.create_client(cx_b, "user_b").await;
2319
2320        // Share a project as client A
2321        fs.insert_tree(
2322            "/a",
2323            json!({
2324                ".zed.toml": r#"collaborators = ["user_b"]"#,
2325                "a.rs": "let one = two",
2326            }),
2327        )
2328        .await;
2329        let project_a = cx_a.update(|cx| {
2330            Project::local(
2331                client_a.clone(),
2332                client_a.user_store.clone(),
2333                lang_registry.clone(),
2334                fs.clone(),
2335                cx,
2336            )
2337        });
2338        let (worktree_a, _) = project_a
2339            .update(cx_a, |p, cx| {
2340                p.find_or_create_local_worktree("/a", true, cx)
2341            })
2342            .await
2343            .unwrap();
2344        worktree_a
2345            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2346            .await;
2347        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2348        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2349        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2350
2351        // Join the worktree as client B.
2352        let project_b = Project::remote(
2353            project_id,
2354            client_b.clone(),
2355            client_b.user_store.clone(),
2356            lang_registry.clone(),
2357            fs.clone(),
2358            &mut cx_b.to_async(),
2359        )
2360        .await
2361        .unwrap();
2362
2363        let buffer_b = cx_b
2364            .background()
2365            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2366            .await
2367            .unwrap();
2368
2369        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2370        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2371            Some(vec![
2372                lsp::TextEdit {
2373                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2374                    new_text: "h".to_string(),
2375                },
2376                lsp::TextEdit {
2377                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2378                    new_text: "y".to_string(),
2379                },
2380            ])
2381        });
2382
2383        project_b
2384            .update(cx_b, |project, cx| {
2385                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2386            })
2387            .await
2388            .unwrap();
2389        assert_eq!(
2390            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2391            "let honey = two"
2392        );
2393    }
2394
2395    #[gpui::test(iterations = 10)]
2396    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2397        cx_a.foreground().forbid_parking();
2398        let mut lang_registry = Arc::new(LanguageRegistry::test());
2399        let fs = FakeFs::new(cx_a.background());
2400        fs.insert_tree(
2401            "/root-1",
2402            json!({
2403                ".zed.toml": r#"collaborators = ["user_b"]"#,
2404                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2405            }),
2406        )
2407        .await;
2408        fs.insert_tree(
2409            "/root-2",
2410            json!({
2411                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2412            }),
2413        )
2414        .await;
2415
2416        // Set up a fake language server.
2417        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2418        Arc::get_mut(&mut lang_registry)
2419            .unwrap()
2420            .add(Arc::new(Language::new(
2421                LanguageConfig {
2422                    name: "Rust".into(),
2423                    path_suffixes: vec!["rs".to_string()],
2424                    language_server: Some(language_server_config),
2425                    ..Default::default()
2426                },
2427                Some(tree_sitter_rust::language()),
2428            )));
2429
2430        // Connect to a server as 2 clients.
2431        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2432        let client_a = server.create_client(cx_a, "user_a").await;
2433        let client_b = server.create_client(cx_b, "user_b").await;
2434
2435        // Share a project as client A
2436        let project_a = cx_a.update(|cx| {
2437            Project::local(
2438                client_a.clone(),
2439                client_a.user_store.clone(),
2440                lang_registry.clone(),
2441                fs.clone(),
2442                cx,
2443            )
2444        });
2445        let (worktree_a, _) = project_a
2446            .update(cx_a, |p, cx| {
2447                p.find_or_create_local_worktree("/root-1", true, cx)
2448            })
2449            .await
2450            .unwrap();
2451        worktree_a
2452            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2453            .await;
2454        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2455        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2456        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2457
2458        // Join the worktree as client B.
2459        let project_b = Project::remote(
2460            project_id,
2461            client_b.clone(),
2462            client_b.user_store.clone(),
2463            lang_registry.clone(),
2464            fs.clone(),
2465            &mut cx_b.to_async(),
2466        )
2467        .await
2468        .unwrap();
2469
2470        // Open the file on client B.
2471        let buffer_b = cx_b
2472            .background()
2473            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2474            .await
2475            .unwrap();
2476
2477        // Request the definition of a symbol as the guest.
2478        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2479        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2480            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2481                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2482                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2483            )))
2484        });
2485
2486        let definitions_1 = project_b
2487            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2488            .await
2489            .unwrap();
2490        cx_b.read(|cx| {
2491            assert_eq!(definitions_1.len(), 1);
2492            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2493            let target_buffer = definitions_1[0].buffer.read(cx);
2494            assert_eq!(
2495                target_buffer.text(),
2496                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2497            );
2498            assert_eq!(
2499                definitions_1[0].range.to_point(target_buffer),
2500                Point::new(0, 6)..Point::new(0, 9)
2501            );
2502        });
2503
2504        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2505        // the previous call to `definition`.
2506        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2507            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2508                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2509                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2510            )))
2511        });
2512
2513        let definitions_2 = project_b
2514            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2515            .await
2516            .unwrap();
2517        cx_b.read(|cx| {
2518            assert_eq!(definitions_2.len(), 1);
2519            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2520            let target_buffer = definitions_2[0].buffer.read(cx);
2521            assert_eq!(
2522                target_buffer.text(),
2523                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2524            );
2525            assert_eq!(
2526                definitions_2[0].range.to_point(target_buffer),
2527                Point::new(1, 6)..Point::new(1, 11)
2528            );
2529        });
2530        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2531    }
2532
2533    #[gpui::test(iterations = 10)]
2534    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2535        cx_a.foreground().forbid_parking();
2536        let mut lang_registry = Arc::new(LanguageRegistry::test());
2537        let fs = FakeFs::new(cx_a.background());
2538        fs.insert_tree(
2539            "/root-1",
2540            json!({
2541                ".zed.toml": r#"collaborators = ["user_b"]"#,
2542                "one.rs": "const ONE: usize = 1;",
2543                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2544            }),
2545        )
2546        .await;
2547        fs.insert_tree(
2548            "/root-2",
2549            json!({
2550                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2551            }),
2552        )
2553        .await;
2554
2555        // Set up a fake language server.
2556        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2557        Arc::get_mut(&mut lang_registry)
2558            .unwrap()
2559            .add(Arc::new(Language::new(
2560                LanguageConfig {
2561                    name: "Rust".into(),
2562                    path_suffixes: vec!["rs".to_string()],
2563                    language_server: Some(language_server_config),
2564                    ..Default::default()
2565                },
2566                Some(tree_sitter_rust::language()),
2567            )));
2568
2569        // Connect to a server as 2 clients.
2570        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2571        let client_a = server.create_client(cx_a, "user_a").await;
2572        let client_b = server.create_client(cx_b, "user_b").await;
2573
2574        // Share a project as client A
2575        let project_a = cx_a.update(|cx| {
2576            Project::local(
2577                client_a.clone(),
2578                client_a.user_store.clone(),
2579                lang_registry.clone(),
2580                fs.clone(),
2581                cx,
2582            )
2583        });
2584        let (worktree_a, _) = project_a
2585            .update(cx_a, |p, cx| {
2586                p.find_or_create_local_worktree("/root-1", true, cx)
2587            })
2588            .await
2589            .unwrap();
2590        worktree_a
2591            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2592            .await;
2593        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2594        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2595        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2596
2597        // Join the worktree as client B.
2598        let project_b = Project::remote(
2599            project_id,
2600            client_b.clone(),
2601            client_b.user_store.clone(),
2602            lang_registry.clone(),
2603            fs.clone(),
2604            &mut cx_b.to_async(),
2605        )
2606        .await
2607        .unwrap();
2608
2609        // Open the file on client B.
2610        let buffer_b = cx_b
2611            .background()
2612            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2613            .await
2614            .unwrap();
2615
2616        // Request references to a symbol as the guest.
2617        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2618        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2619            assert_eq!(
2620                params.text_document_position.text_document.uri.as_str(),
2621                "file:///root-1/one.rs"
2622            );
2623            Some(vec![
2624                lsp::Location {
2625                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2626                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2627                },
2628                lsp::Location {
2629                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2630                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2631                },
2632                lsp::Location {
2633                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2634                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2635                },
2636            ])
2637        });
2638
2639        let references = project_b
2640            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2641            .await
2642            .unwrap();
2643        cx_b.read(|cx| {
2644            assert_eq!(references.len(), 3);
2645            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2646
2647            let two_buffer = references[0].buffer.read(cx);
2648            let three_buffer = references[2].buffer.read(cx);
2649            assert_eq!(
2650                two_buffer.file().unwrap().path().as_ref(),
2651                Path::new("two.rs")
2652            );
2653            assert_eq!(references[1].buffer, references[0].buffer);
2654            assert_eq!(
2655                three_buffer.file().unwrap().full_path(cx),
2656                Path::new("three.rs")
2657            );
2658
2659            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2660            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2661            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2662        });
2663    }
2664
2665    #[gpui::test(iterations = 10)]
2666    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2667        cx_a.foreground().forbid_parking();
2668        let lang_registry = Arc::new(LanguageRegistry::test());
2669        let fs = FakeFs::new(cx_a.background());
2670        fs.insert_tree(
2671            "/root-1",
2672            json!({
2673                ".zed.toml": r#"collaborators = ["user_b"]"#,
2674                "a": "hello world",
2675                "b": "goodnight moon",
2676                "c": "a world of goo",
2677                "d": "world champion of clown world",
2678            }),
2679        )
2680        .await;
2681        fs.insert_tree(
2682            "/root-2",
2683            json!({
2684                "e": "disney world is fun",
2685            }),
2686        )
2687        .await;
2688
2689        // Connect to a server as 2 clients.
2690        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2691        let client_a = server.create_client(cx_a, "user_a").await;
2692        let client_b = server.create_client(cx_b, "user_b").await;
2693
2694        // Share a project as client A
2695        let project_a = cx_a.update(|cx| {
2696            Project::local(
2697                client_a.clone(),
2698                client_a.user_store.clone(),
2699                lang_registry.clone(),
2700                fs.clone(),
2701                cx,
2702            )
2703        });
2704        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2705
2706        let (worktree_1, _) = project_a
2707            .update(cx_a, |p, cx| {
2708                p.find_or_create_local_worktree("/root-1", true, cx)
2709            })
2710            .await
2711            .unwrap();
2712        worktree_1
2713            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2714            .await;
2715        let (worktree_2, _) = project_a
2716            .update(cx_a, |p, cx| {
2717                p.find_or_create_local_worktree("/root-2", true, cx)
2718            })
2719            .await
2720            .unwrap();
2721        worktree_2
2722            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2723            .await;
2724
2725        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2726
2727        // Join the worktree as client B.
2728        let project_b = Project::remote(
2729            project_id,
2730            client_b.clone(),
2731            client_b.user_store.clone(),
2732            lang_registry.clone(),
2733            fs.clone(),
2734            &mut cx_b.to_async(),
2735        )
2736        .await
2737        .unwrap();
2738
2739        let results = project_b
2740            .update(cx_b, |project, cx| {
2741                project.search(SearchQuery::text("world", false, false), cx)
2742            })
2743            .await
2744            .unwrap();
2745
2746        let mut ranges_by_path = results
2747            .into_iter()
2748            .map(|(buffer, ranges)| {
2749                buffer.read_with(cx_b, |buffer, cx| {
2750                    let path = buffer.file().unwrap().full_path(cx);
2751                    let offset_ranges = ranges
2752                        .into_iter()
2753                        .map(|range| range.to_offset(buffer))
2754                        .collect::<Vec<_>>();
2755                    (path, offset_ranges)
2756                })
2757            })
2758            .collect::<Vec<_>>();
2759        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2760
2761        assert_eq!(
2762            ranges_by_path,
2763            &[
2764                (PathBuf::from("root-1/a"), vec![6..11]),
2765                (PathBuf::from("root-1/c"), vec![2..7]),
2766                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2767                (PathBuf::from("root-2/e"), vec![7..12]),
2768            ]
2769        );
2770    }
2771
2772    #[gpui::test(iterations = 10)]
2773    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2774        cx_a.foreground().forbid_parking();
2775        let lang_registry = Arc::new(LanguageRegistry::test());
2776        let fs = FakeFs::new(cx_a.background());
2777        fs.insert_tree(
2778            "/root-1",
2779            json!({
2780                ".zed.toml": r#"collaborators = ["user_b"]"#,
2781                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2782            }),
2783        )
2784        .await;
2785
2786        // Set up a fake language server.
2787        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2788        lang_registry.add(Arc::new(Language::new(
2789            LanguageConfig {
2790                name: "Rust".into(),
2791                path_suffixes: vec!["rs".to_string()],
2792                language_server: Some(language_server_config),
2793                ..Default::default()
2794            },
2795            Some(tree_sitter_rust::language()),
2796        )));
2797
2798        // Connect to a server as 2 clients.
2799        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2800        let client_a = server.create_client(cx_a, "user_a").await;
2801        let client_b = server.create_client(cx_b, "user_b").await;
2802
2803        // Share a project as client A
2804        let project_a = cx_a.update(|cx| {
2805            Project::local(
2806                client_a.clone(),
2807                client_a.user_store.clone(),
2808                lang_registry.clone(),
2809                fs.clone(),
2810                cx,
2811            )
2812        });
2813        let (worktree_a, _) = project_a
2814            .update(cx_a, |p, cx| {
2815                p.find_or_create_local_worktree("/root-1", true, cx)
2816            })
2817            .await
2818            .unwrap();
2819        worktree_a
2820            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2821            .await;
2822        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2823        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2824        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2825
2826        // Join the worktree as client B.
2827        let project_b = Project::remote(
2828            project_id,
2829            client_b.clone(),
2830            client_b.user_store.clone(),
2831            lang_registry.clone(),
2832            fs.clone(),
2833            &mut cx_b.to_async(),
2834        )
2835        .await
2836        .unwrap();
2837
2838        // Open the file on client B.
2839        let buffer_b = cx_b
2840            .background()
2841            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2842            .await
2843            .unwrap();
2844
2845        // Request document highlights as the guest.
2846        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2847        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2848            |params, _| {
2849                assert_eq!(
2850                    params
2851                        .text_document_position_params
2852                        .text_document
2853                        .uri
2854                        .as_str(),
2855                    "file:///root-1/main.rs"
2856                );
2857                assert_eq!(
2858                    params.text_document_position_params.position,
2859                    lsp::Position::new(0, 34)
2860                );
2861                Some(vec![
2862                    lsp::DocumentHighlight {
2863                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2864                        range: lsp::Range::new(
2865                            lsp::Position::new(0, 10),
2866                            lsp::Position::new(0, 16),
2867                        ),
2868                    },
2869                    lsp::DocumentHighlight {
2870                        kind: Some(lsp::DocumentHighlightKind::READ),
2871                        range: lsp::Range::new(
2872                            lsp::Position::new(0, 32),
2873                            lsp::Position::new(0, 38),
2874                        ),
2875                    },
2876                    lsp::DocumentHighlight {
2877                        kind: Some(lsp::DocumentHighlightKind::READ),
2878                        range: lsp::Range::new(
2879                            lsp::Position::new(0, 41),
2880                            lsp::Position::new(0, 47),
2881                        ),
2882                    },
2883                ])
2884            },
2885        );
2886
2887        let highlights = project_b
2888            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2889            .await
2890            .unwrap();
2891        buffer_b.read_with(cx_b, |buffer, _| {
2892            let snapshot = buffer.snapshot();
2893
2894            let highlights = highlights
2895                .into_iter()
2896                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2897                .collect::<Vec<_>>();
2898            assert_eq!(
2899                highlights,
2900                &[
2901                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2902                    (lsp::DocumentHighlightKind::READ, 32..38),
2903                    (lsp::DocumentHighlightKind::READ, 41..47)
2904                ]
2905            )
2906        });
2907    }
2908
2909    #[gpui::test(iterations = 10)]
2910    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2911        cx_a.foreground().forbid_parking();
2912        let mut lang_registry = Arc::new(LanguageRegistry::test());
2913        let fs = FakeFs::new(cx_a.background());
2914        fs.insert_tree(
2915            "/code",
2916            json!({
2917                "crate-1": {
2918                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2919                    "one.rs": "const ONE: usize = 1;",
2920                },
2921                "crate-2": {
2922                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2923                },
2924                "private": {
2925                    "passwords.txt": "the-password",
2926                }
2927            }),
2928        )
2929        .await;
2930
2931        // Set up a fake language server.
2932        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2933        Arc::get_mut(&mut lang_registry)
2934            .unwrap()
2935            .add(Arc::new(Language::new(
2936                LanguageConfig {
2937                    name: "Rust".into(),
2938                    path_suffixes: vec!["rs".to_string()],
2939                    language_server: Some(language_server_config),
2940                    ..Default::default()
2941                },
2942                Some(tree_sitter_rust::language()),
2943            )));
2944
2945        // Connect to a server as 2 clients.
2946        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2947        let client_a = server.create_client(cx_a, "user_a").await;
2948        let client_b = server.create_client(cx_b, "user_b").await;
2949
2950        // Share a project as client A
2951        let project_a = cx_a.update(|cx| {
2952            Project::local(
2953                client_a.clone(),
2954                client_a.user_store.clone(),
2955                lang_registry.clone(),
2956                fs.clone(),
2957                cx,
2958            )
2959        });
2960        let (worktree_a, _) = project_a
2961            .update(cx_a, |p, cx| {
2962                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2963            })
2964            .await
2965            .unwrap();
2966        worktree_a
2967            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2968            .await;
2969        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2970        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2971        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2972
2973        // Join the worktree as client B.
2974        let project_b = Project::remote(
2975            project_id,
2976            client_b.clone(),
2977            client_b.user_store.clone(),
2978            lang_registry.clone(),
2979            fs.clone(),
2980            &mut cx_b.to_async(),
2981        )
2982        .await
2983        .unwrap();
2984
2985        // Cause the language server to start.
2986        let _buffer = cx_b
2987            .background()
2988            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2989            .await
2990            .unwrap();
2991
2992        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2993        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2994            #[allow(deprecated)]
2995            Some(vec![lsp::SymbolInformation {
2996                name: "TWO".into(),
2997                location: lsp::Location {
2998                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2999                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3000                },
3001                kind: lsp::SymbolKind::CONSTANT,
3002                tags: None,
3003                container_name: None,
3004                deprecated: None,
3005            }])
3006        });
3007
3008        // Request the definition of a symbol as the guest.
3009        let symbols = project_b
3010            .update(cx_b, |p, cx| p.symbols("two", cx))
3011            .await
3012            .unwrap();
3013        assert_eq!(symbols.len(), 1);
3014        assert_eq!(symbols[0].name, "TWO");
3015
3016        // Open one of the returned symbols.
3017        let buffer_b_2 = project_b
3018            .update(cx_b, |project, cx| {
3019                project.open_buffer_for_symbol(&symbols[0], cx)
3020            })
3021            .await
3022            .unwrap();
3023        buffer_b_2.read_with(cx_b, |buffer, _| {
3024            assert_eq!(
3025                buffer.file().unwrap().path().as_ref(),
3026                Path::new("../crate-2/two.rs")
3027            );
3028        });
3029
3030        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3031        let mut fake_symbol = symbols[0].clone();
3032        fake_symbol.path = Path::new("/code/secrets").into();
3033        let error = project_b
3034            .update(cx_b, |project, cx| {
3035                project.open_buffer_for_symbol(&fake_symbol, cx)
3036            })
3037            .await
3038            .unwrap_err();
3039        assert!(error.to_string().contains("invalid symbol signature"));
3040    }
3041
3042    #[gpui::test(iterations = 10)]
3043    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3044        cx_a: &mut TestAppContext,
3045        cx_b: &mut TestAppContext,
3046        mut rng: StdRng,
3047    ) {
3048        cx_a.foreground().forbid_parking();
3049        let mut lang_registry = Arc::new(LanguageRegistry::test());
3050        let fs = FakeFs::new(cx_a.background());
3051        fs.insert_tree(
3052            "/root",
3053            json!({
3054                ".zed.toml": r#"collaborators = ["user_b"]"#,
3055                "a.rs": "const ONE: usize = b::TWO;",
3056                "b.rs": "const TWO: usize = 2",
3057            }),
3058        )
3059        .await;
3060
3061        // Set up a fake language server.
3062        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3063
3064        Arc::get_mut(&mut lang_registry)
3065            .unwrap()
3066            .add(Arc::new(Language::new(
3067                LanguageConfig {
3068                    name: "Rust".into(),
3069                    path_suffixes: vec!["rs".to_string()],
3070                    language_server: Some(language_server_config),
3071                    ..Default::default()
3072                },
3073                Some(tree_sitter_rust::language()),
3074            )));
3075
3076        // Connect to a server as 2 clients.
3077        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3078        let client_a = server.create_client(cx_a, "user_a").await;
3079        let client_b = server.create_client(cx_b, "user_b").await;
3080
3081        // Share a project as client A
3082        let project_a = cx_a.update(|cx| {
3083            Project::local(
3084                client_a.clone(),
3085                client_a.user_store.clone(),
3086                lang_registry.clone(),
3087                fs.clone(),
3088                cx,
3089            )
3090        });
3091
3092        let (worktree_a, _) = project_a
3093            .update(cx_a, |p, cx| {
3094                p.find_or_create_local_worktree("/root", true, cx)
3095            })
3096            .await
3097            .unwrap();
3098        worktree_a
3099            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3100            .await;
3101        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3102        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3103        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3104
3105        // Join the worktree as client B.
3106        let project_b = Project::remote(
3107            project_id,
3108            client_b.clone(),
3109            client_b.user_store.clone(),
3110            lang_registry.clone(),
3111            fs.clone(),
3112            &mut cx_b.to_async(),
3113        )
3114        .await
3115        .unwrap();
3116
3117        let buffer_b1 = cx_b
3118            .background()
3119            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3120            .await
3121            .unwrap();
3122
3123        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3124        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3125            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3126                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3127                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3128            )))
3129        });
3130
3131        let definitions;
3132        let buffer_b2;
3133        if rng.gen() {
3134            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3135            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3136        } else {
3137            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3138            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3139        }
3140
3141        let buffer_b2 = buffer_b2.await.unwrap();
3142        let definitions = definitions.await.unwrap();
3143        assert_eq!(definitions.len(), 1);
3144        assert_eq!(definitions[0].buffer, buffer_b2);
3145    }
3146
3147    #[gpui::test(iterations = 10)]
3148    async fn test_collaborating_with_code_actions(
3149        cx_a: &mut TestAppContext,
3150        cx_b: &mut TestAppContext,
3151    ) {
3152        cx_a.foreground().forbid_parking();
3153        let mut lang_registry = Arc::new(LanguageRegistry::test());
3154        let fs = FakeFs::new(cx_a.background());
3155        cx_b.update(|cx| editor::init(cx));
3156
3157        // Set up a fake language server.
3158        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3159        Arc::get_mut(&mut lang_registry)
3160            .unwrap()
3161            .add(Arc::new(Language::new(
3162                LanguageConfig {
3163                    name: "Rust".into(),
3164                    path_suffixes: vec!["rs".to_string()],
3165                    language_server: Some(language_server_config),
3166                    ..Default::default()
3167                },
3168                Some(tree_sitter_rust::language()),
3169            )));
3170
3171        // Connect to a server as 2 clients.
3172        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3173        let client_a = server.create_client(cx_a, "user_a").await;
3174        let client_b = server.create_client(cx_b, "user_b").await;
3175
3176        // Share a project as client A
3177        fs.insert_tree(
3178            "/a",
3179            json!({
3180                ".zed.toml": r#"collaborators = ["user_b"]"#,
3181                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3182                "other.rs": "pub fn foo() -> usize { 4 }",
3183            }),
3184        )
3185        .await;
3186        let project_a = cx_a.update(|cx| {
3187            Project::local(
3188                client_a.clone(),
3189                client_a.user_store.clone(),
3190                lang_registry.clone(),
3191                fs.clone(),
3192                cx,
3193            )
3194        });
3195        let (worktree_a, _) = project_a
3196            .update(cx_a, |p, cx| {
3197                p.find_or_create_local_worktree("/a", true, cx)
3198            })
3199            .await
3200            .unwrap();
3201        worktree_a
3202            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3203            .await;
3204        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3205        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3206        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3207
3208        // Join the worktree as client B.
3209        let project_b = Project::remote(
3210            project_id,
3211            client_b.clone(),
3212            client_b.user_store.clone(),
3213            lang_registry.clone(),
3214            fs.clone(),
3215            &mut cx_b.to_async(),
3216        )
3217        .await
3218        .unwrap();
3219        let mut params = cx_b.update(WorkspaceParams::test);
3220        params.languages = lang_registry.clone();
3221        params.client = client_b.client.clone();
3222        params.user_store = client_b.user_store.clone();
3223        params.project = project_b;
3224
3225        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3226        let editor_b = workspace_b
3227            .update(cx_b, |workspace, cx| {
3228                workspace.open_path((worktree_id, "main.rs").into(), cx)
3229            })
3230            .await
3231            .unwrap()
3232            .downcast::<Editor>()
3233            .unwrap();
3234
3235        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3236        fake_language_server
3237            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3238                assert_eq!(
3239                    params.text_document.uri,
3240                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3241                );
3242                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3243                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3244                None
3245            })
3246            .next()
3247            .await;
3248
3249        // Move cursor to a location that contains code actions.
3250        editor_b.update(cx_b, |editor, cx| {
3251            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3252            cx.focus(&editor_b);
3253        });
3254
3255        fake_language_server
3256            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3257                assert_eq!(
3258                    params.text_document.uri,
3259                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3260                );
3261                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3262                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3263
3264                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3265                    lsp::CodeAction {
3266                        title: "Inline into all callers".to_string(),
3267                        edit: Some(lsp::WorkspaceEdit {
3268                            changes: Some(
3269                                [
3270                                    (
3271                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3272                                        vec![lsp::TextEdit::new(
3273                                            lsp::Range::new(
3274                                                lsp::Position::new(1, 22),
3275                                                lsp::Position::new(1, 34),
3276                                            ),
3277                                            "4".to_string(),
3278                                        )],
3279                                    ),
3280                                    (
3281                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3282                                        vec![lsp::TextEdit::new(
3283                                            lsp::Range::new(
3284                                                lsp::Position::new(0, 0),
3285                                                lsp::Position::new(0, 27),
3286                                            ),
3287                                            "".to_string(),
3288                                        )],
3289                                    ),
3290                                ]
3291                                .into_iter()
3292                                .collect(),
3293                            ),
3294                            ..Default::default()
3295                        }),
3296                        data: Some(json!({
3297                            "codeActionParams": {
3298                                "range": {
3299                                    "start": {"line": 1, "column": 31},
3300                                    "end": {"line": 1, "column": 31},
3301                                }
3302                            }
3303                        })),
3304                        ..Default::default()
3305                    },
3306                )])
3307            })
3308            .next()
3309            .await;
3310
3311        // Toggle code actions and wait for them to display.
3312        editor_b.update(cx_b, |editor, cx| {
3313            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3314        });
3315        editor_b
3316            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3317            .await;
3318
3319        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3320
3321        // Confirming the code action will trigger a resolve request.
3322        let confirm_action = workspace_b
3323            .update(cx_b, |workspace, cx| {
3324                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3325            })
3326            .unwrap();
3327        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3328            lsp::CodeAction {
3329                title: "Inline into all callers".to_string(),
3330                edit: Some(lsp::WorkspaceEdit {
3331                    changes: Some(
3332                        [
3333                            (
3334                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3335                                vec![lsp::TextEdit::new(
3336                                    lsp::Range::new(
3337                                        lsp::Position::new(1, 22),
3338                                        lsp::Position::new(1, 34),
3339                                    ),
3340                                    "4".to_string(),
3341                                )],
3342                            ),
3343                            (
3344                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3345                                vec![lsp::TextEdit::new(
3346                                    lsp::Range::new(
3347                                        lsp::Position::new(0, 0),
3348                                        lsp::Position::new(0, 27),
3349                                    ),
3350                                    "".to_string(),
3351                                )],
3352                            ),
3353                        ]
3354                        .into_iter()
3355                        .collect(),
3356                    ),
3357                    ..Default::default()
3358                }),
3359                ..Default::default()
3360            }
3361        });
3362
3363        // After the action is confirmed, an editor containing both modified files is opened.
3364        confirm_action.await.unwrap();
3365        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3366            workspace
3367                .active_item(cx)
3368                .unwrap()
3369                .downcast::<Editor>()
3370                .unwrap()
3371        });
3372        code_action_editor.update(cx_b, |editor, cx| {
3373            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3374            editor.undo(&Undo, cx);
3375            assert_eq!(
3376                editor.text(cx),
3377                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3378            );
3379            editor.redo(&Redo, cx);
3380            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3381        });
3382    }
3383
3384    #[gpui::test(iterations = 10)]
3385    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3386        cx_a.foreground().forbid_parking();
3387        let mut lang_registry = Arc::new(LanguageRegistry::test());
3388        let fs = FakeFs::new(cx_a.background());
3389        cx_b.update(|cx| editor::init(cx));
3390
3391        // Set up a fake language server.
3392        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3393        Arc::get_mut(&mut lang_registry)
3394            .unwrap()
3395            .add(Arc::new(Language::new(
3396                LanguageConfig {
3397                    name: "Rust".into(),
3398                    path_suffixes: vec!["rs".to_string()],
3399                    language_server: Some(language_server_config),
3400                    ..Default::default()
3401                },
3402                Some(tree_sitter_rust::language()),
3403            )));
3404
3405        // Connect to a server as 2 clients.
3406        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3407        let client_a = server.create_client(cx_a, "user_a").await;
3408        let client_b = server.create_client(cx_b, "user_b").await;
3409
3410        // Share a project as client A
3411        fs.insert_tree(
3412            "/dir",
3413            json!({
3414                ".zed.toml": r#"collaborators = ["user_b"]"#,
3415                "one.rs": "const ONE: usize = 1;",
3416                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3417            }),
3418        )
3419        .await;
3420        let project_a = cx_a.update(|cx| {
3421            Project::local(
3422                client_a.clone(),
3423                client_a.user_store.clone(),
3424                lang_registry.clone(),
3425                fs.clone(),
3426                cx,
3427            )
3428        });
3429        let (worktree_a, _) = project_a
3430            .update(cx_a, |p, cx| {
3431                p.find_or_create_local_worktree("/dir", true, cx)
3432            })
3433            .await
3434            .unwrap();
3435        worktree_a
3436            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3437            .await;
3438        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3439        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3440        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3441
3442        // Join the worktree as client B.
3443        let project_b = Project::remote(
3444            project_id,
3445            client_b.clone(),
3446            client_b.user_store.clone(),
3447            lang_registry.clone(),
3448            fs.clone(),
3449            &mut cx_b.to_async(),
3450        )
3451        .await
3452        .unwrap();
3453        let mut params = cx_b.update(WorkspaceParams::test);
3454        params.languages = lang_registry.clone();
3455        params.client = client_b.client.clone();
3456        params.user_store = client_b.user_store.clone();
3457        params.project = project_b;
3458
3459        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3460        let editor_b = workspace_b
3461            .update(cx_b, |workspace, cx| {
3462                workspace.open_path((worktree_id, "one.rs").into(), cx)
3463            })
3464            .await
3465            .unwrap()
3466            .downcast::<Editor>()
3467            .unwrap();
3468        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3469
3470        // Move cursor to a location that can be renamed.
3471        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3472            editor.select_ranges([7..7], None, cx);
3473            editor.rename(&Rename, cx).unwrap()
3474        });
3475
3476        fake_language_server
3477            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3478                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3479                assert_eq!(params.position, lsp::Position::new(0, 7));
3480                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3481                    lsp::Position::new(0, 6),
3482                    lsp::Position::new(0, 9),
3483                )))
3484            })
3485            .next()
3486            .await
3487            .unwrap();
3488        prepare_rename.await.unwrap();
3489        editor_b.update(cx_b, |editor, cx| {
3490            let rename = editor.pending_rename().unwrap();
3491            let buffer = editor.buffer().read(cx).snapshot(cx);
3492            assert_eq!(
3493                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3494                6..9
3495            );
3496            rename.editor.update(cx, |rename_editor, cx| {
3497                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3498                    rename_buffer.edit([0..3], "THREE", cx);
3499                });
3500            });
3501        });
3502
3503        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3504            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3505        });
3506        fake_language_server
3507            .handle_request::<lsp::request::Rename, _>(|params, _| {
3508                assert_eq!(
3509                    params.text_document_position.text_document.uri.as_str(),
3510                    "file:///dir/one.rs"
3511                );
3512                assert_eq!(
3513                    params.text_document_position.position,
3514                    lsp::Position::new(0, 6)
3515                );
3516                assert_eq!(params.new_name, "THREE");
3517                Some(lsp::WorkspaceEdit {
3518                    changes: Some(
3519                        [
3520                            (
3521                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3522                                vec![lsp::TextEdit::new(
3523                                    lsp::Range::new(
3524                                        lsp::Position::new(0, 6),
3525                                        lsp::Position::new(0, 9),
3526                                    ),
3527                                    "THREE".to_string(),
3528                                )],
3529                            ),
3530                            (
3531                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3532                                vec![
3533                                    lsp::TextEdit::new(
3534                                        lsp::Range::new(
3535                                            lsp::Position::new(0, 24),
3536                                            lsp::Position::new(0, 27),
3537                                        ),
3538                                        "THREE".to_string(),
3539                                    ),
3540                                    lsp::TextEdit::new(
3541                                        lsp::Range::new(
3542                                            lsp::Position::new(0, 35),
3543                                            lsp::Position::new(0, 38),
3544                                        ),
3545                                        "THREE".to_string(),
3546                                    ),
3547                                ],
3548                            ),
3549                        ]
3550                        .into_iter()
3551                        .collect(),
3552                    ),
3553                    ..Default::default()
3554                })
3555            })
3556            .next()
3557            .await
3558            .unwrap();
3559        confirm_rename.await.unwrap();
3560
3561        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3562            workspace
3563                .active_item(cx)
3564                .unwrap()
3565                .downcast::<Editor>()
3566                .unwrap()
3567        });
3568        rename_editor.update(cx_b, |editor, cx| {
3569            assert_eq!(
3570                editor.text(cx),
3571                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3572            );
3573            editor.undo(&Undo, cx);
3574            assert_eq!(
3575                editor.text(cx),
3576                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3577            );
3578            editor.redo(&Redo, cx);
3579            assert_eq!(
3580                editor.text(cx),
3581                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3582            );
3583        });
3584
3585        // Ensure temporary rename edits cannot be undone/redone.
3586        editor_b.update(cx_b, |editor, cx| {
3587            editor.undo(&Undo, cx);
3588            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3589            editor.undo(&Undo, cx);
3590            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3591            editor.redo(&Redo, cx);
3592            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3593        })
3594    }
3595
3596    #[gpui::test(iterations = 10)]
3597    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3598        cx_a.foreground().forbid_parking();
3599
3600        // Connect to a server as 2 clients.
3601        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3602        let client_a = server.create_client(cx_a, "user_a").await;
3603        let client_b = server.create_client(cx_b, "user_b").await;
3604
3605        // Create an org that includes these 2 users.
3606        let db = &server.app_state.db;
3607        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3608        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3609            .await
3610            .unwrap();
3611        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3612            .await
3613            .unwrap();
3614
3615        // Create a channel that includes all the users.
3616        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3617        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3618            .await
3619            .unwrap();
3620        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3621            .await
3622            .unwrap();
3623        db.create_channel_message(
3624            channel_id,
3625            client_b.current_user_id(&cx_b),
3626            "hello A, it's B.",
3627            OffsetDateTime::now_utc(),
3628            1,
3629        )
3630        .await
3631        .unwrap();
3632
3633        let channels_a = cx_a
3634            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3635        channels_a
3636            .condition(cx_a, |list, _| list.available_channels().is_some())
3637            .await;
3638        channels_a.read_with(cx_a, |list, _| {
3639            assert_eq!(
3640                list.available_channels().unwrap(),
3641                &[ChannelDetails {
3642                    id: channel_id.to_proto(),
3643                    name: "test-channel".to_string()
3644                }]
3645            )
3646        });
3647        let channel_a = channels_a.update(cx_a, |this, cx| {
3648            this.get_channel(channel_id.to_proto(), cx).unwrap()
3649        });
3650        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3651        channel_a
3652            .condition(&cx_a, |channel, _| {
3653                channel_messages(channel)
3654                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3655            })
3656            .await;
3657
3658        let channels_b = cx_b
3659            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3660        channels_b
3661            .condition(cx_b, |list, _| list.available_channels().is_some())
3662            .await;
3663        channels_b.read_with(cx_b, |list, _| {
3664            assert_eq!(
3665                list.available_channels().unwrap(),
3666                &[ChannelDetails {
3667                    id: channel_id.to_proto(),
3668                    name: "test-channel".to_string()
3669                }]
3670            )
3671        });
3672
3673        let channel_b = channels_b.update(cx_b, |this, cx| {
3674            this.get_channel(channel_id.to_proto(), cx).unwrap()
3675        });
3676        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3677        channel_b
3678            .condition(&cx_b, |channel, _| {
3679                channel_messages(channel)
3680                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3681            })
3682            .await;
3683
3684        channel_a
3685            .update(cx_a, |channel, cx| {
3686                channel
3687                    .send_message("oh, hi B.".to_string(), cx)
3688                    .unwrap()
3689                    .detach();
3690                let task = channel.send_message("sup".to_string(), cx).unwrap();
3691                assert_eq!(
3692                    channel_messages(channel),
3693                    &[
3694                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3695                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3696                        ("user_a".to_string(), "sup".to_string(), true)
3697                    ]
3698                );
3699                task
3700            })
3701            .await
3702            .unwrap();
3703
3704        channel_b
3705            .condition(&cx_b, |channel, _| {
3706                channel_messages(channel)
3707                    == [
3708                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3709                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3710                        ("user_a".to_string(), "sup".to_string(), false),
3711                    ]
3712            })
3713            .await;
3714
3715        assert_eq!(
3716            server
3717                .state()
3718                .await
3719                .channel(channel_id)
3720                .unwrap()
3721                .connection_ids
3722                .len(),
3723            2
3724        );
3725        cx_b.update(|_| drop(channel_b));
3726        server
3727            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3728            .await;
3729
3730        cx_a.update(|_| drop(channel_a));
3731        server
3732            .condition(|state| state.channel(channel_id).is_none())
3733            .await;
3734    }
3735
3736    #[gpui::test(iterations = 10)]
3737    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3738        cx_a.foreground().forbid_parking();
3739
3740        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3741        let client_a = server.create_client(cx_a, "user_a").await;
3742
3743        let db = &server.app_state.db;
3744        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3745        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3746        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3747            .await
3748            .unwrap();
3749        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3750            .await
3751            .unwrap();
3752
3753        let channels_a = cx_a
3754            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3755        channels_a
3756            .condition(cx_a, |list, _| list.available_channels().is_some())
3757            .await;
3758        let channel_a = channels_a.update(cx_a, |this, cx| {
3759            this.get_channel(channel_id.to_proto(), cx).unwrap()
3760        });
3761
3762        // Messages aren't allowed to be too long.
3763        channel_a
3764            .update(cx_a, |channel, cx| {
3765                let long_body = "this is long.\n".repeat(1024);
3766                channel.send_message(long_body, cx).unwrap()
3767            })
3768            .await
3769            .unwrap_err();
3770
3771        // Messages aren't allowed to be blank.
3772        channel_a.update(cx_a, |channel, cx| {
3773            channel.send_message(String::new(), cx).unwrap_err()
3774        });
3775
3776        // Leading and trailing whitespace are trimmed.
3777        channel_a
3778            .update(cx_a, |channel, cx| {
3779                channel
3780                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3781                    .unwrap()
3782            })
3783            .await
3784            .unwrap();
3785        assert_eq!(
3786            db.get_channel_messages(channel_id, 10, None)
3787                .await
3788                .unwrap()
3789                .iter()
3790                .map(|m| &m.body)
3791                .collect::<Vec<_>>(),
3792            &["surrounded by whitespace"]
3793        );
3794    }
3795
3796    #[gpui::test(iterations = 10)]
3797    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3798        cx_a.foreground().forbid_parking();
3799
3800        // Connect to a server as 2 clients.
3801        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3802        let client_a = server.create_client(cx_a, "user_a").await;
3803        let client_b = server.create_client(cx_b, "user_b").await;
3804        let mut status_b = client_b.status();
3805
3806        // Create an org that includes these 2 users.
3807        let db = &server.app_state.db;
3808        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3809        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3810            .await
3811            .unwrap();
3812        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3813            .await
3814            .unwrap();
3815
3816        // Create a channel that includes all the users.
3817        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3818        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3819            .await
3820            .unwrap();
3821        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3822            .await
3823            .unwrap();
3824        db.create_channel_message(
3825            channel_id,
3826            client_b.current_user_id(&cx_b),
3827            "hello A, it's B.",
3828            OffsetDateTime::now_utc(),
3829            2,
3830        )
3831        .await
3832        .unwrap();
3833
3834        let channels_a = cx_a
3835            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3836        channels_a
3837            .condition(cx_a, |list, _| list.available_channels().is_some())
3838            .await;
3839
3840        channels_a.read_with(cx_a, |list, _| {
3841            assert_eq!(
3842                list.available_channels().unwrap(),
3843                &[ChannelDetails {
3844                    id: channel_id.to_proto(),
3845                    name: "test-channel".to_string()
3846                }]
3847            )
3848        });
3849        let channel_a = channels_a.update(cx_a, |this, cx| {
3850            this.get_channel(channel_id.to_proto(), cx).unwrap()
3851        });
3852        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3853        channel_a
3854            .condition(&cx_a, |channel, _| {
3855                channel_messages(channel)
3856                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3857            })
3858            .await;
3859
3860        let channels_b = cx_b
3861            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3862        channels_b
3863            .condition(cx_b, |list, _| list.available_channels().is_some())
3864            .await;
3865        channels_b.read_with(cx_b, |list, _| {
3866            assert_eq!(
3867                list.available_channels().unwrap(),
3868                &[ChannelDetails {
3869                    id: channel_id.to_proto(),
3870                    name: "test-channel".to_string()
3871                }]
3872            )
3873        });
3874
3875        let channel_b = channels_b.update(cx_b, |this, cx| {
3876            this.get_channel(channel_id.to_proto(), cx).unwrap()
3877        });
3878        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3879        channel_b
3880            .condition(&cx_b, |channel, _| {
3881                channel_messages(channel)
3882                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3883            })
3884            .await;
3885
3886        // Disconnect client B, ensuring we can still access its cached channel data.
3887        server.forbid_connections();
3888        server.disconnect_client(client_b.current_user_id(&cx_b));
3889        cx_b.foreground().advance_clock(Duration::from_secs(3));
3890        while !matches!(
3891            status_b.next().await,
3892            Some(client::Status::ReconnectionError { .. })
3893        ) {}
3894
3895        channels_b.read_with(cx_b, |channels, _| {
3896            assert_eq!(
3897                channels.available_channels().unwrap(),
3898                [ChannelDetails {
3899                    id: channel_id.to_proto(),
3900                    name: "test-channel".to_string()
3901                }]
3902            )
3903        });
3904        channel_b.read_with(cx_b, |channel, _| {
3905            assert_eq!(
3906                channel_messages(channel),
3907                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3908            )
3909        });
3910
3911        // Send a message from client B while it is disconnected.
3912        channel_b
3913            .update(cx_b, |channel, cx| {
3914                let task = channel
3915                    .send_message("can you see this?".to_string(), cx)
3916                    .unwrap();
3917                assert_eq!(
3918                    channel_messages(channel),
3919                    &[
3920                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3921                        ("user_b".to_string(), "can you see this?".to_string(), true)
3922                    ]
3923                );
3924                task
3925            })
3926            .await
3927            .unwrap_err();
3928
3929        // Send a message from client A while B is disconnected.
3930        channel_a
3931            .update(cx_a, |channel, cx| {
3932                channel
3933                    .send_message("oh, hi B.".to_string(), cx)
3934                    .unwrap()
3935                    .detach();
3936                let task = channel.send_message("sup".to_string(), cx).unwrap();
3937                assert_eq!(
3938                    channel_messages(channel),
3939                    &[
3940                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3941                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3942                        ("user_a".to_string(), "sup".to_string(), true)
3943                    ]
3944                );
3945                task
3946            })
3947            .await
3948            .unwrap();
3949
3950        // Give client B a chance to reconnect.
3951        server.allow_connections();
3952        cx_b.foreground().advance_clock(Duration::from_secs(10));
3953
3954        // Verify that B sees the new messages upon reconnection, as well as the message client B
3955        // sent while offline.
3956        channel_b
3957            .condition(&cx_b, |channel, _| {
3958                channel_messages(channel)
3959                    == [
3960                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3961                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3962                        ("user_a".to_string(), "sup".to_string(), false),
3963                        ("user_b".to_string(), "can you see this?".to_string(), false),
3964                    ]
3965            })
3966            .await;
3967
3968        // Ensure client A and B can communicate normally after reconnection.
3969        channel_a
3970            .update(cx_a, |channel, cx| {
3971                channel.send_message("you online?".to_string(), cx).unwrap()
3972            })
3973            .await
3974            .unwrap();
3975        channel_b
3976            .condition(&cx_b, |channel, _| {
3977                channel_messages(channel)
3978                    == [
3979                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3980                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3981                        ("user_a".to_string(), "sup".to_string(), false),
3982                        ("user_b".to_string(), "can you see this?".to_string(), false),
3983                        ("user_a".to_string(), "you online?".to_string(), false),
3984                    ]
3985            })
3986            .await;
3987
3988        channel_b
3989            .update(cx_b, |channel, cx| {
3990                channel.send_message("yep".to_string(), cx).unwrap()
3991            })
3992            .await
3993            .unwrap();
3994        channel_a
3995            .condition(&cx_a, |channel, _| {
3996                channel_messages(channel)
3997                    == [
3998                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3999                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4000                        ("user_a".to_string(), "sup".to_string(), false),
4001                        ("user_b".to_string(), "can you see this?".to_string(), false),
4002                        ("user_a".to_string(), "you online?".to_string(), false),
4003                        ("user_b".to_string(), "yep".to_string(), false),
4004                    ]
4005            })
4006            .await;
4007    }
4008
4009    #[gpui::test(iterations = 10)]
4010    async fn test_contacts(
4011        cx_a: &mut TestAppContext,
4012        cx_b: &mut TestAppContext,
4013        cx_c: &mut TestAppContext,
4014    ) {
4015        cx_a.foreground().forbid_parking();
4016        let lang_registry = Arc::new(LanguageRegistry::test());
4017        let fs = FakeFs::new(cx_a.background());
4018
4019        // Connect to a server as 3 clients.
4020        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4021        let client_a = server.create_client(cx_a, "user_a").await;
4022        let client_b = server.create_client(cx_b, "user_b").await;
4023        let client_c = server.create_client(cx_c, "user_c").await;
4024
4025        // Share a worktree as client A.
4026        fs.insert_tree(
4027            "/a",
4028            json!({
4029                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4030            }),
4031        )
4032        .await;
4033
4034        let project_a = cx_a.update(|cx| {
4035            Project::local(
4036                client_a.clone(),
4037                client_a.user_store.clone(),
4038                lang_registry.clone(),
4039                fs.clone(),
4040                cx,
4041            )
4042        });
4043        let (worktree_a, _) = project_a
4044            .update(cx_a, |p, cx| {
4045                p.find_or_create_local_worktree("/a", true, cx)
4046            })
4047            .await
4048            .unwrap();
4049        worktree_a
4050            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4051            .await;
4052
4053        client_a
4054            .user_store
4055            .condition(&cx_a, |user_store, _| {
4056                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4057            })
4058            .await;
4059        client_b
4060            .user_store
4061            .condition(&cx_b, |user_store, _| {
4062                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4063            })
4064            .await;
4065        client_c
4066            .user_store
4067            .condition(&cx_c, |user_store, _| {
4068                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4069            })
4070            .await;
4071
4072        let project_id = project_a
4073            .update(cx_a, |project, _| project.next_remote_id())
4074            .await;
4075        project_a
4076            .update(cx_a, |project, cx| project.share(cx))
4077            .await
4078            .unwrap();
4079
4080        let _project_b = Project::remote(
4081            project_id,
4082            client_b.clone(),
4083            client_b.user_store.clone(),
4084            lang_registry.clone(),
4085            fs.clone(),
4086            &mut cx_b.to_async(),
4087        )
4088        .await
4089        .unwrap();
4090
4091        client_a
4092            .user_store
4093            .condition(&cx_a, |user_store, _| {
4094                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4095            })
4096            .await;
4097        client_b
4098            .user_store
4099            .condition(&cx_b, |user_store, _| {
4100                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4101            })
4102            .await;
4103        client_c
4104            .user_store
4105            .condition(&cx_c, |user_store, _| {
4106                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4107            })
4108            .await;
4109
4110        project_a
4111            .condition(&cx_a, |project, _| {
4112                project.collaborators().contains_key(&client_b.peer_id)
4113            })
4114            .await;
4115
4116        cx_a.update(move |_| drop(project_a));
4117        client_a
4118            .user_store
4119            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4120            .await;
4121        client_b
4122            .user_store
4123            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4124            .await;
4125        client_c
4126            .user_store
4127            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4128            .await;
4129
4130        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4131            user_store
4132                .contacts()
4133                .iter()
4134                .map(|contact| {
4135                    let worktrees = contact
4136                        .projects
4137                        .iter()
4138                        .map(|p| {
4139                            (
4140                                p.worktree_root_names[0].as_str(),
4141                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4142                            )
4143                        })
4144                        .collect();
4145                    (contact.user.github_login.as_str(), worktrees)
4146                })
4147                .collect()
4148        }
4149    }
4150
4151    #[gpui::test(iterations = 100)]
4152    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4153        cx.foreground().forbid_parking();
4154        let max_peers = env::var("MAX_PEERS")
4155            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4156            .unwrap_or(5);
4157        let max_operations = env::var("OPERATIONS")
4158            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4159            .unwrap_or(10);
4160
4161        let rng = Arc::new(Mutex::new(rng));
4162
4163        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4164        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4165
4166        let fs = FakeFs::new(cx.background());
4167        fs.insert_tree(
4168            "/_collab",
4169            json!({
4170                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4171            }),
4172        )
4173        .await;
4174
4175        let operations = Rc::new(Cell::new(0));
4176        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4177        let mut clients = Vec::new();
4178
4179        let mut next_entity_id = 100000;
4180        let mut host_cx = TestAppContext::new(
4181            cx.foreground_platform(),
4182            cx.platform(),
4183            cx.foreground(),
4184            cx.background(),
4185            cx.font_cache(),
4186            cx.leak_detector(),
4187            next_entity_id,
4188        );
4189        let host = server.create_client(&mut host_cx, "host").await;
4190        let host_project = host_cx.update(|cx| {
4191            Project::local(
4192                host.client.clone(),
4193                host.user_store.clone(),
4194                Arc::new(LanguageRegistry::test()),
4195                fs.clone(),
4196                cx,
4197            )
4198        });
4199        let host_project_id = host_project
4200            .update(&mut host_cx, |p, _| p.next_remote_id())
4201            .await;
4202
4203        let (collab_worktree, _) = host_project
4204            .update(&mut host_cx, |project, cx| {
4205                project.find_or_create_local_worktree("/_collab", true, cx)
4206            })
4207            .await
4208            .unwrap();
4209        collab_worktree
4210            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4211            .await;
4212        host_project
4213            .update(&mut host_cx, |project, cx| project.share(cx))
4214            .await
4215            .unwrap();
4216
4217        clients.push(cx.foreground().spawn(host.simulate_host(
4218            host_project,
4219            language_server_config,
4220            operations.clone(),
4221            max_operations,
4222            rng.clone(),
4223            host_cx,
4224        )));
4225
4226        while operations.get() < max_operations {
4227            cx.background().simulate_random_delay().await;
4228            if clients.len() >= max_peers {
4229                break;
4230            } else if rng.lock().gen_bool(0.05) {
4231                operations.set(operations.get() + 1);
4232
4233                let guest_id = clients.len();
4234                log::info!("Adding guest {}", guest_id);
4235                next_entity_id += 100000;
4236                let mut guest_cx = TestAppContext::new(
4237                    cx.foreground_platform(),
4238                    cx.platform(),
4239                    cx.foreground(),
4240                    cx.background(),
4241                    cx.font_cache(),
4242                    cx.leak_detector(),
4243                    next_entity_id,
4244                );
4245                let guest = server
4246                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4247                    .await;
4248                let guest_project = Project::remote(
4249                    host_project_id,
4250                    guest.client.clone(),
4251                    guest.user_store.clone(),
4252                    guest_lang_registry.clone(),
4253                    FakeFs::new(cx.background()),
4254                    &mut guest_cx.to_async(),
4255                )
4256                .await
4257                .unwrap();
4258                clients.push(cx.foreground().spawn(guest.simulate_guest(
4259                    guest_id,
4260                    guest_project,
4261                    operations.clone(),
4262                    max_operations,
4263                    rng.clone(),
4264                    guest_cx,
4265                )));
4266
4267                log::info!("Guest {} added", guest_id);
4268            }
4269        }
4270
4271        let mut clients = futures::future::join_all(clients).await;
4272        cx.foreground().run_until_parked();
4273
4274        let (host_client, mut host_cx) = clients.remove(0);
4275        let host_project = host_client.project.as_ref().unwrap();
4276        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4277            project
4278                .worktrees(cx)
4279                .map(|worktree| {
4280                    let snapshot = worktree.read(cx).snapshot();
4281                    (snapshot.id(), snapshot)
4282                })
4283                .collect::<BTreeMap<_, _>>()
4284        });
4285
4286        host_client
4287            .project
4288            .as_ref()
4289            .unwrap()
4290            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4291
4292        for (guest_client, mut guest_cx) in clients.into_iter() {
4293            let guest_id = guest_client.client.id();
4294            let worktree_snapshots =
4295                guest_client
4296                    .project
4297                    .as_ref()
4298                    .unwrap()
4299                    .read_with(&guest_cx, |project, cx| {
4300                        project
4301                            .worktrees(cx)
4302                            .map(|worktree| {
4303                                let worktree = worktree.read(cx);
4304                                (worktree.id(), worktree.snapshot())
4305                            })
4306                            .collect::<BTreeMap<_, _>>()
4307                    });
4308
4309            assert_eq!(
4310                worktree_snapshots.keys().collect::<Vec<_>>(),
4311                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4312                "guest {} has different worktrees than the host",
4313                guest_id
4314            );
4315            for (id, host_snapshot) in &host_worktree_snapshots {
4316                let guest_snapshot = &worktree_snapshots[id];
4317                assert_eq!(
4318                    guest_snapshot.root_name(),
4319                    host_snapshot.root_name(),
4320                    "guest {} has different root name than the host for worktree {}",
4321                    guest_id,
4322                    id
4323                );
4324                assert_eq!(
4325                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4326                    host_snapshot.entries(false).collect::<Vec<_>>(),
4327                    "guest {} has different snapshot than the host for worktree {}",
4328                    guest_id,
4329                    id
4330                );
4331            }
4332
4333            guest_client
4334                .project
4335                .as_ref()
4336                .unwrap()
4337                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4338
4339            for guest_buffer in &guest_client.buffers {
4340                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4341                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4342                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4343                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4344                        guest_id, guest_client.peer_id, buffer_id
4345                    ))
4346                });
4347                let path = host_buffer
4348                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4349
4350                assert_eq!(
4351                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4352                    0,
4353                    "guest {}, buffer {}, path {:?} has deferred operations",
4354                    guest_id,
4355                    buffer_id,
4356                    path,
4357                );
4358                assert_eq!(
4359                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4360                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4361                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4362                    guest_id,
4363                    buffer_id,
4364                    path
4365                );
4366            }
4367
4368            guest_cx.update(|_| drop(guest_client));
4369        }
4370
4371        host_cx.update(|_| drop(host_client));
4372    }
4373
4374    struct TestServer {
4375        peer: Arc<Peer>,
4376        app_state: Arc<AppState>,
4377        server: Arc<Server>,
4378        foreground: Rc<executor::Foreground>,
4379        notifications: mpsc::UnboundedReceiver<()>,
4380        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4381        forbid_connections: Arc<AtomicBool>,
4382        _test_db: TestDb,
4383    }
4384
4385    impl TestServer {
4386        async fn start(
4387            foreground: Rc<executor::Foreground>,
4388            background: Arc<executor::Background>,
4389        ) -> Self {
4390            let test_db = TestDb::fake(background);
4391            let app_state = Self::build_app_state(&test_db).await;
4392            let peer = Peer::new();
4393            let notifications = mpsc::unbounded();
4394            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4395            Self {
4396                peer,
4397                app_state,
4398                server,
4399                foreground,
4400                notifications: notifications.1,
4401                connection_killers: Default::default(),
4402                forbid_connections: Default::default(),
4403                _test_db: test_db,
4404            }
4405        }
4406
4407        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4408            cx.update(|cx| {
4409                let settings = Settings::test(cx);
4410                cx.set_global(settings);
4411            });
4412
4413            let http = FakeHttpClient::with_404_response();
4414            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4415            let client_name = name.to_string();
4416            let mut client = Client::new(http.clone());
4417            let server = self.server.clone();
4418            let connection_killers = self.connection_killers.clone();
4419            let forbid_connections = self.forbid_connections.clone();
4420            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4421
4422            Arc::get_mut(&mut client)
4423                .unwrap()
4424                .override_authenticate(move |cx| {
4425                    cx.spawn(|_| async move {
4426                        let access_token = "the-token".to_string();
4427                        Ok(Credentials {
4428                            user_id: user_id.0 as u64,
4429                            access_token,
4430                        })
4431                    })
4432                })
4433                .override_establish_connection(move |credentials, cx| {
4434                    assert_eq!(credentials.user_id, user_id.0 as u64);
4435                    assert_eq!(credentials.access_token, "the-token");
4436
4437                    let server = server.clone();
4438                    let connection_killers = connection_killers.clone();
4439                    let forbid_connections = forbid_connections.clone();
4440                    let client_name = client_name.clone();
4441                    let connection_id_tx = connection_id_tx.clone();
4442                    cx.spawn(move |cx| async move {
4443                        if forbid_connections.load(SeqCst) {
4444                            Err(EstablishConnectionError::other(anyhow!(
4445                                "server is forbidding connections"
4446                            )))
4447                        } else {
4448                            let (client_conn, server_conn, kill_conn) =
4449                                Connection::in_memory(cx.background());
4450                            connection_killers.lock().insert(user_id, kill_conn);
4451                            cx.background()
4452                                .spawn(server.handle_connection(
4453                                    server_conn,
4454                                    client_name,
4455                                    user_id,
4456                                    Some(connection_id_tx),
4457                                    cx.background(),
4458                                ))
4459                                .detach();
4460                            Ok(client_conn)
4461                        }
4462                    })
4463                });
4464
4465            client
4466                .authenticate_and_connect(&cx.to_async())
4467                .await
4468                .unwrap();
4469
4470            Channel::init(&client);
4471            Project::init(&client);
4472
4473            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4474            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4475
4476            let client = TestClient {
4477                client,
4478                peer_id,
4479                user_store,
4480                project: Default::default(),
4481                buffers: Default::default(),
4482            };
4483            client.wait_for_current_user(cx).await;
4484            client
4485        }
4486
4487        fn disconnect_client(&self, user_id: UserId) {
4488            self.connection_killers.lock().remove(&user_id);
4489        }
4490
4491        fn forbid_connections(&self) {
4492            self.forbid_connections.store(true, SeqCst);
4493        }
4494
4495        fn allow_connections(&self) {
4496            self.forbid_connections.store(false, SeqCst);
4497        }
4498
4499        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4500            let mut config = Config::default();
4501            config.session_secret = "a".repeat(32);
4502            config.database_url = test_db.url.clone();
4503            let github_client = github::AppClient::test();
4504            Arc::new(AppState {
4505                db: test_db.db().clone(),
4506                handlebars: Default::default(),
4507                auth_client: auth::build_client("", ""),
4508                repo_client: github::RepoClient::test(&github_client),
4509                github_client,
4510                config,
4511            })
4512        }
4513
4514        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4515            self.server.store.read()
4516        }
4517
4518        async fn condition<F>(&mut self, mut predicate: F)
4519        where
4520            F: FnMut(&Store) -> bool,
4521        {
4522            async_std::future::timeout(Duration::from_millis(500), async {
4523                while !(predicate)(&*self.server.store.read()) {
4524                    self.foreground.start_waiting();
4525                    self.notifications.next().await;
4526                    self.foreground.finish_waiting();
4527                }
4528            })
4529            .await
4530            .expect("condition timed out");
4531        }
4532    }
4533
4534    impl Drop for TestServer {
4535        fn drop(&mut self) {
4536            self.peer.reset();
4537        }
4538    }
4539
4540    struct TestClient {
4541        client: Arc<Client>,
4542        pub peer_id: PeerId,
4543        pub user_store: ModelHandle<UserStore>,
4544        project: Option<ModelHandle<Project>>,
4545        buffers: HashSet<ModelHandle<language::Buffer>>,
4546    }
4547
4548    impl Deref for TestClient {
4549        type Target = Arc<Client>;
4550
4551        fn deref(&self) -> &Self::Target {
4552            &self.client
4553        }
4554    }
4555
4556    impl TestClient {
4557        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4558            UserId::from_proto(
4559                self.user_store
4560                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4561            )
4562        }
4563
4564        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4565            let mut authed_user = self
4566                .user_store
4567                .read_with(cx, |user_store, _| user_store.watch_current_user());
4568            while authed_user.next().await.unwrap().is_none() {}
4569        }
4570
4571        fn simulate_host(
4572            mut self,
4573            project: ModelHandle<Project>,
4574            mut language_server_config: LanguageServerConfig,
4575            operations: Rc<Cell<usize>>,
4576            max_operations: usize,
4577            rng: Arc<Mutex<StdRng>>,
4578            mut cx: TestAppContext,
4579        ) -> impl Future<Output = (Self, TestAppContext)> {
4580            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4581
4582            // Set up a fake language server.
4583            language_server_config.set_fake_initializer({
4584                let rng = rng.clone();
4585                let files = files.clone();
4586                let project = project.downgrade();
4587                move |fake_server| {
4588                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4589                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4590                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4591                                range: lsp::Range::new(
4592                                    lsp::Position::new(0, 0),
4593                                    lsp::Position::new(0, 0),
4594                                ),
4595                                new_text: "the-new-text".to_string(),
4596                            })),
4597                            ..Default::default()
4598                        }]))
4599                    });
4600
4601                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4602                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4603                            lsp::CodeAction {
4604                                title: "the-code-action".to_string(),
4605                                ..Default::default()
4606                            },
4607                        )])
4608                    });
4609
4610                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4611                        |params, _| {
4612                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4613                                params.position,
4614                                params.position,
4615                            )))
4616                        },
4617                    );
4618
4619                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4620                        let files = files.clone();
4621                        let rng = rng.clone();
4622                        move |_, _| {
4623                            let files = files.lock();
4624                            let mut rng = rng.lock();
4625                            let count = rng.gen_range::<usize, _>(1..3);
4626                            let files = (0..count)
4627                                .map(|_| files.choose(&mut *rng).unwrap())
4628                                .collect::<Vec<_>>();
4629                            log::info!("LSP: Returning definitions in files {:?}", &files);
4630                            Some(lsp::GotoDefinitionResponse::Array(
4631                                files
4632                                    .into_iter()
4633                                    .map(|file| lsp::Location {
4634                                        uri: lsp::Url::from_file_path(file).unwrap(),
4635                                        range: Default::default(),
4636                                    })
4637                                    .collect(),
4638                            ))
4639                        }
4640                    });
4641
4642                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4643                        let rng = rng.clone();
4644                        let project = project.clone();
4645                        move |params, mut cx| {
4646                            if let Some(project) = project.upgrade(&cx) {
4647                                project.update(&mut cx, |project, cx| {
4648                                    let path = params
4649                                        .text_document_position_params
4650                                        .text_document
4651                                        .uri
4652                                        .to_file_path()
4653                                        .unwrap();
4654                                    let (worktree, relative_path) =
4655                                        project.find_local_worktree(&path, cx)?;
4656                                    let project_path =
4657                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4658                                    let buffer =
4659                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4660
4661                                    let mut highlights = Vec::new();
4662                                    let highlight_count = rng.lock().gen_range(1..=5);
4663                                    let mut prev_end = 0;
4664                                    for _ in 0..highlight_count {
4665                                        let range =
4666                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4667                                        let start = buffer
4668                                            .offset_to_point_utf16(range.start)
4669                                            .to_lsp_position();
4670                                        let end = buffer
4671                                            .offset_to_point_utf16(range.end)
4672                                            .to_lsp_position();
4673                                        highlights.push(lsp::DocumentHighlight {
4674                                            range: lsp::Range::new(start, end),
4675                                            kind: Some(lsp::DocumentHighlightKind::READ),
4676                                        });
4677                                        prev_end = range.end;
4678                                    }
4679                                    Some(highlights)
4680                                })
4681                            } else {
4682                                None
4683                            }
4684                        }
4685                    });
4686                }
4687            });
4688
4689            project.update(&mut cx, |project, _| {
4690                project.languages().add(Arc::new(Language::new(
4691                    LanguageConfig {
4692                        name: "Rust".into(),
4693                        path_suffixes: vec!["rs".to_string()],
4694                        language_server: Some(language_server_config),
4695                        ..Default::default()
4696                    },
4697                    None,
4698                )));
4699            });
4700
4701            async move {
4702                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4703                while operations.get() < max_operations {
4704                    operations.set(operations.get() + 1);
4705
4706                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4707                    match distribution {
4708                        0..=20 if !files.lock().is_empty() => {
4709                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4710                            let mut path = path.as_path();
4711                            while let Some(parent_path) = path.parent() {
4712                                path = parent_path;
4713                                if rng.lock().gen() {
4714                                    break;
4715                                }
4716                            }
4717
4718                            log::info!("Host: find/create local worktree {:?}", path);
4719                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4720                                project.find_or_create_local_worktree(path, true, cx)
4721                            });
4722                            let find_or_create_worktree = async move {
4723                                find_or_create_worktree.await.unwrap();
4724                            };
4725                            if rng.lock().gen() {
4726                                cx.background().spawn(find_or_create_worktree).detach();
4727                            } else {
4728                                find_or_create_worktree.await;
4729                            }
4730                        }
4731                        10..=80 if !files.lock().is_empty() => {
4732                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4733                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4734                                let (worktree, path) = project
4735                                    .update(&mut cx, |project, cx| {
4736                                        project.find_or_create_local_worktree(
4737                                            file.clone(),
4738                                            true,
4739                                            cx,
4740                                        )
4741                                    })
4742                                    .await
4743                                    .unwrap();
4744                                let project_path =
4745                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4746                                log::info!(
4747                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4748                                    file,
4749                                    project_path.0,
4750                                    project_path.1
4751                                );
4752                                let buffer = project
4753                                    .update(&mut cx, |project, cx| {
4754                                        project.open_buffer(project_path, cx)
4755                                    })
4756                                    .await
4757                                    .unwrap();
4758                                self.buffers.insert(buffer.clone());
4759                                buffer
4760                            } else {
4761                                self.buffers
4762                                    .iter()
4763                                    .choose(&mut *rng.lock())
4764                                    .unwrap()
4765                                    .clone()
4766                            };
4767
4768                            if rng.lock().gen_bool(0.1) {
4769                                cx.update(|cx| {
4770                                    log::info!(
4771                                        "Host: dropping buffer {:?}",
4772                                        buffer.read(cx).file().unwrap().full_path(cx)
4773                                    );
4774                                    self.buffers.remove(&buffer);
4775                                    drop(buffer);
4776                                });
4777                            } else {
4778                                buffer.update(&mut cx, |buffer, cx| {
4779                                    log::info!(
4780                                        "Host: updating buffer {:?} ({})",
4781                                        buffer.file().unwrap().full_path(cx),
4782                                        buffer.remote_id()
4783                                    );
4784                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4785                                });
4786                            }
4787                        }
4788                        _ => loop {
4789                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4790                            let mut path = PathBuf::new();
4791                            path.push("/");
4792                            for _ in 0..path_component_count {
4793                                let letter = rng.lock().gen_range(b'a'..=b'z');
4794                                path.push(std::str::from_utf8(&[letter]).unwrap());
4795                            }
4796                            path.set_extension("rs");
4797                            let parent_path = path.parent().unwrap();
4798
4799                            log::info!("Host: creating file {:?}", path,);
4800
4801                            if fs.create_dir(&parent_path).await.is_ok()
4802                                && fs.create_file(&path, Default::default()).await.is_ok()
4803                            {
4804                                files.lock().push(path);
4805                                break;
4806                            } else {
4807                                log::info!("Host: cannot create file");
4808                            }
4809                        },
4810                    }
4811
4812                    cx.background().simulate_random_delay().await;
4813                }
4814
4815                log::info!("Host done");
4816
4817                self.project = Some(project);
4818                (self, cx)
4819            }
4820        }
4821
4822        pub async fn simulate_guest(
4823            mut self,
4824            guest_id: usize,
4825            project: ModelHandle<Project>,
4826            operations: Rc<Cell<usize>>,
4827            max_operations: usize,
4828            rng: Arc<Mutex<StdRng>>,
4829            mut cx: TestAppContext,
4830        ) -> (Self, TestAppContext) {
4831            while operations.get() < max_operations {
4832                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4833                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4834                        project
4835                            .worktrees(&cx)
4836                            .filter(|worktree| {
4837                                let worktree = worktree.read(cx);
4838                                worktree.is_visible()
4839                                    && worktree.entries(false).any(|e| e.is_file())
4840                            })
4841                            .choose(&mut *rng.lock())
4842                    }) {
4843                        worktree
4844                    } else {
4845                        cx.background().simulate_random_delay().await;
4846                        continue;
4847                    };
4848
4849                    operations.set(operations.get() + 1);
4850                    let (worktree_root_name, project_path) =
4851                        worktree.read_with(&cx, |worktree, _| {
4852                            let entry = worktree
4853                                .entries(false)
4854                                .filter(|e| e.is_file())
4855                                .choose(&mut *rng.lock())
4856                                .unwrap();
4857                            (
4858                                worktree.root_name().to_string(),
4859                                (worktree.id(), entry.path.clone()),
4860                            )
4861                        });
4862                    log::info!(
4863                        "Guest {}: opening path {:?} in worktree {} ({})",
4864                        guest_id,
4865                        project_path.1,
4866                        project_path.0,
4867                        worktree_root_name,
4868                    );
4869                    let buffer = project
4870                        .update(&mut cx, |project, cx| {
4871                            project.open_buffer(project_path.clone(), cx)
4872                        })
4873                        .await
4874                        .unwrap();
4875                    log::info!(
4876                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4877                        guest_id,
4878                        project_path.1,
4879                        project_path.0,
4880                        worktree_root_name,
4881                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4882                    );
4883                    self.buffers.insert(buffer.clone());
4884                    buffer
4885                } else {
4886                    operations.set(operations.get() + 1);
4887
4888                    self.buffers
4889                        .iter()
4890                        .choose(&mut *rng.lock())
4891                        .unwrap()
4892                        .clone()
4893                };
4894
4895                let choice = rng.lock().gen_range(0..100);
4896                match choice {
4897                    0..=9 => {
4898                        cx.update(|cx| {
4899                            log::info!(
4900                                "Guest {}: dropping buffer {:?}",
4901                                guest_id,
4902                                buffer.read(cx).file().unwrap().full_path(cx)
4903                            );
4904                            self.buffers.remove(&buffer);
4905                            drop(buffer);
4906                        });
4907                    }
4908                    10..=19 => {
4909                        let completions = project.update(&mut cx, |project, cx| {
4910                            log::info!(
4911                                "Guest {}: requesting completions for buffer {} ({:?})",
4912                                guest_id,
4913                                buffer.read(cx).remote_id(),
4914                                buffer.read(cx).file().unwrap().full_path(cx)
4915                            );
4916                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4917                            project.completions(&buffer, offset, cx)
4918                        });
4919                        let completions = cx.background().spawn(async move {
4920                            completions.await.expect("completions request failed");
4921                        });
4922                        if rng.lock().gen_bool(0.3) {
4923                            log::info!("Guest {}: detaching completions request", guest_id);
4924                            completions.detach();
4925                        } else {
4926                            completions.await;
4927                        }
4928                    }
4929                    20..=29 => {
4930                        let code_actions = project.update(&mut cx, |project, cx| {
4931                            log::info!(
4932                                "Guest {}: requesting code actions for buffer {} ({:?})",
4933                                guest_id,
4934                                buffer.read(cx).remote_id(),
4935                                buffer.read(cx).file().unwrap().full_path(cx)
4936                            );
4937                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4938                            project.code_actions(&buffer, range, cx)
4939                        });
4940                        let code_actions = cx.background().spawn(async move {
4941                            code_actions.await.expect("code actions request failed");
4942                        });
4943                        if rng.lock().gen_bool(0.3) {
4944                            log::info!("Guest {}: detaching code actions request", guest_id);
4945                            code_actions.detach();
4946                        } else {
4947                            code_actions.await;
4948                        }
4949                    }
4950                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4951                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4952                            log::info!(
4953                                "Guest {}: saving buffer {} ({:?})",
4954                                guest_id,
4955                                buffer.remote_id(),
4956                                buffer.file().unwrap().full_path(cx)
4957                            );
4958                            (buffer.version(), buffer.save(cx))
4959                        });
4960                        let save = cx.background().spawn(async move {
4961                            let (saved_version, _) = save.await.expect("save request failed");
4962                            assert!(saved_version.observed_all(&requested_version));
4963                        });
4964                        if rng.lock().gen_bool(0.3) {
4965                            log::info!("Guest {}: detaching save request", guest_id);
4966                            save.detach();
4967                        } else {
4968                            save.await;
4969                        }
4970                    }
4971                    40..=44 => {
4972                        let prepare_rename = project.update(&mut cx, |project, cx| {
4973                            log::info!(
4974                                "Guest {}: preparing rename for buffer {} ({:?})",
4975                                guest_id,
4976                                buffer.read(cx).remote_id(),
4977                                buffer.read(cx).file().unwrap().full_path(cx)
4978                            );
4979                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4980                            project.prepare_rename(buffer, offset, cx)
4981                        });
4982                        let prepare_rename = cx.background().spawn(async move {
4983                            prepare_rename.await.expect("prepare rename request failed");
4984                        });
4985                        if rng.lock().gen_bool(0.3) {
4986                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4987                            prepare_rename.detach();
4988                        } else {
4989                            prepare_rename.await;
4990                        }
4991                    }
4992                    45..=49 => {
4993                        let definitions = project.update(&mut cx, |project, cx| {
4994                            log::info!(
4995                                "Guest {}: requesting definitions for buffer {} ({:?})",
4996                                guest_id,
4997                                buffer.read(cx).remote_id(),
4998                                buffer.read(cx).file().unwrap().full_path(cx)
4999                            );
5000                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5001                            project.definition(&buffer, offset, cx)
5002                        });
5003                        let definitions = cx.background().spawn(async move {
5004                            definitions.await.expect("definitions request failed")
5005                        });
5006                        if rng.lock().gen_bool(0.3) {
5007                            log::info!("Guest {}: detaching definitions request", guest_id);
5008                            definitions.detach();
5009                        } else {
5010                            self.buffers
5011                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5012                        }
5013                    }
5014                    50..=54 => {
5015                        let highlights = project.update(&mut cx, |project, cx| {
5016                            log::info!(
5017                                "Guest {}: requesting highlights for buffer {} ({:?})",
5018                                guest_id,
5019                                buffer.read(cx).remote_id(),
5020                                buffer.read(cx).file().unwrap().full_path(cx)
5021                            );
5022                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5023                            project.document_highlights(&buffer, offset, cx)
5024                        });
5025                        let highlights = cx.background().spawn(async move {
5026                            highlights.await.expect("highlights request failed");
5027                        });
5028                        if rng.lock().gen_bool(0.3) {
5029                            log::info!("Guest {}: detaching highlights request", guest_id);
5030                            highlights.detach();
5031                        } else {
5032                            highlights.await;
5033                        }
5034                    }
5035                    55..=59 => {
5036                        let search = project.update(&mut cx, |project, cx| {
5037                            let query = rng.lock().gen_range('a'..='z');
5038                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5039                            project.search(SearchQuery::text(query, false, false), cx)
5040                        });
5041                        let search = cx
5042                            .background()
5043                            .spawn(async move { search.await.expect("search request failed") });
5044                        if rng.lock().gen_bool(0.3) {
5045                            log::info!("Guest {}: detaching search request", guest_id);
5046                            search.detach();
5047                        } else {
5048                            self.buffers.extend(search.await.into_keys());
5049                        }
5050                    }
5051                    _ => {
5052                        buffer.update(&mut cx, |buffer, cx| {
5053                            log::info!(
5054                                "Guest {}: updating buffer {} ({:?})",
5055                                guest_id,
5056                                buffer.remote_id(),
5057                                buffer.file().unwrap().full_path(cx)
5058                            );
5059                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5060                        });
5061                    }
5062                }
5063                cx.background().simulate_random_delay().await;
5064            }
5065
5066            log::info!("Guest {} done", guest_id);
5067
5068            self.project = Some(project);
5069            (self, cx)
5070        }
5071    }
5072
5073    impl Drop for TestClient {
5074        fn drop(&mut self) {
5075            self.client.tear_down();
5076        }
5077    }
5078
5079    impl Executor for Arc<gpui::executor::Background> {
5080        type Timer = gpui::executor::Timer;
5081
5082        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5083            self.spawn(future).detach();
5084        }
5085
5086        fn timer(&self, duration: Duration) -> Self::Timer {
5087            self.as_ref().timer(duration)
5088        }
5089    }
5090
5091    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5092        channel
5093            .messages()
5094            .cursor::<()>()
5095            .map(|m| {
5096                (
5097                    m.sender.github_login.clone(),
5098                    m.body.clone(),
5099                    m.is_pending(),
5100                )
5101            })
5102            .collect()
5103    }
5104
5105    struct EmptyView;
5106
5107    impl gpui::Entity for EmptyView {
5108        type Event = ();
5109    }
5110
5111    impl gpui::View for EmptyView {
5112        fn ui_name() -> &'static str {
5113            "empty view"
5114        }
5115
5116        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5117            gpui::Element::boxed(gpui::elements::Empty)
5118        }
5119    }
5120}