rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::get_channel_messages);
 116
 117        Arc::new(server)
 118    }
 119
 120    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 121    where
 122        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 123        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 124        M: EnvelopedMessage,
 125    {
 126        let prev_handler = self.handlers.insert(
 127            TypeId::of::<M>(),
 128            Box::new(move |server, envelope| {
 129                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 130                (handler)(server, *envelope).boxed()
 131            }),
 132        );
 133        if prev_handler.is_some() {
 134            panic!("registered a handler for the same message twice");
 135        }
 136        self
 137    }
 138
 139    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 140    where
 141        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 142        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 143        M: RequestMessage,
 144    {
 145        self.add_message_handler(move |server, envelope| {
 146            let receipt = envelope.receipt();
 147            let response = (handler)(server.clone(), envelope);
 148            async move {
 149                match response.await {
 150                    Ok(response) => {
 151                        server.peer.respond(receipt, response)?;
 152                        Ok(())
 153                    }
 154                    Err(error) => {
 155                        server.peer.respond_with_error(
 156                            receipt,
 157                            proto::Error {
 158                                message: error.to_string(),
 159                            },
 160                        )?;
 161                        Err(error)
 162                    }
 163                }
 164            }
 165        })
 166    }
 167
 168    pub fn handle_connection<E: Executor>(
 169        self: &Arc<Self>,
 170        connection: Connection,
 171        addr: String,
 172        user_id: UserId,
 173        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 174        executor: E,
 175    ) -> impl Future<Output = ()> {
 176        let mut this = self.clone();
 177        async move {
 178            let (connection_id, handle_io, mut incoming_rx) = this
 179                .peer
 180                .add_connection(connection, {
 181                    let executor = executor.clone();
 182                    move |duration| {
 183                        let timer = executor.timer(duration);
 184                        async move {
 185                            timer.await;
 186                        }
 187                    }
 188                })
 189                .await;
 190
 191            if let Some(send_connection_id) = send_connection_id.as_mut() {
 192                let _ = send_connection_id.send(connection_id).await;
 193            }
 194
 195            this.state_mut().add_connection(connection_id, user_id);
 196            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 197                log::error!("error updating contacts for {:?}: {}", user_id, err);
 198            }
 199
 200            let handle_io = handle_io.fuse();
 201            futures::pin_mut!(handle_io);
 202            loop {
 203                let next_message = incoming_rx.next().fuse();
 204                futures::pin_mut!(next_message);
 205                futures::select_biased! {
 206                    result = handle_io => {
 207                        if let Err(err) = result {
 208                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 209                        }
 210                        break;
 211                    }
 212                    message = next_message => {
 213                        if let Some(message) = message {
 214                            let start_time = Instant::now();
 215                            let type_name = message.payload_type_name();
 216                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 217                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 218                                let notifications = this.notifications.clone();
 219                                let is_background = message.is_background();
 220                                let handle_message = (handler)(this.clone(), message);
 221                                let handle_message = async move {
 222                                    if let Err(err) = handle_message.await {
 223                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 224                                    } else {
 225                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 226                                    }
 227                                    if let Some(mut notifications) = notifications {
 228                                        let _ = notifications.send(()).await;
 229                                    }
 230                                };
 231                                if is_background {
 232                                    executor.spawn_detached(handle_message);
 233                                } else {
 234                                    handle_message.await;
 235                                }
 236                            } else {
 237                                log::warn!("unhandled message: {}", type_name);
 238                            }
 239                        } else {
 240                            log::info!("rpc connection closed {:?}", addr);
 241                            break;
 242                        }
 243                    }
 244                }
 245            }
 246
 247            if let Err(err) = this.sign_out(connection_id).await {
 248                log::error!("error signing out connection {:?} - {:?}", addr, err);
 249            }
 250        }
 251    }
 252
 253    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 254        self.peer.disconnect(connection_id);
 255        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 256
 257        for (project_id, project) in removed_connection.hosted_projects {
 258            if let Some(share) = project.share {
 259                broadcast(
 260                    connection_id,
 261                    share.guests.keys().copied().collect(),
 262                    |conn_id| {
 263                        self.peer
 264                            .send(conn_id, proto::UnshareProject { project_id })
 265                    },
 266                )?;
 267            }
 268        }
 269
 270        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 271            broadcast(connection_id, peer_ids, |conn_id| {
 272                self.peer.send(
 273                    conn_id,
 274                    proto::RemoveProjectCollaborator {
 275                        project_id,
 276                        peer_id: connection_id.0,
 277                    },
 278                )
 279            })?;
 280        }
 281
 282        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 283        Ok(())
 284    }
 285
 286    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn register_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::RegisterProject>,
 293    ) -> tide::Result<proto::RegisterProjectResponse> {
 294        let project_id = {
 295            let mut state = self.state_mut();
 296            let user_id = state.user_id_for_connection(request.sender_id)?;
 297            state.register_project(request.sender_id, user_id)
 298        };
 299        Ok(proto::RegisterProjectResponse { project_id })
 300    }
 301
 302    async fn unregister_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnregisterProject>,
 305    ) -> tide::Result<()> {
 306        let project = self
 307            .state_mut()
 308            .unregister_project(request.payload.project_id, request.sender_id)?;
 309        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 310        Ok(())
 311    }
 312
 313    async fn share_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::ShareProject>,
 316    ) -> tide::Result<proto::Ack> {
 317        self.state_mut()
 318            .share_project(request.payload.project_id, request.sender_id);
 319        Ok(proto::Ack {})
 320    }
 321
 322    async fn unshare_project(
 323        mut self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareProject>,
 325    ) -> tide::Result<()> {
 326        let project_id = request.payload.project_id;
 327        let project = self
 328            .state_mut()
 329            .unshare_project(project_id, request.sender_id)?;
 330
 331        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 332            self.peer
 333                .send(conn_id, proto::UnshareProject { project_id })
 334        })?;
 335        self.update_contacts_for_users(&project.authorized_user_ids)?;
 336        Ok(())
 337    }
 338
 339    async fn join_project(
 340        mut self: Arc<Server>,
 341        request: TypedEnvelope<proto::JoinProject>,
 342    ) -> tide::Result<proto::JoinProjectResponse> {
 343        let project_id = request.payload.project_id;
 344
 345        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 346        let (response, connection_ids, contact_user_ids) = self
 347            .state_mut()
 348            .join_project(request.sender_id, user_id, project_id)
 349            .and_then(|joined| {
 350                let share = joined.project.share()?;
 351                let peer_count = share.guests.len();
 352                let mut collaborators = Vec::with_capacity(peer_count);
 353                collaborators.push(proto::Collaborator {
 354                    peer_id: joined.project.host_connection_id.0,
 355                    replica_id: 0,
 356                    user_id: joined.project.host_user_id.to_proto(),
 357                });
 358                let worktrees = share
 359                    .worktrees
 360                    .iter()
 361                    .filter_map(|(id, shared_worktree)| {
 362                        let worktree = joined.project.worktrees.get(&id)?;
 363                        Some(proto::Worktree {
 364                            id: *id,
 365                            root_name: worktree.root_name.clone(),
 366                            entries: shared_worktree.entries.values().cloned().collect(),
 367                            diagnostic_summaries: shared_worktree
 368                                .diagnostic_summaries
 369                                .values()
 370                                .cloned()
 371                                .collect(),
 372                            visible: worktree.visible,
 373                        })
 374                    })
 375                    .collect();
 376                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 377                    if *peer_conn_id != request.sender_id {
 378                        collaborators.push(proto::Collaborator {
 379                            peer_id: peer_conn_id.0,
 380                            replica_id: *peer_replica_id as u32,
 381                            user_id: peer_user_id.to_proto(),
 382                        });
 383                    }
 384                }
 385                let response = proto::JoinProjectResponse {
 386                    worktrees,
 387                    replica_id: joined.replica_id as u32,
 388                    collaborators,
 389                    language_servers: joined.project.language_servers.clone(),
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn start_language_server(
 540        mut self: Arc<Server>,
 541        request: TypedEnvelope<proto::StartLanguageServer>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self.state_mut().start_language_server(
 544            request.payload.project_id,
 545            request.sender_id,
 546            request
 547                .payload
 548                .server
 549                .clone()
 550                .ok_or_else(|| anyhow!("invalid language server"))?,
 551        )?;
 552        broadcast(request.sender_id, receiver_ids, |connection_id| {
 553            self.peer
 554                .forward_send(request.sender_id, connection_id, request.payload.clone())
 555        })?;
 556        Ok(())
 557    }
 558
 559    async fn update_language_server(
 560        self: Arc<Server>,
 561        request: TypedEnvelope<proto::UpdateLanguageServer>,
 562    ) -> tide::Result<()> {
 563        let receiver_ids = self
 564            .state()
 565            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn forward_project_request<T>(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<T>,
 576    ) -> tide::Result<T::Response>
 577    where
 578        T: EntityMessage + RequestMessage,
 579    {
 580        let host_connection_id = self
 581            .state()
 582            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 583            .host_connection_id;
 584        Ok(self
 585            .peer
 586            .forward_request(request.sender_id, host_connection_id, request.payload)
 587            .await?)
 588    }
 589
 590    async fn save_buffer(
 591        self: Arc<Server>,
 592        request: TypedEnvelope<proto::SaveBuffer>,
 593    ) -> tide::Result<proto::BufferSaved> {
 594        let host;
 595        let mut guests;
 596        {
 597            let state = self.state();
 598            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 599            host = project.host_connection_id;
 600            guests = project.guest_connection_ids()
 601        }
 602
 603        let response = self
 604            .peer
 605            .forward_request(request.sender_id, host, request.payload.clone())
 606            .await?;
 607
 608        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 609        broadcast(host, guests, |conn_id| {
 610            self.peer.forward_send(host, conn_id, response.clone())
 611        })?;
 612
 613        Ok(response)
 614    }
 615
 616    async fn update_buffer(
 617        self: Arc<Server>,
 618        request: TypedEnvelope<proto::UpdateBuffer>,
 619    ) -> tide::Result<proto::Ack> {
 620        let receiver_ids = self
 621            .state()
 622            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 623        broadcast(request.sender_id, receiver_ids, |connection_id| {
 624            self.peer
 625                .forward_send(request.sender_id, connection_id, request.payload.clone())
 626        })?;
 627        Ok(proto::Ack {})
 628    }
 629
 630    async fn update_buffer_file(
 631        self: Arc<Server>,
 632        request: TypedEnvelope<proto::UpdateBufferFile>,
 633    ) -> tide::Result<()> {
 634        let receiver_ids = self
 635            .state()
 636            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 637        broadcast(request.sender_id, receiver_ids, |connection_id| {
 638            self.peer
 639                .forward_send(request.sender_id, connection_id, request.payload.clone())
 640        })?;
 641        Ok(())
 642    }
 643
 644    async fn buffer_reloaded(
 645        self: Arc<Server>,
 646        request: TypedEnvelope<proto::BufferReloaded>,
 647    ) -> tide::Result<()> {
 648        let receiver_ids = self
 649            .state()
 650            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 651        broadcast(request.sender_id, receiver_ids, |connection_id| {
 652            self.peer
 653                .forward_send(request.sender_id, connection_id, request.payload.clone())
 654        })?;
 655        Ok(())
 656    }
 657
 658    async fn buffer_saved(
 659        self: Arc<Server>,
 660        request: TypedEnvelope<proto::BufferSaved>,
 661    ) -> tide::Result<()> {
 662        let receiver_ids = self
 663            .state()
 664            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 665        broadcast(request.sender_id, receiver_ids, |connection_id| {
 666            self.peer
 667                .forward_send(request.sender_id, connection_id, request.payload.clone())
 668        })?;
 669        Ok(())
 670    }
 671
 672    async fn get_channels(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::GetChannels>,
 675    ) -> tide::Result<proto::GetChannelsResponse> {
 676        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 677        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 678        Ok(proto::GetChannelsResponse {
 679            channels: channels
 680                .into_iter()
 681                .map(|chan| proto::Channel {
 682                    id: chan.id.to_proto(),
 683                    name: chan.name,
 684                })
 685                .collect(),
 686        })
 687    }
 688
 689    async fn get_users(
 690        self: Arc<Server>,
 691        request: TypedEnvelope<proto::GetUsers>,
 692    ) -> tide::Result<proto::GetUsersResponse> {
 693        let user_ids = request
 694            .payload
 695            .user_ids
 696            .into_iter()
 697            .map(UserId::from_proto)
 698            .collect();
 699        let users = self
 700            .app_state
 701            .db
 702            .get_users_by_ids(user_ids)
 703            .await?
 704            .into_iter()
 705            .map(|user| proto::User {
 706                id: user.id.to_proto(),
 707                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 708                github_login: user.github_login,
 709            })
 710            .collect();
 711        Ok(proto::GetUsersResponse { users })
 712    }
 713
 714    fn update_contacts_for_users<'a>(
 715        self: &Arc<Server>,
 716        user_ids: impl IntoIterator<Item = &'a UserId>,
 717    ) -> anyhow::Result<()> {
 718        let mut result = Ok(());
 719        let state = self.state();
 720        for user_id in user_ids {
 721            let contacts = state.contacts_for_user(*user_id);
 722            for connection_id in state.connection_ids_for_user(*user_id) {
 723                if let Err(error) = self.peer.send(
 724                    connection_id,
 725                    proto::UpdateContacts {
 726                        contacts: contacts.clone(),
 727                    },
 728                ) {
 729                    result = Err(error);
 730                }
 731            }
 732        }
 733        result
 734    }
 735
 736    async fn join_channel(
 737        mut self: Arc<Self>,
 738        request: TypedEnvelope<proto::JoinChannel>,
 739    ) -> tide::Result<proto::JoinChannelResponse> {
 740        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 741        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 742        if !self
 743            .app_state
 744            .db
 745            .can_user_access_channel(user_id, channel_id)
 746            .await?
 747        {
 748            Err(anyhow!("access denied"))?;
 749        }
 750
 751        self.state_mut().join_channel(request.sender_id, channel_id);
 752        let messages = self
 753            .app_state
 754            .db
 755            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 756            .await?
 757            .into_iter()
 758            .map(|msg| proto::ChannelMessage {
 759                id: msg.id.to_proto(),
 760                body: msg.body,
 761                timestamp: msg.sent_at.unix_timestamp() as u64,
 762                sender_id: msg.sender_id.to_proto(),
 763                nonce: Some(msg.nonce.as_u128().into()),
 764            })
 765            .collect::<Vec<_>>();
 766        Ok(proto::JoinChannelResponse {
 767            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 768            messages,
 769        })
 770    }
 771
 772    async fn leave_channel(
 773        mut self: Arc<Self>,
 774        request: TypedEnvelope<proto::LeaveChannel>,
 775    ) -> tide::Result<()> {
 776        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 777        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 778        if !self
 779            .app_state
 780            .db
 781            .can_user_access_channel(user_id, channel_id)
 782            .await?
 783        {
 784            Err(anyhow!("access denied"))?;
 785        }
 786
 787        self.state_mut()
 788            .leave_channel(request.sender_id, channel_id);
 789
 790        Ok(())
 791    }
 792
 793    async fn send_channel_message(
 794        self: Arc<Self>,
 795        request: TypedEnvelope<proto::SendChannelMessage>,
 796    ) -> tide::Result<proto::SendChannelMessageResponse> {
 797        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 798        let user_id;
 799        let connection_ids;
 800        {
 801            let state = self.state();
 802            user_id = state.user_id_for_connection(request.sender_id)?;
 803            connection_ids = state.channel_connection_ids(channel_id)?;
 804        }
 805
 806        // Validate the message body.
 807        let body = request.payload.body.trim().to_string();
 808        if body.len() > MAX_MESSAGE_LEN {
 809            return Err(anyhow!("message is too long"))?;
 810        }
 811        if body.is_empty() {
 812            return Err(anyhow!("message can't be blank"))?;
 813        }
 814
 815        let timestamp = OffsetDateTime::now_utc();
 816        let nonce = request
 817            .payload
 818            .nonce
 819            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 820
 821        let message_id = self
 822            .app_state
 823            .db
 824            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 825            .await?
 826            .to_proto();
 827        let message = proto::ChannelMessage {
 828            sender_id: user_id.to_proto(),
 829            id: message_id,
 830            body,
 831            timestamp: timestamp.unix_timestamp() as u64,
 832            nonce: Some(nonce),
 833        };
 834        broadcast(request.sender_id, connection_ids, |conn_id| {
 835            self.peer.send(
 836                conn_id,
 837                proto::ChannelMessageSent {
 838                    channel_id: channel_id.to_proto(),
 839                    message: Some(message.clone()),
 840                },
 841            )
 842        })?;
 843        Ok(proto::SendChannelMessageResponse {
 844            message: Some(message),
 845        })
 846    }
 847
 848    async fn get_channel_messages(
 849        self: Arc<Self>,
 850        request: TypedEnvelope<proto::GetChannelMessages>,
 851    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 852        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 853        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 854        if !self
 855            .app_state
 856            .db
 857            .can_user_access_channel(user_id, channel_id)
 858            .await?
 859        {
 860            Err(anyhow!("access denied"))?;
 861        }
 862
 863        let messages = self
 864            .app_state
 865            .db
 866            .get_channel_messages(
 867                channel_id,
 868                MESSAGE_COUNT_PER_PAGE,
 869                Some(MessageId::from_proto(request.payload.before_message_id)),
 870            )
 871            .await?
 872            .into_iter()
 873            .map(|msg| proto::ChannelMessage {
 874                id: msg.id.to_proto(),
 875                body: msg.body,
 876                timestamp: msg.sent_at.unix_timestamp() as u64,
 877                sender_id: msg.sender_id.to_proto(),
 878                nonce: Some(msg.nonce.as_u128().into()),
 879            })
 880            .collect::<Vec<_>>();
 881
 882        Ok(proto::GetChannelMessagesResponse {
 883            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 884            messages,
 885        })
 886    }
 887
 888    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 889        self.store.read()
 890    }
 891
 892    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 893        self.store.write()
 894    }
 895}
 896
 897impl Executor for RealExecutor {
 898    type Timer = Timer;
 899
 900    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 901        task::spawn(future);
 902    }
 903
 904    fn timer(&self, duration: Duration) -> Self::Timer {
 905        Timer::after(duration)
 906    }
 907}
 908
 909fn broadcast<F>(
 910    sender_id: ConnectionId,
 911    receiver_ids: Vec<ConnectionId>,
 912    mut f: F,
 913) -> anyhow::Result<()>
 914where
 915    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 916{
 917    let mut result = Ok(());
 918    for receiver_id in receiver_ids {
 919        if receiver_id != sender_id {
 920            if let Err(error) = f(receiver_id) {
 921                if result.is_ok() {
 922                    result = Err(error);
 923                }
 924            }
 925        }
 926    }
 927    result
 928}
 929
 930pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 931    let server = Server::new(app.state().clone(), rpc.clone(), None);
 932    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 933        let server = server.clone();
 934        async move {
 935            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 936
 937            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 938            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 939            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 940            let client_protocol_version: Option<u32> = request
 941                .header("X-Zed-Protocol-Version")
 942                .and_then(|v| v.as_str().parse().ok());
 943
 944            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 945                return Ok(Response::new(StatusCode::UpgradeRequired));
 946            }
 947
 948            let header = match request.header("Sec-Websocket-Key") {
 949                Some(h) => h.as_str(),
 950                None => return Err(anyhow!("expected sec-websocket-key"))?,
 951            };
 952
 953            let user_id = process_auth_header(&request).await?;
 954
 955            let mut response = Response::new(StatusCode::SwitchingProtocols);
 956            response.insert_header(UPGRADE, "websocket");
 957            response.insert_header(CONNECTION, "Upgrade");
 958            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 959            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 960            response.insert_header("Sec-Websocket-Version", "13");
 961
 962            let http_res: &mut tide::http::Response = response.as_mut();
 963            let upgrade_receiver = http_res.recv_upgrade().await;
 964            let addr = request.remote().unwrap_or("unknown").to_string();
 965            task::spawn(async move {
 966                if let Some(stream) = upgrade_receiver.await {
 967                    server
 968                        .handle_connection(
 969                            Connection::new(
 970                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 971                            ),
 972                            addr,
 973                            user_id,
 974                            None,
 975                            RealExecutor,
 976                        )
 977                        .await;
 978                }
 979            });
 980
 981            Ok(response)
 982        }
 983    });
 984}
 985
 986fn header_contains_ignore_case<T>(
 987    request: &tide::Request<T>,
 988    header_name: HeaderName,
 989    value: &str,
 990) -> bool {
 991    request
 992        .header(header_name)
 993        .map(|h| {
 994            h.as_str()
 995                .split(',')
 996                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 997        })
 998        .unwrap_or(false)
 999}
1000
1001#[cfg(test)]
1002mod tests {
1003    use super::*;
1004    use crate::{
1005        auth,
1006        db::{tests::TestDb, UserId},
1007        github, AppState, Config,
1008    };
1009    use ::rpc::Peer;
1010    use client::{
1011        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1012        EstablishConnectionError, UserStore,
1013    };
1014    use collections::BTreeMap;
1015    use editor::{
1016        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1017        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1018    };
1019    use gpui::{executor, ModelHandle, TestAppContext};
1020    use language::{
1021        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1022        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1023    };
1024    use lsp;
1025    use parking_lot::Mutex;
1026    use postage::{barrier, watch};
1027    use project::{
1028        fs::{FakeFs, Fs as _},
1029        search::SearchQuery,
1030        worktree::WorktreeHandle,
1031        DiagnosticSummary, Project, ProjectPath,
1032    };
1033    use rand::prelude::*;
1034    use rpc::PeerId;
1035    use serde_json::json;
1036    use sqlx::types::time::OffsetDateTime;
1037    use std::{
1038        cell::Cell,
1039        env,
1040        ops::Deref,
1041        path::{Path, PathBuf},
1042        rc::Rc,
1043        sync::{
1044            atomic::{AtomicBool, Ordering::SeqCst},
1045            Arc,
1046        },
1047        time::Duration,
1048    };
1049    use workspace::{Settings, Workspace, WorkspaceParams};
1050
1051    #[cfg(test)]
1052    #[ctor::ctor]
1053    fn init_logger() {
1054        if std::env::var("RUST_LOG").is_ok() {
1055            env_logger::init();
1056        }
1057    }
1058
1059    #[gpui::test(iterations = 10)]
1060    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // End-to-end check of project sharing between a host (client A) and a guest
            // (client B): A shares a local project, B joins it remotely, both sides see
            // each other as collaborators, an edit typed in B's editor shows up in A's
            // buffer, and dropping B's project empties A's collaborator list.
1061        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1062        let lang_registry = Arc::new(LanguageRegistry::test());
1063        let fs = FakeFs::new(cx_a.background());
            // NOTE(review): presumably makes the test fail instead of blocking if the
            // foreground executor would have to park — confirm against gpui's executor.
1064        cx_a.foreground().forbid_parking();
1065
1066        // Connect to a server as 2 clients.
1067        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1068        let client_a = server.create_client(cx_a, "user_a").await;
1069        let client_b = server.create_client(cx_b, "user_b").await;
1070
1071        // Share a project as client A
            // The `.zed.toml` in the fake tree names user_b as a collaborator.
1072        fs.insert_tree(
1073            "/a",
1074            json!({
1075                ".zed.toml": r#"collaborators = ["user_b"]"#,
1076                "a.txt": "a-contents",
1077                "b.txt": "b-contents",
1078            }),
1079        )
1080        .await;
1081        let project_a = cx_a.update(|cx| {
1082            Project::local(
1083                client_a.clone(),
1084                client_a.user_store.clone(),
1085                lang_registry.clone(),
1086                fs.clone(),
1087                cx,
1088            )
1089        });
1090        let (worktree_a, _) = project_a
1091            .update(cx_a, |p, cx| {
1092                p.find_or_create_local_worktree("/a", true, cx)
1093            })
1094            .await
1095            .unwrap();
1096        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
            // Wait for the initial scan of the worktree before sharing it.
1097        worktree_a
1098            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1099            .await;
1100        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1101        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1102
1103        // Join that project as client B
1104        let project_b = Project::remote(
1105            project_id,
1106            client_b.clone(),
1107            client_b.user_store.clone(),
1108            lang_registry.clone(),
1109            fs.clone(),
1110            &mut cx_b.to_async(),
1111        )
1112        .await
1113        .unwrap();
1114
            // The guest must already know the host as "user_a"; remember the guest's
            // replica id so the host's view of it can be checked next.
1115        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1116            assert_eq!(
1117                project
1118                    .collaborators()
1119                    .get(&client_a.peer_id)
1120                    .unwrap()
1121                    .user
1122                    .github_login,
1123                "user_a"
1124            );
1125            project.replica_id()
1126        });
            // The host learns about the guest asynchronously, hence a condition rather
            // than an immediate assertion.
1127        project_a
1128            .condition(&cx_a, |tree, _| {
1129                tree.collaborators()
1130                    .get(&client_b.peer_id)
1131                    .map_or(false, |collaborator| {
1132                        collaborator.replica_id == replica_id_b
1133                            && collaborator.user.github_login == "user_b"
1134                    })
1135            })
1136            .await;
1137
1138        // Open the same file as client B and client A.
1139        let buffer_b = project_b
1140            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1141            .await
1142            .unwrap();
1143        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1144        buffer_b.read_with(cx_b, |buf, cx| {
1145            assert_eq!(buf.read(cx).text(), "b-contents")
1146        });
            // Asserted before client A opens the file itself: the host project is
            // expected to report the buffer as open once the guest has opened it.
1147        project_a.read_with(cx_a, |project, cx| {
1148            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1149        });
1150        let buffer_a = project_a
1151            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1152            .await
1153            .unwrap();
1154
1155        let editor_b = cx_b.add_view(window_b, |cx| {
1156            Editor::for_buffer(
1157                buffer_b,
1158                None,
1159                watch::channel_with(Settings::test(cx)).1,
1160                cx,
1161            )
1162        });
1163
1164        // TODO
1165        // // Create a selection set as client B and see that selection set as client A.
1166        // buffer_a
1167        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1168        //     .await;
1169
1170        // Edit the buffer as client B and see that edit as client A.
1171        editor_b.update(cx_b, |editor, cx| {
1172            editor.handle_input(&Input("ok, ".into()), cx)
1173        });
1174        buffer_a
1175            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1176            .await;
1177
1178        // TODO
1179        // // Remove the selection set as client B, see those selections disappear as client A.
1180        cx_b.update(move |_| drop(editor_b));
1181        // buffer_a
1182        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1183        //     .await;
1184
1185        // Dropping the client B's project removes client B from client A's collaborators.
1186        cx_b.update(move |_| drop(project_b));
1187        project_a
1188            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1189            .await;
1190    }
1191
1192    #[gpui::test(iterations = 10)]
1193    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // Verifies the share -> unshare -> share-again cycle: after the host unshares,
            // the guest's project turns read-only and the host worktree is no longer
            // marked shared; after sharing again, a guest can join under the same project
            // id and open a buffer.
1194        let lang_registry = Arc::new(LanguageRegistry::test());
1195        let fs = FakeFs::new(cx_a.background());
1196        cx_a.foreground().forbid_parking();
1197
1198        // Connect to a server as 2 clients.
1199        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1200        let client_a = server.create_client(cx_a, "user_a").await;
1201        let client_b = server.create_client(cx_b, "user_b").await;
1202
1203        // Share a project as client A
1204        fs.insert_tree(
1205            "/a",
1206            json!({
1207                ".zed.toml": r#"collaborators = ["user_b"]"#,
1208                "a.txt": "a-contents",
1209                "b.txt": "b-contents",
1210            }),
1211        )
1212        .await;
1213        let project_a = cx_a.update(|cx| {
1214            Project::local(
1215                client_a.clone(),
1216                client_a.user_store.clone(),
1217                lang_registry.clone(),
1218                fs.clone(),
1219                cx,
1220            )
1221        });
1222        let (worktree_a, _) = project_a
1223            .update(cx_a, |p, cx| {
1224                p.find_or_create_local_worktree("/a", true, cx)
1225            })
1226            .await
1227            .unwrap();
1228        worktree_a
1229            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1230            .await;
1231        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1232        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1233        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
            // Sharing the project must flag the local worktree as shared.
1234        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1235
1236        // Join that project as client B
1237        let project_b = Project::remote(
1238            project_id,
1239            client_b.clone(),
1240            client_b.user_store.clone(),
1241            lang_registry.clone(),
1242            fs.clone(),
1243            &mut cx_b.to_async(),
1244        )
1245        .await
1246        .unwrap();
1247        project_b
1248            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1249            .await
1250            .unwrap();
1251
1252        // Unshare the project as client A
1253        project_a
1254            .update(cx_a, |project, cx| project.unshare(cx))
1255            .await
1256            .unwrap();
            // The guest notices asynchronously, so wait for it to become read-only.
1257        project_b
1258            .condition(cx_b, |project, _| project.is_read_only())
1259            .await;
1260        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
            // Release the stale guest project before joining again below.
1261        cx_b.update(|_| {
1262            drop(project_b);
1263        });
1264
1265        // Share the project again and ensure guests can still join.
1266        project_a
1267            .update(cx_a, |project, cx| project.share(cx))
1268            .await
1269            .unwrap();
1270        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1271
            // The same `project_id` obtained before the first share is reused here.
1272        let project_b2 = Project::remote(
1273            project_id,
1274            client_b.clone(),
1275            client_b.user_store.clone(),
1276            lang_registry.clone(),
1277            fs.clone(),
1278            &mut cx_b.to_async(),
1279        )
1280        .await
1281        .unwrap();
1282        project_b2
1283            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1284            .await
1285            .unwrap();
1286    }
1287
1288    #[gpui::test(iterations = 10)]
1289    async fn test_propagate_saves_and_fs_changes(
1290        cx_a: &mut TestAppContext,
1291        cx_b: &mut TestAppContext,
1292        cx_c: &mut TestAppContext,
1293    ) {
            // Three-party scenario (host A, guests B and C) covering:
            //   1. concurrent edits from all three converging on identical buffer text,
            //   2. a save issued by guest B while the host edits, after which the file on
            //      disk contains the host's edit and no replica is dirty,
            //   3. renames and a new file on the host's file system showing up in every
            //      worktree and in the path of the already-open buffers.
1294        let lang_registry = Arc::new(LanguageRegistry::test());
1295        let fs = FakeFs::new(cx_a.background());
1296        cx_a.foreground().forbid_parking();
1297
1298        // Connect to a server as 3 clients.
1299        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1300        let client_a = server.create_client(cx_a, "user_a").await;
1301        let client_b = server.create_client(cx_b, "user_b").await;
1302        let client_c = server.create_client(cx_c, "user_c").await;
1303
1304        // Share a worktree as client A.
1305        fs.insert_tree(
1306            "/a",
1307            json!({
1308                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1309                "file1": "",
1310                "file2": ""
1311            }),
1312        )
1313        .await;
1314        let project_a = cx_a.update(|cx| {
1315            Project::local(
1316                client_a.clone(),
1317                client_a.user_store.clone(),
1318                lang_registry.clone(),
1319                fs.clone(),
1320                cx,
1321            )
1322        });
1323        let (worktree_a, _) = project_a
1324            .update(cx_a, |p, cx| {
1325                p.find_or_create_local_worktree("/a", true, cx)
1326            })
1327            .await
1328            .unwrap();
1329        worktree_a
1330            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1331            .await;
1332        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1333        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1334        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1335
1336        // Join that worktree as clients B and C.
1337        let project_b = Project::remote(
1338            project_id,
1339            client_b.clone(),
1340            client_b.user_store.clone(),
1341            lang_registry.clone(),
1342            fs.clone(),
1343            &mut cx_b.to_async(),
1344        )
1345        .await
1346        .unwrap();
1347        let project_c = Project::remote(
1348            project_id,
1349            client_c.clone(),
1350            client_c.user_store.clone(),
1351            lang_registry.clone(),
1352            fs.clone(),
1353            &mut cx_c.to_async(),
1354        )
1355        .await
1356        .unwrap();
1357        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1358        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1359
1360        // Open and edit a buffer as both guests B and C.
1361        let buffer_b = project_b
1362            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1363            .await
1364            .unwrap();
1365        let buffer_c = project_c
1366            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1367            .await
1368            .unwrap();
            // Both guests insert at offset 0 of the (empty) file.
1369        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1370        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1371
1372        // Open and edit that buffer as the host.
1373        let buffer_a = project_a
1374            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1375            .await
1376            .unwrap();
1377
            // The host is expected to settle with C's insertion ahead of B's before it
            // appends its own text at the end.
1378        buffer_a
1379            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1380            .await;
1381        buffer_a.update(cx_a, |buf, cx| {
1382            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1383        });
1384
1385        // Wait for edits to propagate
1386        buffer_a
1387            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1388            .await;
1389        buffer_b
1390            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1391            .await;
1392        buffer_c
1393            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1394            .await;
1395
1396        // Edit the buffer as the host and concurrently save as guest B.
            // The save is started first and only awaited after the host's edit.
1397        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1398        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1399        save_b.await.unwrap();
            // The file written to the fake fs must include the host's "hi-a, " edit.
1400        assert_eq!(
1401            fs.load("/a/file1".as_ref()).await.unwrap(),
1402            "hi-a, i-am-c, i-am-b, i-am-a"
1403        );
            // A and B are checked immediately; C is waited on because it only learns
            // about the save asynchronously.
1404        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1405        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1406        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1407
1408        worktree_a.flush_fs_events(cx_a).await;
1409
1410        // Make changes on host's file system, see those changes on guest worktrees.
1411        fs.rename(
1412            "/a/file1".as_ref(),
1413            "/a/file1-renamed".as_ref(),
1414            Default::default(),
1415        )
1416        .await
1417        .unwrap();
1418
1419        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1420            .await
1421            .unwrap();
1422        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1423
            // Host and both guests must end up with the same list of paths.
1424        worktree_a
1425            .condition(&cx_a, |tree, _| {
1426                tree.paths()
1427                    .map(|p| p.to_string_lossy())
1428                    .collect::<Vec<_>>()
1429                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1430            })
1431            .await;
1432        worktree_b
1433            .condition(&cx_b, |tree, _| {
1434                tree.paths()
1435                    .map(|p| p.to_string_lossy())
1436                    .collect::<Vec<_>>()
1437                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1438            })
1439            .await;
1440        worktree_c
1441            .condition(&cx_c, |tree, _| {
1442                tree.paths()
1443                    .map(|p| p.to_string_lossy())
1444                    .collect::<Vec<_>>()
1445                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1446            })
1447            .await;
1448
1449        // Ensure buffer files are updated as well.
1450        buffer_a
1451            .condition(&cx_a, |buf, _| {
1452                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1453            })
1454            .await;
1455        buffer_b
1456            .condition(&cx_b, |buf, _| {
1457                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1458            })
1459            .await;
1460        buffer_c
1461            .condition(&cx_c, |buf, _| {
1462                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1463            })
1464            .await;
1465    }
1466
1467    #[gpui::test(iterations = 10)]
1468    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // Checks that a guest's buffer never reports a conflict across an
            // edit -> save -> edit sequence: it is dirty after each edit, clean after the
            // save, and `has_conflict()` stays false throughout.
1469        cx_a.foreground().forbid_parking();
1470        let lang_registry = Arc::new(LanguageRegistry::test());
1471        let fs = FakeFs::new(cx_a.background());
1472
1473        // Connect to a server as 2 clients.
1474        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1475        let client_a = server.create_client(cx_a, "user_a").await;
1476        let client_b = server.create_client(cx_b, "user_b").await;
1477
1478        // Share a project as client A
            // `.zed.toml` also lists user_c, although no such client is created here.
1479        fs.insert_tree(
1480            "/dir",
1481            json!({
1482                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1483                "a.txt": "a-contents",
1484            }),
1485        )
1486        .await;
1487
1488        let project_a = cx_a.update(|cx| {
1489            Project::local(
1490                client_a.clone(),
1491                client_a.user_store.clone(),
1492                lang_registry.clone(),
1493                fs.clone(),
1494                cx,
1495            )
1496        });
1497        let (worktree_a, _) = project_a
1498            .update(cx_a, |p, cx| {
1499                p.find_or_create_local_worktree("/dir", true, cx)
1500            })
1501            .await
1502            .unwrap();
1503        worktree_a
1504            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1505            .await;
1506        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1507        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1508        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1509
1510        // Join that project as client B
1511        let project_b = Project::remote(
1512            project_id,
1513            client_b.clone(),
1514            client_b.user_store.clone(),
1515            lang_registry.clone(),
1516            fs.clone(),
1517            &mut cx_b.to_async(),
1518        )
1519        .await
1520        .unwrap();
1521
1522        // Open a buffer as client B
1523        let buffer_b = project_b
1524            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1525            .await
1526            .unwrap();
1527
            // First edit: dirty, but not in conflict.
1528        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1529        buffer_b.read_with(cx_b, |buf, _| {
1530            assert!(buf.is_dirty());
1531            assert!(!buf.has_conflict());
1532        });
1533
            // Save, then wait until the guest buffer is reported clean again.
1534        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1535        buffer_b
1536            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1537            .await;
1538        buffer_b.read_with(cx_b, |buf, _| {
1539            assert!(!buf.has_conflict());
1540        });
1541
            // An edit made after the save must again be dirty without a conflict.
1542        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1543        buffer_b.read_with(cx_b, |buf, _| {
1544            assert!(buf.is_dirty());
1545            assert!(!buf.has_conflict());
1546        });
1547    }
1548
1549    #[gpui::test(iterations = 10)]
1550    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
            // Checks that a guest's unmodified buffer follows a change made directly on
            // the host's file system: after `a.txt` is overwritten on the fake fs, the
            // guest buffer shows the new text, is not dirty, and reports no conflict.
1551        cx_a.foreground().forbid_parking();
1552        let lang_registry = Arc::new(LanguageRegistry::test());
1553        let fs = FakeFs::new(cx_a.background());
1554
1555        // Connect to a server as 2 clients.
1556        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1557        let client_a = server.create_client(cx_a, "user_a").await;
1558        let client_b = server.create_client(cx_b, "user_b").await;
1559
1560        // Share a project as client A
1561        fs.insert_tree(
1562            "/dir",
1563            json!({
1564                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1565                "a.txt": "a-contents",
1566            }),
1567        )
1568        .await;
1569
1570        let project_a = cx_a.update(|cx| {
1571            Project::local(
1572                client_a.clone(),
1573                client_a.user_store.clone(),
1574                lang_registry.clone(),
1575                fs.clone(),
1576                cx,
1577            )
1578        });
1579        let (worktree_a, _) = project_a
1580            .update(cx_a, |p, cx| {
1581                p.find_or_create_local_worktree("/dir", true, cx)
1582            })
1583            .await
1584            .unwrap();
1585        worktree_a
1586            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1587            .await;
1588        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1589        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1590        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1591
1592        // Join that project as client B
1593        let project_b = Project::remote(
1594            project_id,
1595            client_b.clone(),
1596            client_b.user_store.clone(),
1597            lang_registry.clone(),
1598            fs.clone(),
1599            &mut cx_b.to_async(),
1600        )
1601        .await
1602        .unwrap();
            // NOTE(review): the handle is bound but never used; presumably it only keeps
            // the guest worktree alive for the duration of the test — confirm.
1603        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1604
1605        // Open a buffer as client B
1606        let buffer_b = project_b
1607            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1608            .await
1609            .unwrap();
1610        buffer_b.read_with(cx_b, |buf, _| {
1611            assert!(!buf.is_dirty());
1612            assert!(!buf.has_conflict());
1613        });
1614
            // Overwrite the file behind the buffer on the fake file system.
1615        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1616            .await
1617            .unwrap();
            // The reload reaches the guest asynchronously, so wait for it.
1618        buffer_b
1619            .condition(&cx_b, |buf, _| {
1620                buf.text() == "new contents" && !buf.is_dirty()
1621            })
1622            .await;
1623        buffer_b.read_with(cx_b, |buf, _| {
1624            assert!(!buf.has_conflict());
1625        });
1626    }
1627
1628    #[gpui::test(iterations = 10)]
1629    async fn test_editing_while_guest_opens_buffer(
1630        cx_a: &mut TestAppContext,
1631        cx_b: &mut TestAppContext,
1632    ) {
1633        cx_a.foreground().forbid_parking();
1634        let lang_registry = Arc::new(LanguageRegistry::test());
1635        let fs = FakeFs::new(cx_a.background());
1636
1637        // Connect to a server as 2 clients.
1638        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1639        let client_a = server.create_client(cx_a, "user_a").await;
1640        let client_b = server.create_client(cx_b, "user_b").await;
1641
1642        // Share a project as client A
1643        fs.insert_tree(
1644            "/dir",
1645            json!({
1646                ".zed.toml": r#"collaborators = ["user_b"]"#,
1647                "a.txt": "a-contents",
1648            }),
1649        )
1650        .await;
1651        let project_a = cx_a.update(|cx| {
1652            Project::local(
1653                client_a.clone(),
1654                client_a.user_store.clone(),
1655                lang_registry.clone(),
1656                fs.clone(),
1657                cx,
1658            )
1659        });
1660        let (worktree_a, _) = project_a
1661            .update(cx_a, |p, cx| {
1662                p.find_or_create_local_worktree("/dir", true, cx)
1663            })
1664            .await
1665            .unwrap();
1666        worktree_a
1667            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1668            .await;
1669        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1670        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1671        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1672
1673        // Join that project as client B
1674        let project_b = Project::remote(
1675            project_id,
1676            client_b.clone(),
1677            client_b.user_store.clone(),
1678            lang_registry.clone(),
1679            fs.clone(),
1680            &mut cx_b.to_async(),
1681        )
1682        .await
1683        .unwrap();
1684
1685        // Open a buffer as client A
1686        let buffer_a = project_a
1687            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1688            .await
1689            .unwrap();
1690
1691        // Start opening the same buffer as client B
1692        let buffer_b = cx_b
1693            .background()
1694            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1695
1696        // Edit the buffer as client A while client B is still opening it.
1697        cx_b.background().simulate_random_delay().await;
1698        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1699        cx_b.background().simulate_random_delay().await;
1700        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1701
1702        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1703        let buffer_b = buffer_b.await.unwrap();
1704        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1705    }
1706
1707    #[gpui::test(iterations = 10)]
1708    async fn test_leaving_worktree_while_opening_buffer(
1709        cx_a: &mut TestAppContext,
1710        cx_b: &mut TestAppContext,
1711    ) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::test());
1714        let fs = FakeFs::new(cx_a.background());
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1718        let client_a = server.create_client(cx_a, "user_a").await;
1719        let client_b = server.create_client(cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730        let project_a = cx_a.update(|cx| {
1731            Project::local(
1732                client_a.clone(),
1733                client_a.user_store.clone(),
1734                lang_registry.clone(),
1735                fs.clone(),
1736                cx,
1737            )
1738        });
1739        let (worktree_a, _) = project_a
1740            .update(cx_a, |p, cx| {
1741                p.find_or_create_local_worktree("/dir", true, cx)
1742            })
1743            .await
1744            .unwrap();
1745        worktree_a
1746            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1747            .await;
1748        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1749        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1750        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1751
1752        // Join that project as client B
1753        let project_b = Project::remote(
1754            project_id,
1755            client_b.clone(),
1756            client_b.user_store.clone(),
1757            lang_registry.clone(),
1758            fs.clone(),
1759            &mut cx_b.to_async(),
1760        )
1761        .await
1762        .unwrap();
1763
1764        // See that a guest has joined as client A.
1765        project_a
1766            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1767            .await;
1768
1769        // Begin opening a buffer as client B, but leave the project before the open completes.
1770        let buffer_b = cx_b
1771            .background()
1772            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1773        cx_b.update(|_| drop(project_b));
1774        drop(buffer_b);
1775
1776        // See that the guest has left.
1777        project_a
1778            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1779            .await;
1780    }
1781
1782    #[gpui::test(iterations = 10)]
1783    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1784        cx_a.foreground().forbid_parking();
1785        let lang_registry = Arc::new(LanguageRegistry::test());
1786        let fs = FakeFs::new(cx_a.background());
1787
1788        // Connect to a server as 2 clients.
1789        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1790        let client_a = server.create_client(cx_a, "user_a").await;
1791        let client_b = server.create_client(cx_b, "user_b").await;
1792
1793        // Share a project as client A
1794        fs.insert_tree(
1795            "/a",
1796            json!({
1797                ".zed.toml": r#"collaborators = ["user_b"]"#,
1798                "a.txt": "a-contents",
1799                "b.txt": "b-contents",
1800            }),
1801        )
1802        .await;
1803        let project_a = cx_a.update(|cx| {
1804            Project::local(
1805                client_a.clone(),
1806                client_a.user_store.clone(),
1807                lang_registry.clone(),
1808                fs.clone(),
1809                cx,
1810            )
1811        });
1812        let (worktree_a, _) = project_a
1813            .update(cx_a, |p, cx| {
1814                p.find_or_create_local_worktree("/a", true, cx)
1815            })
1816            .await
1817            .unwrap();
1818        worktree_a
1819            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1820            .await;
1821        let project_id = project_a
1822            .update(cx_a, |project, _| project.next_remote_id())
1823            .await;
1824        project_a
1825            .update(cx_a, |project, cx| project.share(cx))
1826            .await
1827            .unwrap();
1828
1829        // Join that project as client B
1830        let _project_b = Project::remote(
1831            project_id,
1832            client_b.clone(),
1833            client_b.user_store.clone(),
1834            lang_registry.clone(),
1835            fs.clone(),
1836            &mut cx_b.to_async(),
1837        )
1838        .await
1839        .unwrap();
1840
1841        // Client A sees that a guest has joined.
1842        project_a
1843            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1844            .await;
1845
1846        // Drop client B's connection and ensure client A observes client B leaving the project.
1847        client_b.disconnect(&cx_b.to_async()).unwrap();
1848        project_a
1849            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1850            .await;
1851
1852        // Rejoin the project as client B
1853        let _project_b = Project::remote(
1854            project_id,
1855            client_b.clone(),
1856            client_b.user_store.clone(),
1857            lang_registry.clone(),
1858            fs.clone(),
1859            &mut cx_b.to_async(),
1860        )
1861        .await
1862        .unwrap();
1863
1864        // Client A sees that a guest has re-joined.
1865        project_a
1866            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1867            .await;
1868
1869        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1870        client_b.wait_for_current_user(cx_b).await;
1871        server.disconnect_client(client_b.current_user_id(cx_b));
1872        cx_a.foreground().advance_clock(Duration::from_secs(3));
1873        project_a
1874            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1875            .await;
1876    }
1877
1878    #[gpui::test(iterations = 10)]
1879    async fn test_collaborating_with_diagnostics(
1880        cx_a: &mut TestAppContext,
1881        cx_b: &mut TestAppContext,
1882    ) {
1883        cx_a.foreground().forbid_parking();
1884        let mut lang_registry = Arc::new(LanguageRegistry::test());
1885        let fs = FakeFs::new(cx_a.background());
1886
1887        // Set up a fake language server.
1888        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1889        Arc::get_mut(&mut lang_registry)
1890            .unwrap()
1891            .add(Arc::new(Language::new(
1892                LanguageConfig {
1893                    name: "Rust".into(),
1894                    path_suffixes: vec!["rs".to_string()],
1895                    language_server: Some(language_server_config),
1896                    ..Default::default()
1897                },
1898                Some(tree_sitter_rust::language()),
1899            )));
1900
1901        // Connect to a server as 2 clients.
1902        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1903        let client_a = server.create_client(cx_a, "user_a").await;
1904        let client_b = server.create_client(cx_b, "user_b").await;
1905
1906        // Share a project as client A
1907        fs.insert_tree(
1908            "/a",
1909            json!({
1910                ".zed.toml": r#"collaborators = ["user_b"]"#,
1911                "a.rs": "let one = two",
1912                "other.rs": "",
1913            }),
1914        )
1915        .await;
1916        let project_a = cx_a.update(|cx| {
1917            Project::local(
1918                client_a.clone(),
1919                client_a.user_store.clone(),
1920                lang_registry.clone(),
1921                fs.clone(),
1922                cx,
1923            )
1924        });
1925        let (worktree_a, _) = project_a
1926            .update(cx_a, |p, cx| {
1927                p.find_or_create_local_worktree("/a", true, cx)
1928            })
1929            .await
1930            .unwrap();
1931        worktree_a
1932            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1933            .await;
1934        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1935        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1936        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1937
1938        // Cause the language server to start.
1939        let _ = cx_a
1940            .background()
1941            .spawn(project_a.update(cx_a, |project, cx| {
1942                project.open_buffer(
1943                    ProjectPath {
1944                        worktree_id,
1945                        path: Path::new("other.rs").into(),
1946                    },
1947                    cx,
1948                )
1949            }))
1950            .await
1951            .unwrap();
1952
1953        // Simulate a language server reporting errors for a file.
1954        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1955        fake_language_server
1956            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1957            .await;
1958        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1959            lsp::PublishDiagnosticsParams {
1960                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1961                version: None,
1962                diagnostics: vec![lsp::Diagnostic {
1963                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1964                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1965                    message: "message 1".to_string(),
1966                    ..Default::default()
1967                }],
1968            },
1969        );
1970
1971        // Wait for server to see the diagnostics update.
1972        server
1973            .condition(|store| {
1974                let worktree = store
1975                    .project(project_id)
1976                    .unwrap()
1977                    .share
1978                    .as_ref()
1979                    .unwrap()
1980                    .worktrees
1981                    .get(&worktree_id.to_proto())
1982                    .unwrap();
1983
1984                !worktree.diagnostic_summaries.is_empty()
1985            })
1986            .await;
1987
1988        // Join the worktree as client B.
1989        let project_b = Project::remote(
1990            project_id,
1991            client_b.clone(),
1992            client_b.user_store.clone(),
1993            lang_registry.clone(),
1994            fs.clone(),
1995            &mut cx_b.to_async(),
1996        )
1997        .await
1998        .unwrap();
1999
2000        project_b.read_with(cx_b, |project, cx| {
2001            assert_eq!(
2002                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2003                &[(
2004                    ProjectPath {
2005                        worktree_id,
2006                        path: Arc::from(Path::new("a.rs")),
2007                    },
2008                    DiagnosticSummary {
2009                        error_count: 1,
2010                        warning_count: 0,
2011                        ..Default::default()
2012                    },
2013                )]
2014            )
2015        });
2016
2017        // Simulate a language server reporting more errors for a file.
2018        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2019            lsp::PublishDiagnosticsParams {
2020                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2021                version: None,
2022                diagnostics: vec![
2023                    lsp::Diagnostic {
2024                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2025                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2026                        message: "message 1".to_string(),
2027                        ..Default::default()
2028                    },
2029                    lsp::Diagnostic {
2030                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2031                        range: lsp::Range::new(
2032                            lsp::Position::new(0, 10),
2033                            lsp::Position::new(0, 13),
2034                        ),
2035                        message: "message 2".to_string(),
2036                        ..Default::default()
2037                    },
2038                ],
2039            },
2040        );
2041
2042        // Client b gets the updated summaries
2043        project_b
2044            .condition(&cx_b, |project, cx| {
2045                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2046                    == &[(
2047                        ProjectPath {
2048                            worktree_id,
2049                            path: Arc::from(Path::new("a.rs")),
2050                        },
2051                        DiagnosticSummary {
2052                            error_count: 1,
2053                            warning_count: 1,
2054                            ..Default::default()
2055                        },
2056                    )]
2057            })
2058            .await;
2059
2060        // Open the file with the errors on client B. They should be present.
2061        let buffer_b = cx_b
2062            .background()
2063            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2064            .await
2065            .unwrap();
2066
2067        buffer_b.read_with(cx_b, |buffer, _| {
2068            assert_eq!(
2069                buffer
2070                    .snapshot()
2071                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2072                    .map(|entry| entry)
2073                    .collect::<Vec<_>>(),
2074                &[
2075                    DiagnosticEntry {
2076                        range: Point::new(0, 4)..Point::new(0, 7),
2077                        diagnostic: Diagnostic {
2078                            group_id: 0,
2079                            message: "message 1".to_string(),
2080                            severity: lsp::DiagnosticSeverity::ERROR,
2081                            is_primary: true,
2082                            ..Default::default()
2083                        }
2084                    },
2085                    DiagnosticEntry {
2086                        range: Point::new(0, 10)..Point::new(0, 13),
2087                        diagnostic: Diagnostic {
2088                            group_id: 1,
2089                            severity: lsp::DiagnosticSeverity::WARNING,
2090                            message: "message 2".to_string(),
2091                            is_primary: true,
2092                            ..Default::default()
2093                        }
2094                    }
2095                ]
2096            );
2097        });
2098    }
2099
2100    #[gpui::test(iterations = 10)]
2101    async fn test_collaborating_with_completion(
2102        cx_a: &mut TestAppContext,
2103        cx_b: &mut TestAppContext,
2104    ) {
2105        cx_a.foreground().forbid_parking();
2106        let mut lang_registry = Arc::new(LanguageRegistry::test());
2107        let fs = FakeFs::new(cx_a.background());
2108
2109        // Set up a fake language server.
2110        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2111        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2112            completion_provider: Some(lsp::CompletionOptions {
2113                trigger_characters: Some(vec![".".to_string()]),
2114                ..Default::default()
2115            }),
2116            ..Default::default()
2117        });
2118        Arc::get_mut(&mut lang_registry)
2119            .unwrap()
2120            .add(Arc::new(Language::new(
2121                LanguageConfig {
2122                    name: "Rust".into(),
2123                    path_suffixes: vec!["rs".to_string()],
2124                    language_server: Some(language_server_config),
2125                    ..Default::default()
2126                },
2127                Some(tree_sitter_rust::language()),
2128            )));
2129
2130        // Connect to a server as 2 clients.
2131        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132        let client_a = server.create_client(cx_a, "user_a").await;
2133        let client_b = server.create_client(cx_b, "user_b").await;
2134
2135        // Share a project as client A
2136        fs.insert_tree(
2137            "/a",
2138            json!({
2139                ".zed.toml": r#"collaborators = ["user_b"]"#,
2140                "main.rs": "fn main() { a }",
2141                "other.rs": "",
2142            }),
2143        )
2144        .await;
2145        let project_a = cx_a.update(|cx| {
2146            Project::local(
2147                client_a.clone(),
2148                client_a.user_store.clone(),
2149                lang_registry.clone(),
2150                fs.clone(),
2151                cx,
2152            )
2153        });
2154        let (worktree_a, _) = project_a
2155            .update(cx_a, |p, cx| {
2156                p.find_or_create_local_worktree("/a", true, cx)
2157            })
2158            .await
2159            .unwrap();
2160        worktree_a
2161            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162            .await;
2163        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2164        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2165        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2166
2167        // Join the worktree as client B.
2168        let project_b = Project::remote(
2169            project_id,
2170            client_b.clone(),
2171            client_b.user_store.clone(),
2172            lang_registry.clone(),
2173            fs.clone(),
2174            &mut cx_b.to_async(),
2175        )
2176        .await
2177        .unwrap();
2178
2179        // Open a file in an editor as the guest.
2180        let buffer_b = project_b
2181            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2182            .await
2183            .unwrap();
2184        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2185        let editor_b = cx_b.add_view(window_b, |cx| {
2186            Editor::for_buffer(
2187                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2188                Some(project_b.clone()),
2189                watch::channel_with(Settings::test(cx)).1,
2190                cx,
2191            )
2192        });
2193
2194        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2195        buffer_b
2196            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2197            .await;
2198
2199        // Type a completion trigger character as the guest.
2200        editor_b.update(cx_b, |editor, cx| {
2201            editor.select_ranges([13..13], None, cx);
2202            editor.handle_input(&Input(".".into()), cx);
2203            cx.focus(&editor_b);
2204        });
2205
2206        // Receive a completion request as the host's language server.
2207        // Return some completions from the host's language server.
2208        cx_a.foreground().start_waiting();
2209        fake_language_server
2210            .handle_request::<lsp::request::Completion, _>(|params, _| {
2211                assert_eq!(
2212                    params.text_document_position.text_document.uri,
2213                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2214                );
2215                assert_eq!(
2216                    params.text_document_position.position,
2217                    lsp::Position::new(0, 14),
2218                );
2219
2220                Some(lsp::CompletionResponse::Array(vec![
2221                    lsp::CompletionItem {
2222                        label: "first_method(…)".into(),
2223                        detail: Some("fn(&mut self, B) -> C".into()),
2224                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2225                            new_text: "first_method($1)".to_string(),
2226                            range: lsp::Range::new(
2227                                lsp::Position::new(0, 14),
2228                                lsp::Position::new(0, 14),
2229                            ),
2230                        })),
2231                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2232                        ..Default::default()
2233                    },
2234                    lsp::CompletionItem {
2235                        label: "second_method(…)".into(),
2236                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2237                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2238                            new_text: "second_method()".to_string(),
2239                            range: lsp::Range::new(
2240                                lsp::Position::new(0, 14),
2241                                lsp::Position::new(0, 14),
2242                            ),
2243                        })),
2244                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2245                        ..Default::default()
2246                    },
2247                ]))
2248            })
2249            .next()
2250            .await
2251            .unwrap();
2252        cx_a.foreground().finish_waiting();
2253
2254        // Open the buffer on the host.
2255        let buffer_a = project_a
2256            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2257            .await
2258            .unwrap();
2259        buffer_a
2260            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2261            .await;
2262
2263        // Confirm a completion on the guest.
2264        editor_b
2265            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2266            .await;
2267        editor_b.update(cx_b, |editor, cx| {
2268            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2269            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2270        });
2271
2272        // Return a resolved completion from the host's language server.
2273        // The resolved completion has an additional text edit.
2274        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2275            |params, _| {
2276                assert_eq!(params.label, "first_method(…)");
2277                lsp::CompletionItem {
2278                    label: "first_method(…)".into(),
2279                    detail: Some("fn(&mut self, B) -> C".into()),
2280                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2281                        new_text: "first_method($1)".to_string(),
2282                        range: lsp::Range::new(
2283                            lsp::Position::new(0, 14),
2284                            lsp::Position::new(0, 14),
2285                        ),
2286                    })),
2287                    additional_text_edits: Some(vec![lsp::TextEdit {
2288                        new_text: "use d::SomeTrait;\n".to_string(),
2289                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2290                    }]),
2291                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2292                    ..Default::default()
2293                }
2294            },
2295        );
2296
2297        // The additional edit is applied.
2298        buffer_a
2299            .condition(&cx_a, |buffer, _| {
2300                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2301            })
2302            .await;
2303        buffer_b
2304            .condition(&cx_b, |buffer, _| {
2305                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2306            })
2307            .await;
2308    }
2309
2310    #[gpui::test(iterations = 10)]
2311    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2312        cx_a.foreground().forbid_parking();
2313        let mut lang_registry = Arc::new(LanguageRegistry::test());
2314        let fs = FakeFs::new(cx_a.background());
2315
2316        // Set up a fake language server.
2317        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2318        Arc::get_mut(&mut lang_registry)
2319            .unwrap()
2320            .add(Arc::new(Language::new(
2321                LanguageConfig {
2322                    name: "Rust".into(),
2323                    path_suffixes: vec!["rs".to_string()],
2324                    language_server: Some(language_server_config),
2325                    ..Default::default()
2326                },
2327                Some(tree_sitter_rust::language()),
2328            )));
2329
2330        // Connect to a server as 2 clients.
2331        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2332        let client_a = server.create_client(cx_a, "user_a").await;
2333        let client_b = server.create_client(cx_b, "user_b").await;
2334
2335        // Share a project as client A
2336        fs.insert_tree(
2337            "/a",
2338            json!({
2339                ".zed.toml": r#"collaborators = ["user_b"]"#,
2340                "a.rs": "let one = two",
2341            }),
2342        )
2343        .await;
2344        let project_a = cx_a.update(|cx| {
2345            Project::local(
2346                client_a.clone(),
2347                client_a.user_store.clone(),
2348                lang_registry.clone(),
2349                fs.clone(),
2350                cx,
2351            )
2352        });
2353        let (worktree_a, _) = project_a
2354            .update(cx_a, |p, cx| {
2355                p.find_or_create_local_worktree("/a", true, cx)
2356            })
2357            .await
2358            .unwrap();
2359        worktree_a
2360            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2361            .await;
2362        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2363        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2364        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2365
2366        // Join the worktree as client B.
2367        let project_b = Project::remote(
2368            project_id,
2369            client_b.clone(),
2370            client_b.user_store.clone(),
2371            lang_registry.clone(),
2372            fs.clone(),
2373            &mut cx_b.to_async(),
2374        )
2375        .await
2376        .unwrap();
2377
2378        let buffer_b = cx_b
2379            .background()
2380            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2381            .await
2382            .unwrap();
2383
2384        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2385        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2386            Some(vec![
2387                lsp::TextEdit {
2388                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2389                    new_text: "h".to_string(),
2390                },
2391                lsp::TextEdit {
2392                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2393                    new_text: "y".to_string(),
2394                },
2395            ])
2396        });
2397
2398        project_b
2399            .update(cx_b, |project, cx| {
2400                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2401            })
2402            .await
2403            .unwrap();
2404        assert_eq!(
2405            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2406            "let honey = two"
2407        );
2408    }
2409
2410    #[gpui::test(iterations = 10)]
        // Collaboration test for go-to-definition issued by a guest.
        //
        // Client A shares a project rooted at `/root-1`; client B joins it, opens `a.rs` and asks
        // for definitions twice. The fake language server answers both requests with locations in
        // `/root-2/b.rs`, a file outside the shared directory. The test checks the text and range
        // of the returned target buffer, that the guest's project reports two worktrees after each
        // request, and that both requests resolve to the same buffer.
2411    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2412        cx_a.foreground().forbid_parking();
2413        let mut lang_registry = Arc::new(LanguageRegistry::test());
2414        let fs = FakeFs::new(cx_a.background());
            // `/root-1` is the directory the host opens as a worktree. Its `.zed.toml` names
            // `user_b` as a collaborator (presumably what authorizes client B to join — confirm).
2415        fs.insert_tree(
2416            "/root-1",
2417            json!({
2418                ".zed.toml": r#"collaborators = ["user_b"]"#,
2419                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2420            }),
2421        )
2422        .await;
            // `/root-2` is never opened explicitly; it only exists so that the definition targets
            // returned by the fake language server point at a real file.
2423        fs.insert_tree(
2424            "/root-2",
2425            json!({
2426                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2427            }),
2428        )
2429        .await;
2430
2431        // Set up a fake language server.
2432        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned into any project.
2433        Arc::get_mut(&mut lang_registry)
2434            .unwrap()
2435            .add(Arc::new(Language::new(
2436                LanguageConfig {
2437                    name: "Rust".into(),
2438                    path_suffixes: vec!["rs".to_string()],
2439                    language_server: Some(language_server_config),
2440                    ..Default::default()
2441                },
2442                Some(tree_sitter_rust::language()),
2443            )));
2444
2445        // Connect to a server as 2 clients.
2446        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2447        let client_a = server.create_client(cx_a, "user_a").await;
2448        let client_b = server.create_client(cx_b, "user_b").await;
2449
2450        // Share a project as client A
2451        let project_a = cx_a.update(|cx| {
2452            Project::local(
2453                client_a.clone(),
2454                client_a.user_store.clone(),
2455                lang_registry.clone(),
2456                fs.clone(),
2457                cx,
2458            )
2459        });
2460        let (worktree_a, _) = project_a
2461            .update(cx_a, |p, cx| {
2462                p.find_or_create_local_worktree("/root-1", true, cx)
2463            })
2464            .await
2465            .unwrap();
            // Wait for the initial scan of `/root-1` to finish before sharing.
2466        worktree_a
2467            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2468            .await;
2469        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2470        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2471        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2472
2473        // Join the shared project as client B.
2474        let project_b = Project::remote(
2475            project_id,
2476            client_b.clone(),
2477            client_b.user_store.clone(),
2478            lang_registry.clone(),
2479            fs.clone(),
2480            &mut cx_b.to_async(),
2481        )
2482        .await
2483        .unwrap();
2484
2485        // Open the file on client B.
2486        let buffer_b = cx_b
2487            .background()
2488            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2489            .await
2490            .unwrap();
2491
2492        // Request the definition of a symbol as the guest.
            // The handler ignores the request parameters and always answers with line 0, columns
            // 6..9 of `/root-2/b.rs`, i.e. the identifier `TWO`.
2493        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2494        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2495            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2496                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2497                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2498            )))
2499        });
2500
            // Offset 23 lies inside `TWO` in "const ONE: usize = b::TWO + b::THREE;".
2501        let definitions_1 = project_b
2502            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2503            .await
2504            .unwrap();
2505        cx_b.read(|cx| {
2506            assert_eq!(definitions_1.len(), 1);
                // The guest now sees a second worktree in addition to `/root-1` (presumably one
                // created on the host for `/root-2/b.rs` — confirm against `Project::definition`).
2507            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2508            let target_buffer = definitions_1[0].buffer.read(cx);
2509            assert_eq!(
2510                target_buffer.text(),
2511                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2512            );
2513            assert_eq!(
2514                definitions_1[0].range.to_point(target_buffer),
2515                Point::new(0, 6)..Point::new(0, 9)
2516            );
2517        });
2518
2519        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2520        // the previous call to `definition`.
            // The replacement handler answers with line 1, columns 6..11 of the same file, i.e.
            // the identifier `THREE`.
2521        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2522            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2523                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2524                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2525            )))
2526        });
2527
            // Offset 33 lies inside `THREE` in `a.rs`.
2528        let definitions_2 = project_b
2529            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2530            .await
2531            .unwrap();
2532        cx_b.read(|cx| {
2533            assert_eq!(definitions_2.len(), 1);
                // Still two worktrees: the second request must not add another one.
2534            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2535            let target_buffer = definitions_2[0].buffer.read(cx);
2536            assert_eq!(
2537                target_buffer.text(),
2538                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2539            );
2540            assert_eq!(
2541                definitions_2[0].range.to_point(target_buffer),
2542                Point::new(1, 6)..Point::new(1, 11)
2543            );
2544        });
            // Both definitions must point at the very same buffer handle.
2545        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2546    }
2547
2548    #[gpui::test(iterations = 10)]
        // Collaboration test for find-references issued by a guest.
        //
        // Client A shares a project rooted at `/root-1`; client B joins it, opens `one.rs` and
        // asks for references. The fake language server reports two references in
        // `/root-1/two.rs` and one in `/root-2/three.rs`. The test checks the number of
        // references, that the guest's project reports two worktrees afterwards, that the two
        // `two.rs` references share one buffer, and the file path and offset range of each result.
2549    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2550        cx_a.foreground().forbid_parking();
2551        let mut lang_registry = Arc::new(LanguageRegistry::test());
2552        let fs = FakeFs::new(cx_a.background());
            // `/root-1` is the directory the host opens as a worktree. Its `.zed.toml` names
            // `user_b` as a collaborator (presumably what authorizes client B to join — confirm).
2553        fs.insert_tree(
2554            "/root-1",
2555            json!({
2556                ".zed.toml": r#"collaborators = ["user_b"]"#,
2557                "one.rs": "const ONE: usize = 1;",
2558                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2559            }),
2560        )
2561        .await;
            // `/root-2` is never opened explicitly; it backs the third reference location below.
2562        fs.insert_tree(
2563            "/root-2",
2564            json!({
2565                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2566            }),
2567        )
2568        .await;
2569
2570        // Set up a fake language server.
2571        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned into any project.
2572        Arc::get_mut(&mut lang_registry)
2573            .unwrap()
2574            .add(Arc::new(Language::new(
2575                LanguageConfig {
2576                    name: "Rust".into(),
2577                    path_suffixes: vec!["rs".to_string()],
2578                    language_server: Some(language_server_config),
2579                    ..Default::default()
2580                },
2581                Some(tree_sitter_rust::language()),
2582            )));
2583
2584        // Connect to a server as 2 clients.
2585        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2586        let client_a = server.create_client(cx_a, "user_a").await;
2587        let client_b = server.create_client(cx_b, "user_b").await;
2588
2589        // Share a project as client A
2590        let project_a = cx_a.update(|cx| {
2591            Project::local(
2592                client_a.clone(),
2593                client_a.user_store.clone(),
2594                lang_registry.clone(),
2595                fs.clone(),
2596                cx,
2597            )
2598        });
2599        let (worktree_a, _) = project_a
2600            .update(cx_a, |p, cx| {
2601                p.find_or_create_local_worktree("/root-1", true, cx)
2602            })
2603            .await
2604            .unwrap();
            // Wait for the initial scan of `/root-1` to finish before sharing.
2605        worktree_a
2606            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2607            .await;
2608        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2609        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2610        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2611
2612        // Join the shared project as client B.
2613        let project_b = Project::remote(
2614            project_id,
2615            client_b.clone(),
2616            client_b.user_store.clone(),
2617            lang_registry.clone(),
2618            fs.clone(),
2619            &mut cx_b.to_async(),
2620        )
2621        .await
2622        .unwrap();
2623
2624        // Open the file on client B.
2625        let buffer_b = cx_b
2626            .background()
2627            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2628            .await
2629            .unwrap();
2630
2631        // Request references to a symbol as the guest.
            // The handler verifies that the request names the host-side file URI, then reports the
            // two `ONE` occurrences in `two.rs` (columns 24..27 and 35..38) and the one in
            // `three.rs` (columns 37..40).
2632        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2633        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2634            assert_eq!(
2635                params.text_document_position.text_document.uri.as_str(),
2636                "file:///root-1/one.rs"
2637            );
2638            Some(vec![
2639                lsp::Location {
2640                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2641                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2642                },
2643                lsp::Location {
2644                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2645                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2646                },
2647                lsp::Location {
2648                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2649                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2650                },
2651            ])
2652        });
2653
            // Offset 7 lies inside `ONE` in "const ONE: usize = 1;".
2654        let references = project_b
2655            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2656            .await
2657            .unwrap();
2658        cx_b.read(|cx| {
2659            assert_eq!(references.len(), 3);
                // The guest now sees a second worktree in addition to `/root-1` (presumably one
                // created on the host for `/root-2/three.rs` — confirm against
                // `Project::references`).
2660            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2661
2662            let two_buffer = references[0].buffer.read(cx);
2663            let three_buffer = references[2].buffer.read(cx);
2664            assert_eq!(
2665                two_buffer.file().unwrap().path().as_ref(),
2666                Path::new("two.rs")
2667            );
                // Both references into `two.rs` must share a single buffer.
2668            assert_eq!(references[1].buffer, references[0].buffer);
                // NOTE(review): this check uses `full_path`, whereas the `two.rs` check above uses
                // `path`. The expected value is the bare file name, presumably because `three.rs`
                // is the root of its own single-file worktree — confirm.
2669            assert_eq!(
2670                three_buffer.file().unwrap().full_path(cx),
2671                Path::new("three.rs")
2672            );
2673
                // The ranges must convert back to the columns the fake server reported; every
                // file here is a single line, so offsets equal columns.
2674            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2675            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2676            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2677        });
2678    }
2679
2680    #[gpui::test(iterations = 10)]
        // Collaboration test for project-wide text search issued by a guest.
        //
        // Client A opens `/root-1` and `/root-2` as two worktrees of one project and shares it;
        // client B joins and searches for "world". The test checks that every matching file from
        // both worktrees is reported with the expected offset ranges. No language server is
        // configured in this test.
2681    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2682        cx_a.foreground().forbid_parking();
2683        let lang_registry = Arc::new(LanguageRegistry::test());
2684        let fs = FakeFs::new(cx_a.background());
            // Files "a", "c" and "d" contain "world" (twice in "d"); "b" does not and is therefore
            // absent from the expected results below.
2685        fs.insert_tree(
2686            "/root-1",
2687            json!({
2688                ".zed.toml": r#"collaborators = ["user_b"]"#,
2689                "a": "hello world",
2690                "b": "goodnight moon",
2691                "c": "a world of goo",
2692                "d": "world champion of clown world",
2693            }),
2694        )
2695        .await;
2696        fs.insert_tree(
2697            "/root-2",
2698            json!({
2699                "e": "disney world is fun",
2700            }),
2701        )
2702        .await;
2703
2704        // Connect to a server as 2 clients.
2705        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2706        let client_a = server.create_client(cx_a, "user_a").await;
2707        let client_b = server.create_client(cx_b, "user_b").await;
2708
2709        // Share a project as client A
2710        let project_a = cx_a.update(|cx| {
2711            Project::local(
2712                client_a.clone(),
2713                client_a.user_store.clone(),
2714                lang_registry.clone(),
2715                fs.clone(),
2716                cx,
2717            )
2718        });
            // In this test the remote id is awaited before any worktree is added.
2719        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2720
            // Add both directories as worktrees, waiting for each initial scan to complete.
2721        let (worktree_1, _) = project_a
2722            .update(cx_a, |p, cx| {
2723                p.find_or_create_local_worktree("/root-1", true, cx)
2724            })
2725            .await
2726            .unwrap();
2727        worktree_1
2728            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2729            .await;
2730        let (worktree_2, _) = project_a
2731            .update(cx_a, |p, cx| {
2732                p.find_or_create_local_worktree("/root-2", true, cx)
2733            })
2734            .await
2735            .unwrap();
2736        worktree_2
2737            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2738            .await;
2739
2740        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2741
2742        // Join the shared project as client B.
2743        let project_b = Project::remote(
2744            project_id,
2745            client_b.clone(),
2746            client_b.user_store.clone(),
2747            lang_registry.clone(),
2748            fs.clone(),
2749            &mut cx_b.to_async(),
2750        )
2751        .await
2752        .unwrap();
2753
            // Run the search from the guest. NOTE(review): the two `false` arguments are
            // presumably the whole-word and case-sensitive flags — confirm against
            // `SearchQuery::text`.
2754        let results = project_b
2755            .update(cx_b, |project, cx| {
2756                project.search(SearchQuery::text("world", false, false), cx)
2757            })
2758            .await
2759            .unwrap();
2760
            // Flatten each (buffer, ranges) result into (full path, offset ranges) so it can be
            // compared against plain values.
2761        let mut ranges_by_path = results
2762            .into_iter()
2763            .map(|(buffer, ranges)| {
2764                buffer.read_with(cx_b, |buffer, cx| {
2765                    let path = buffer.file().unwrap().full_path(cx);
2766                    let offset_ranges = ranges
2767                        .into_iter()
2768                        .map(|range| range.to_offset(buffer))
2769                        .collect::<Vec<_>>();
2770                    (path, offset_ranges)
2771                })
2772            })
2773            .collect::<Vec<_>>();
            // Sort by path so the comparison does not depend on the order results arrive in.
2774        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2775
            // Full paths are prefixed with the worktree root name ("root-1" / "root-2").
2776        assert_eq!(
2777            ranges_by_path,
2778            &[
2779                (PathBuf::from("root-1/a"), vec![6..11]),
2780                (PathBuf::from("root-1/c"), vec![2..7]),
2781                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2782                (PathBuf::from("root-2/e"), vec![7..12]),
2783            ]
2784        );
2785    }
2786
2787    #[gpui::test(iterations = 10)]
        // Collaboration test for document highlights issued by a guest.
        //
        // Client A shares a project rooted at `/root-1`; client B joins it, opens `main.rs` and
        // asks for document highlights at offset 34. The fake language server checks the request
        // it receives and answers with one WRITE and two READ highlights; the test checks that
        // the guest gets back the same kinds and ranges.
2788    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2789        cx_a.foreground().forbid_parking();
2790        let lang_registry = Arc::new(LanguageRegistry::test());
2791        let fs = FakeFs::new(cx_a.background());
            // In `main.rs`, `number` occurs at columns 10..16 (the parameter), 32..38 and 41..47.
2792        fs.insert_tree(
2793            "/root-1",
2794            json!({
2795                ".zed.toml": r#"collaborators = ["user_b"]"#,
2796                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2797            }),
2798        )
2799        .await;
2800
2801        // Set up a fake language server.
2802        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // NOTE(review): `add` is called directly through the `Arc` here, while the other
            // language-server tests in this module go through `Arc::get_mut(..).unwrap()`.
            // Presumably `add` takes `&self` — confirm and consider making the tests consistent.
2803        lang_registry.add(Arc::new(Language::new(
2804            LanguageConfig {
2805                name: "Rust".into(),
2806                path_suffixes: vec!["rs".to_string()],
2807                language_server: Some(language_server_config),
2808                ..Default::default()
2809            },
2810            Some(tree_sitter_rust::language()),
2811        )));
2812
2813        // Connect to a server as 2 clients.
2814        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2815        let client_a = server.create_client(cx_a, "user_a").await;
2816        let client_b = server.create_client(cx_b, "user_b").await;
2817
2818        // Share a project as client A
2819        let project_a = cx_a.update(|cx| {
2820            Project::local(
2821                client_a.clone(),
2822                client_a.user_store.clone(),
2823                lang_registry.clone(),
2824                fs.clone(),
2825                cx,
2826            )
2827        });
2828        let (worktree_a, _) = project_a
2829            .update(cx_a, |p, cx| {
2830                p.find_or_create_local_worktree("/root-1", true, cx)
2831            })
2832            .await
2833            .unwrap();
            // Wait for the initial scan of `/root-1` to finish before sharing.
2834        worktree_a
2835            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2836            .await;
2837        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2838        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2839        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2840
2841        // Join the shared project as client B.
2842        let project_b = Project::remote(
2843            project_id,
2844            client_b.clone(),
2845            client_b.user_store.clone(),
2846            lang_registry.clone(),
2847            fs.clone(),
2848            &mut cx_b.to_async(),
2849        )
2850        .await
2851        .unwrap();
2852
2853        // Open the file on client B.
2854        let buffer_b = cx_b
2855            .background()
2856            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2857            .await
2858            .unwrap();
2859
2860        // Request document highlights as the guest.
            // The handler asserts that the request carries the host-side file URI and the position
            // matching the guest's offset (34 -> line 0, column 34), then returns the three
            // occurrences of `number`.
2861        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2862        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2863            |params, _| {
2864                assert_eq!(
2865                    params
2866                        .text_document_position_params
2867                        .text_document
2868                        .uri
2869                        .as_str(),
2870                    "file:///root-1/main.rs"
2871                );
2872                assert_eq!(
2873                    params.text_document_position_params.position,
2874                    lsp::Position::new(0, 34)
2875                );
2876                Some(vec![
2877                    lsp::DocumentHighlight {
2878                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2879                        range: lsp::Range::new(
2880                            lsp::Position::new(0, 10),
2881                            lsp::Position::new(0, 16),
2882                        ),
2883                    },
2884                    lsp::DocumentHighlight {
2885                        kind: Some(lsp::DocumentHighlightKind::READ),
2886                        range: lsp::Range::new(
2887                            lsp::Position::new(0, 32),
2888                            lsp::Position::new(0, 38),
2889                        ),
2890                    },
2891                    lsp::DocumentHighlight {
2892                        kind: Some(lsp::DocumentHighlightKind::READ),
2893                        range: lsp::Range::new(
2894                            lsp::Position::new(0, 41),
2895                            lsp::Position::new(0, 47),
2896                        ),
2897                    },
2898                ])
2899            },
2900        );
2901
            // Offset 34 lies inside the second `number` (columns 32..38).
2902        let highlights = project_b
2903            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2904            .await
2905            .unwrap();
2906        buffer_b.read_with(cx_b, |buffer, _| {
2907            let snapshot = buffer.snapshot();
2908
                // Reduce each highlight to (kind, offset range) so it can be compared against
                // plain values; the file is a single line, so offsets equal columns.
2909            let highlights = highlights
2910                .into_iter()
2911                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2912                .collect::<Vec<_>>();
2913            assert_eq!(
2914                highlights,
2915                &[
2916                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2917                    (lsp::DocumentHighlightKind::READ, 32..38),
2918                    (lsp::DocumentHighlightKind::READ, 41..47)
2919                ]
2920            )
2921        });
2922    }
2923
2924    #[gpui::test(iterations = 10)]
        // Collaboration test for project-symbol search issued by a guest.
        //
        // Client A shares only `/code/crate-1`; client B joins, queries symbols and opens the
        // returned symbol, which lives in `/code/crate-2/two.rs`. The test then alters the
        // symbol's path on the guest side and checks that opening it fails with an error
        // mentioning "invalid symbol signature".
2925    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2926        cx_a.foreground().forbid_parking();
2927        let mut lang_registry = Arc::new(LanguageRegistry::test());
2928        let fs = FakeFs::new(cx_a.background());
            // Only `crate-1` is opened as a worktree below; `crate-2` and `private` sit next to
            // it on the host's file system but are not shared.
2929        fs.insert_tree(
2930            "/code",
2931            json!({
2932                "crate-1": {
2933                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2934                    "one.rs": "const ONE: usize = 1;",
2935                },
2936                "crate-2": {
2937                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2938                },
2939                "private": {
2940                    "passwords.txt": "the-password",
2941                }
2942            }),
2943        )
2944        .await;
2945
2946        // Set up a fake language server.
2947        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned into any project.
2948        Arc::get_mut(&mut lang_registry)
2949            .unwrap()
2950            .add(Arc::new(Language::new(
2951                LanguageConfig {
2952                    name: "Rust".into(),
2953                    path_suffixes: vec!["rs".to_string()],
2954                    language_server: Some(language_server_config),
2955                    ..Default::default()
2956                },
2957                Some(tree_sitter_rust::language()),
2958            )));
2959
2960        // Connect to a server as 2 clients.
2961        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2962        let client_a = server.create_client(cx_a, "user_a").await;
2963        let client_b = server.create_client(cx_b, "user_b").await;
2964
2965        // Share a project as client A
2966        let project_a = cx_a.update(|cx| {
2967            Project::local(
2968                client_a.clone(),
2969                client_a.user_store.clone(),
2970                lang_registry.clone(),
2971                fs.clone(),
2972                cx,
2973            )
2974        });
2975        let (worktree_a, _) = project_a
2976            .update(cx_a, |p, cx| {
2977                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2978            })
2979            .await
2980            .unwrap();
            // Wait for the initial scan of `/code/crate-1` to finish before sharing.
2981        worktree_a
2982            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2983            .await;
2984        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2985        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2986        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2987
2988        // Join the shared project as client B.
2989        let project_b = Project::remote(
2990            project_id,
2991            client_b.clone(),
2992            client_b.user_store.clone(),
2993            lang_registry.clone(),
2994            fs.clone(),
2995            &mut cx_b.to_async(),
2996        )
2997        .await
2998        .unwrap();
2999
3000        // Cause the language server to start.
3001        let _buffer = cx_b
3002            .background()
3003            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3004            .await
3005            .unwrap();
3006
            // The handler ignores the query and always reports a single constant `TWO` at line 0,
            // columns 6..9 of `/code/crate-2/two.rs`.
3007        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3008        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
                // NOTE(review): presumably required because the `deprecated` field of
                // `lsp::SymbolInformation` is itself marked deprecated — confirm.
3009            #[allow(deprecated)]
3010            Some(vec![lsp::SymbolInformation {
3011                name: "TWO".into(),
3012                location: lsp::Location {
3013                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3014                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3015                },
3016                kind: lsp::SymbolKind::CONSTANT,
3017                tags: None,
3018                container_name: None,
3019                deprecated: None,
3020            }])
3021        });
3022
3023        // Query the project's symbols as the guest.
3024        let symbols = project_b
3025            .update(cx_b, |p, cx| p.symbols("two", cx))
3026            .await
3027            .unwrap();
3028        assert_eq!(symbols.len(), 1);
3029        assert_eq!(symbols[0].name, "TWO");
3030
3031        // Open one of the returned symbols.
3032        let buffer_b_2 = project_b
3033            .update(cx_b, |project, cx| {
3034                project.open_buffer_for_symbol(&symbols[0], cx)
3035            })
3036            .await
3037            .unwrap();
            // The opened file is outside the shared `crate-1` worktree, so its path is expected
            // to climb out of it with `..`.
3038        buffer_b_2.read_with(cx_b, |buffer, _| {
3039            assert_eq!(
3040                buffer.file().unwrap().path().as_ref(),
3041                Path::new("../crate-2/two.rs")
3042            );
3043        });
3044
3045        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // Only the path of a genuine symbol is changed. The request must fail, and the test
            // only inspects the error text. Presumably the host validates a signature attached to
            // each symbol it hands out — confirm against `Project::open_buffer_for_symbol`.
3046        let mut fake_symbol = symbols[0].clone();
3047        fake_symbol.path = Path::new("/code/secrets").into();
3048        let error = project_b
3049            .update(cx_b, |project, cx| {
3050                project.open_buffer_for_symbol(&fake_symbol, cx)
3051            })
3052            .await
3053            .unwrap_err();
3054        assert!(error.to_string().contains("invalid symbol signature"));
3055    }
3056
3057    #[gpui::test(iterations = 10)]
        // Collaboration test for a guest opening a buffer while a definition request that points
        // at the same file is in flight.
        //
        // Client B starts both a go-to-definition request (whose target is `/root/b.rs`) and an
        // explicit `open_buffer` for `b.rs`, in an order chosen by `rng`, and only then awaits
        // them. The test checks that the definition's target buffer is the same buffer handle as
        // the explicitly opened one.
3058    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3059        cx_a: &mut TestAppContext,
3060        cx_b: &mut TestAppContext,
3061        mut rng: StdRng,
3062    ) {
3063        cx_a.foreground().forbid_parking();
3064        let mut lang_registry = Arc::new(LanguageRegistry::test());
3065        let fs = FakeFs::new(cx_a.background());
            // Both files live in the single shared worktree `/root`.
3066        fs.insert_tree(
3067            "/root",
3068            json!({
3069                ".zed.toml": r#"collaborators = ["user_b"]"#,
3070                "a.rs": "const ONE: usize = b::TWO;",
3071                "b.rs": "const TWO: usize = 2",
3072            }),
3073        )
3074        .await;
3075
3076        // Set up a fake language server.
3077        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3078
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered before the registry is cloned into any project.
3079        Arc::get_mut(&mut lang_registry)
3080            .unwrap()
3081            .add(Arc::new(Language::new(
3082                LanguageConfig {
3083                    name: "Rust".into(),
3084                    path_suffixes: vec!["rs".to_string()],
3085                    language_server: Some(language_server_config),
3086                    ..Default::default()
3087                },
3088                Some(tree_sitter_rust::language()),
3089            )));
3090
3091        // Connect to a server as 2 clients.
3092        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3093        let client_a = server.create_client(cx_a, "user_a").await;
3094        let client_b = server.create_client(cx_b, "user_b").await;
3095
3096        // Share a project as client A
3097        let project_a = cx_a.update(|cx| {
3098            Project::local(
3099                client_a.clone(),
3100                client_a.user_store.clone(),
3101                lang_registry.clone(),
3102                fs.clone(),
3103                cx,
3104            )
3105        });
3106
3107        let (worktree_a, _) = project_a
3108            .update(cx_a, |p, cx| {
3109                p.find_or_create_local_worktree("/root", true, cx)
3110            })
3111            .await
3112            .unwrap();
            // Wait for the initial scan of `/root` to finish before sharing.
3113        worktree_a
3114            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3115            .await;
3116        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3117        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3118        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3119
3120        // Join the shared project as client B.
3121        let project_b = Project::remote(
3122            project_id,
3123            client_b.clone(),
3124            client_b.user_store.clone(),
3125            lang_registry.clone(),
3126            fs.clone(),
3127            &mut cx_b.to_async(),
3128        )
3129        .await
3130        .unwrap();
3131
            // Open `a.rs` on the guest; the definition request below is issued from this buffer.
3132        let buffer_b1 = cx_b
3133            .background()
3134            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3135            .await
3136            .unwrap();
3137
            // The handler ignores the request parameters and always points at `TWO` in
            // `/root/b.rs` (line 0, columns 6..9).
3138        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3139        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3140            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3141                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3142                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3143            )))
3144        });
3145
            // Start both operations without awaiting either, in an order picked by the test's
            // rng, so that both start orders get covered across iterations. Offset 23 lies inside
            // `TWO` in "const ONE: usize = b::TWO;".
3146        let definitions;
3147        let buffer_b2;
3148        if rng.gen() {
3149            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3150            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3151        } else {
3152            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3153            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3154        }
3155
            // Only now await the results; whichever operation was started first, both must end
            // up with the same buffer handle for `b.rs`.
3156        let buffer_b2 = buffer_b2.await.unwrap();
3157        let definitions = definitions.await.unwrap();
3158        assert_eq!(definitions.len(), 1);
3159        assert_eq!(definitions[0].buffer, buffer_b2);
3160    }
3161
    // Exercises code actions in a shared project: client A shares a local project,
    // client B joins it as a remote project, opens `main.rs` in an editor, asks for
    // code actions, confirms one, and checks the resulting two-file edit together
    // with undo/redo of that edit.
    #[gpui::test(iterations = 10)]
    async fn test_collaborating_with_code_actions(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        // Filled in by `editor::init`; handed to client B's workspace params below.
        let mut path_openers_b = Vec::new();
        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));

        // Set up a fake language server.
        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
        // `Arc::get_mut` only succeeds while this is the sole handle to the registry,
        // so the language has to be registered before the registry is cloned below.
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".into(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the `.zed.toml` entry presumably is what lets user_b join —
        // confirm against the collaborator handling.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
                "other.rs": "pub fn foo() -> usize { 4 }",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before the project is shared.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Start from the test defaults and point the workspace at client B's
        // connection, user store and remote project.
        let mut params = cx_b.update(WorkspaceParams::test);
        params.languages = lang_registry.clone();
        params.client = client_b.client.clone();
        params.user_store = client_b.user_store.clone();
        params.project = project_b;
        params.path_openers = path_openers_b.into();

        // Open `main.rs` in a workspace window owned by client B.
        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
        let editor_b = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "main.rs").into(), cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();

        // The first code action request must target the empty range at (0, 0) of
        // `/a/main.rs`; it is answered with no actions.
        // NOTE(review): awaiting `.next()` on the value returned by `handle_request`
        // presumably waits until one such request has been served — confirm against
        // the fake language server API.
        let mut fake_language_server = fake_language_servers.next().await.unwrap();
        fake_language_server
            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
                assert_eq!(
                    params.text_document.uri,
                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                );
                assert_eq!(params.range.start, lsp::Position::new(0, 0));
                assert_eq!(params.range.end, lsp::Position::new(0, 0));
                None
            })
            .next()
            .await;

        // Move cursor to a location that contains code actions.
        editor_b.update(cx_b, |editor, cx| {
            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
            cx.focus(&editor_b);
        });

        // The request for the new cursor position (1, 31) is answered with a single
        // action whose workspace edit touches both files:
        //   * main.rs: columns 22..34 of line 1 (`other::foo()`) become `4`;
        //   * other.rs: columns 0..27 of line 0 (its entire content) are deleted.
        fake_language_server
            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
                assert_eq!(
                    params.text_document.uri,
                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
                );
                assert_eq!(params.range.start, lsp::Position::new(1, 31));
                assert_eq!(params.range.end, lsp::Position::new(1, 31));

                Some(vec![lsp::CodeActionOrCommand::CodeAction(
                    lsp::CodeAction {
                        title: "Inline into all callers".to_string(),
                        edit: Some(lsp::WorkspaceEdit {
                            changes: Some(
                                [
                                    (
                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                        vec![lsp::TextEdit::new(
                                            lsp::Range::new(
                                                lsp::Position::new(1, 22),
                                                lsp::Position::new(1, 34),
                                            ),
                                            "4".to_string(),
                                        )],
                                    ),
                                    (
                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                        vec![lsp::TextEdit::new(
                                            lsp::Range::new(
                                                lsp::Position::new(0, 0),
                                                lsp::Position::new(0, 27),
                                            ),
                                            "".to_string(),
                                        )],
                                    ),
                                ]
                                .into_iter()
                                .collect(),
                            ),
                            ..Default::default()
                        }),
                        // Extra payload attached to the action; nothing in this test
                        // asserts on it.
                        data: Some(json!({
                            "codeActionParams": {
                                "range": {
                                    "start": {"line": 1, "column": 31},
                                    "end": {"line": 1, "column": 31},
                                }
                            }
                        })),
                        ..Default::default()
                    },
                )])
            })
            .next()
            .await;

        // Toggle code actions and wait for them to display.
        editor_b.update(cx_b, |editor, cx| {
            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
        });
        editor_b
            .condition(&cx_b, |editor, _| editor.context_menu_visible())
            .await;

        // From here on only the resolve request is handled by the fake server.
        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();

        // Confirming the code action will trigger a resolve request.
        // The confirmation is started first and only awaited after the resolve
        // handler below has been installed.
        let confirm_action = workspace_b
            .update(cx_b, |workspace, cx| {
                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
            })
            .unwrap();
        // The resolved action carries the same two-file edit as the action that was
        // offered above.
        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
            lsp::CodeAction {
                title: "Inline into all callers".to_string(),
                edit: Some(lsp::WorkspaceEdit {
                    changes: Some(
                        [
                            (
                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(1, 22),
                                        lsp::Position::new(1, 34),
                                    ),
                                    "4".to_string(),
                                )],
                            ),
                            (
                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 0),
                                        lsp::Position::new(0, 27),
                                    ),
                                    "".to_string(),
                                )],
                            ),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                    ..Default::default()
                }),
                ..Default::default()
            }
        });

        // After the action is confirmed, an editor containing both modified files is opened.
        confirm_action.await.unwrap();
        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });
        // The editor text is other.rs's content followed by main.rs's content: the
        // leading empty line is the emptied other.rs. A single undo reverts the
        // edits in both files and a single redo re-applies them.
        code_action_editor.update(cx_b, |editor, cx| {
            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
            editor.undo(&Undo, cx);
            assert_eq!(
                editor.text(cx),
                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
            );
            editor.redo(&Redo, cx);
            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
        });
    }
3400
    // Exercises symbol renaming in a shared project: client A shares a local
    // project, client B joins it as a remote project, starts a rename of `ONE` in
    // `one.rs`, confirms it as `THREE`, and checks the resulting edits in both files
    // together with undo/redo behavior.
    #[gpui::test(iterations = 10)]
    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        // Filled in by `editor::init`; handed to client B's workspace params below.
        let mut path_openers_b = Vec::new();
        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));

        // Set up a fake language server.
        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
        // `Arc::get_mut` only succeeds while this is the sole handle to the registry,
        // so the language has to be registered before the registry is cloned below.
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".into(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        // NOTE(review): the `.zed.toml` entry presumably is what lets user_b join —
        // confirm against the collaborator handling.
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "one.rs": "const ONE: usize = 1;",
                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the worktree scan to finish before the project is shared.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Start from the test defaults and point the workspace at client B's
        // connection, user store and remote project.
        let mut params = cx_b.update(WorkspaceParams::test);
        params.languages = lang_registry.clone();
        params.client = client_b.client.clone();
        params.user_store = client_b.user_store.clone();
        params.project = project_b;
        params.path_openers = path_openers_b.into();

        // Open `one.rs` in a workspace window owned by client B.
        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
        let editor_b = workspace_b
            .update(cx_b, |workspace, cx| {
                workspace.open_path((worktree_id, "one.rs").into(), cx)
            })
            .await
            .unwrap()
            .downcast::<Editor>()
            .unwrap();
        let mut fake_language_server = fake_language_servers.next().await.unwrap();

        // Move cursor to a location that can be renamed.
        // Offset 7 lies inside `ONE`, which occupies offsets 6..9 of `one.rs`.
        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
            editor.select_ranges([7..7], None, cx);
            editor.rename(&Rename, cx).unwrap()
        });

        // The prepare-rename request must arrive for position (0, 7) of `one.rs`;
        // the reply reports columns 6..9 as the renameable range.
        // NOTE(review): awaiting `.next()` on the value returned by `handle_request`
        // presumably waits until one such request has been served — confirm against
        // the fake language server API.
        fake_language_server
            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
                assert_eq!(params.position, lsp::Position::new(0, 7));
                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                    lsp::Position::new(0, 6),
                    lsp::Position::new(0, 9),
                )))
            })
            .next()
            .await
            .unwrap();
        prepare_rename.await.unwrap();
        // The pending rename must cover offsets 6..9; replace the first three
        // characters of the rename editor's buffer with the new name `THREE`.
        editor_b.update(cx_b, |editor, cx| {
            let rename = editor.pending_rename().unwrap();
            let buffer = editor.buffer().read(cx).snapshot(cx);
            assert_eq!(
                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
                6..9
            );
            rename.editor.update(cx, |rename_editor, cx| {
                rename_editor.buffer().update(cx, |rename_buffer, cx| {
                    rename_buffer.edit([0..3], "THREE", cx);
                });
            });
        });

        // Start the confirmation first; it is awaited only after the rename request
        // below has been served.
        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
        });
        // The rename request must be issued at (0, 6) — the start of the prepared
        // range — with the new name `THREE`. The reply edits the definition in
        // `one.rs` (columns 6..9) and both uses in `two.rs` (columns 24..27 and
        // 35..38).
        fake_language_server
            .handle_request::<lsp::request::Rename, _>(|params, _| {
                assert_eq!(
                    params.text_document_position.text_document.uri.as_str(),
                    "file:///dir/one.rs"
                );
                assert_eq!(
                    params.text_document_position.position,
                    lsp::Position::new(0, 6)
                );
                assert_eq!(params.new_name, "THREE");
                Some(lsp::WorkspaceEdit {
                    changes: Some(
                        [
                            (
                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
                                vec![lsp::TextEdit::new(
                                    lsp::Range::new(
                                        lsp::Position::new(0, 6),
                                        lsp::Position::new(0, 9),
                                    ),
                                    "THREE".to_string(),
                                )],
                            ),
                            (
                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
                                vec![
                                    lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(0, 24),
                                            lsp::Position::new(0, 27),
                                        ),
                                        "THREE".to_string(),
                                    ),
                                    lsp::TextEdit::new(
                                        lsp::Range::new(
                                            lsp::Position::new(0, 35),
                                            lsp::Position::new(0, 38),
                                        ),
                                        "THREE".to_string(),
                                    ),
                                ],
                            ),
                        ]
                        .into_iter()
                        .collect(),
                    ),
                    ..Default::default()
                })
            })
            .next()
            .await
            .unwrap();
        confirm_rename.await.unwrap();

        // The workspace's active item is now an editor whose text is two.rs's
        // content followed by one.rs's content. A single undo reverts the rename in
        // both files and a single redo re-applies it.
        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
            workspace
                .active_item(cx)
                .unwrap()
                .downcast::<Editor>()
                .unwrap()
        });
        rename_editor.update(cx_b, |editor, cx| {
            assert_eq!(
                editor.text(cx),
                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
            );
            editor.undo(&Undo, cx);
            assert_eq!(
                editor.text(cx),
                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
            );
            editor.redo(&Redo, cx);
            assert_eq!(
                editor.text(cx),
                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
            );
        });

        // Ensure temporary rename edits cannot be undone/redone.
        // In the original `one.rs` editor the first undo restores `ONE`, a second
        // undo changes nothing further, and one redo brings `THREE` back.
        editor_b.update(cx_b, |editor, cx| {
            editor.undo(&Undo, cx);
            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
            editor.undo(&Undo, cx);
            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
            editor.redo(&Redo, cx);
            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
        })
    }
3614
3615    #[gpui::test(iterations = 10)]
3616    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3617        cx_a.foreground().forbid_parking();
3618
3619        // Connect to a server as 2 clients.
3620        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3621        let client_a = server.create_client(cx_a, "user_a").await;
3622        let client_b = server.create_client(cx_b, "user_b").await;
3623
3624        // Create an org that includes these 2 users.
3625        let db = &server.app_state.db;
3626        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3627        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3628            .await
3629            .unwrap();
3630        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3631            .await
3632            .unwrap();
3633
3634        // Create a channel that includes all the users.
3635        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3636        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3637            .await
3638            .unwrap();
3639        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3640            .await
3641            .unwrap();
3642        db.create_channel_message(
3643            channel_id,
3644            client_b.current_user_id(&cx_b),
3645            "hello A, it's B.",
3646            OffsetDateTime::now_utc(),
3647            1,
3648        )
3649        .await
3650        .unwrap();
3651
3652        let channels_a = cx_a
3653            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3654        channels_a
3655            .condition(cx_a, |list, _| list.available_channels().is_some())
3656            .await;
3657        channels_a.read_with(cx_a, |list, _| {
3658            assert_eq!(
3659                list.available_channels().unwrap(),
3660                &[ChannelDetails {
3661                    id: channel_id.to_proto(),
3662                    name: "test-channel".to_string()
3663                }]
3664            )
3665        });
3666        let channel_a = channels_a.update(cx_a, |this, cx| {
3667            this.get_channel(channel_id.to_proto(), cx).unwrap()
3668        });
3669        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3670        channel_a
3671            .condition(&cx_a, |channel, _| {
3672                channel_messages(channel)
3673                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3674            })
3675            .await;
3676
3677        let channels_b = cx_b
3678            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3679        channels_b
3680            .condition(cx_b, |list, _| list.available_channels().is_some())
3681            .await;
3682        channels_b.read_with(cx_b, |list, _| {
3683            assert_eq!(
3684                list.available_channels().unwrap(),
3685                &[ChannelDetails {
3686                    id: channel_id.to_proto(),
3687                    name: "test-channel".to_string()
3688                }]
3689            )
3690        });
3691
3692        let channel_b = channels_b.update(cx_b, |this, cx| {
3693            this.get_channel(channel_id.to_proto(), cx).unwrap()
3694        });
3695        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3696        channel_b
3697            .condition(&cx_b, |channel, _| {
3698                channel_messages(channel)
3699                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3700            })
3701            .await;
3702
3703        channel_a
3704            .update(cx_a, |channel, cx| {
3705                channel
3706                    .send_message("oh, hi B.".to_string(), cx)
3707                    .unwrap()
3708                    .detach();
3709                let task = channel.send_message("sup".to_string(), cx).unwrap();
3710                assert_eq!(
3711                    channel_messages(channel),
3712                    &[
3713                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3714                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3715                        ("user_a".to_string(), "sup".to_string(), true)
3716                    ]
3717                );
3718                task
3719            })
3720            .await
3721            .unwrap();
3722
3723        channel_b
3724            .condition(&cx_b, |channel, _| {
3725                channel_messages(channel)
3726                    == [
3727                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3728                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3729                        ("user_a".to_string(), "sup".to_string(), false),
3730                    ]
3731            })
3732            .await;
3733
3734        assert_eq!(
3735            server
3736                .state()
3737                .await
3738                .channel(channel_id)
3739                .unwrap()
3740                .connection_ids
3741                .len(),
3742            2
3743        );
3744        cx_b.update(|_| drop(channel_b));
3745        server
3746            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3747            .await;
3748
3749        cx_a.update(|_| drop(channel_a));
3750        server
3751            .condition(|state| state.channel(channel_id).is_none())
3752            .await;
3753    }
3754
3755    #[gpui::test(iterations = 10)]
3756    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3757        cx_a.foreground().forbid_parking();
3758
3759        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3760        let client_a = server.create_client(cx_a, "user_a").await;
3761
3762        let db = &server.app_state.db;
3763        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3764        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3765        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3766            .await
3767            .unwrap();
3768        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3769            .await
3770            .unwrap();
3771
3772        let channels_a = cx_a
3773            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3774        channels_a
3775            .condition(cx_a, |list, _| list.available_channels().is_some())
3776            .await;
3777        let channel_a = channels_a.update(cx_a, |this, cx| {
3778            this.get_channel(channel_id.to_proto(), cx).unwrap()
3779        });
3780
3781        // Messages aren't allowed to be too long.
3782        channel_a
3783            .update(cx_a, |channel, cx| {
3784                let long_body = "this is long.\n".repeat(1024);
3785                channel.send_message(long_body, cx).unwrap()
3786            })
3787            .await
3788            .unwrap_err();
3789
3790        // Messages aren't allowed to be blank.
3791        channel_a.update(cx_a, |channel, cx| {
3792            channel.send_message(String::new(), cx).unwrap_err()
3793        });
3794
3795        // Leading and trailing whitespace are trimmed.
3796        channel_a
3797            .update(cx_a, |channel, cx| {
3798                channel
3799                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3800                    .unwrap()
3801            })
3802            .await
3803            .unwrap();
3804        assert_eq!(
3805            db.get_channel_messages(channel_id, 10, None)
3806                .await
3807                .unwrap()
3808                .iter()
3809                .map(|m| &m.body)
3810                .collect::<Vec<_>>(),
3811            &["surrounded by whitespace"]
3812        );
3813    }
3814
3815    #[gpui::test(iterations = 10)]
3816    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3817        cx_a.foreground().forbid_parking();
3818
3819        // Connect to a server as 2 clients.
3820        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3821        let client_a = server.create_client(cx_a, "user_a").await;
3822        let client_b = server.create_client(cx_b, "user_b").await;
3823        let mut status_b = client_b.status();
3824
3825        // Create an org that includes these 2 users.
3826        let db = &server.app_state.db;
3827        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3828        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3829            .await
3830            .unwrap();
3831        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3832            .await
3833            .unwrap();
3834
3835        // Create a channel that includes all the users.
3836        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3837        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3838            .await
3839            .unwrap();
3840        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3841            .await
3842            .unwrap();
3843        db.create_channel_message(
3844            channel_id,
3845            client_b.current_user_id(&cx_b),
3846            "hello A, it's B.",
3847            OffsetDateTime::now_utc(),
3848            2,
3849        )
3850        .await
3851        .unwrap();
3852
3853        let channels_a = cx_a
3854            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3855        channels_a
3856            .condition(cx_a, |list, _| list.available_channels().is_some())
3857            .await;
3858
3859        channels_a.read_with(cx_a, |list, _| {
3860            assert_eq!(
3861                list.available_channels().unwrap(),
3862                &[ChannelDetails {
3863                    id: channel_id.to_proto(),
3864                    name: "test-channel".to_string()
3865                }]
3866            )
3867        });
3868        let channel_a = channels_a.update(cx_a, |this, cx| {
3869            this.get_channel(channel_id.to_proto(), cx).unwrap()
3870        });
3871        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3872        channel_a
3873            .condition(&cx_a, |channel, _| {
3874                channel_messages(channel)
3875                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3876            })
3877            .await;
3878
3879        let channels_b = cx_b
3880            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3881        channels_b
3882            .condition(cx_b, |list, _| list.available_channels().is_some())
3883            .await;
3884        channels_b.read_with(cx_b, |list, _| {
3885            assert_eq!(
3886                list.available_channels().unwrap(),
3887                &[ChannelDetails {
3888                    id: channel_id.to_proto(),
3889                    name: "test-channel".to_string()
3890                }]
3891            )
3892        });
3893
3894        let channel_b = channels_b.update(cx_b, |this, cx| {
3895            this.get_channel(channel_id.to_proto(), cx).unwrap()
3896        });
3897        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3898        channel_b
3899            .condition(&cx_b, |channel, _| {
3900                channel_messages(channel)
3901                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3902            })
3903            .await;
3904
3905        // Disconnect client B, ensuring we can still access its cached channel data.
3906        server.forbid_connections();
3907        server.disconnect_client(client_b.current_user_id(&cx_b));
3908        cx_b.foreground().advance_clock(Duration::from_secs(3));
3909        while !matches!(
3910            status_b.next().await,
3911            Some(client::Status::ReconnectionError { .. })
3912        ) {}
3913
3914        channels_b.read_with(cx_b, |channels, _| {
3915            assert_eq!(
3916                channels.available_channels().unwrap(),
3917                [ChannelDetails {
3918                    id: channel_id.to_proto(),
3919                    name: "test-channel".to_string()
3920                }]
3921            )
3922        });
3923        channel_b.read_with(cx_b, |channel, _| {
3924            assert_eq!(
3925                channel_messages(channel),
3926                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3927            )
3928        });
3929
3930        // Send a message from client B while it is disconnected.
3931        channel_b
3932            .update(cx_b, |channel, cx| {
3933                let task = channel
3934                    .send_message("can you see this?".to_string(), cx)
3935                    .unwrap();
3936                assert_eq!(
3937                    channel_messages(channel),
3938                    &[
3939                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3940                        ("user_b".to_string(), "can you see this?".to_string(), true)
3941                    ]
3942                );
3943                task
3944            })
3945            .await
3946            .unwrap_err();
3947
3948        // Send a message from client A while B is disconnected.
3949        channel_a
3950            .update(cx_a, |channel, cx| {
3951                channel
3952                    .send_message("oh, hi B.".to_string(), cx)
3953                    .unwrap()
3954                    .detach();
3955                let task = channel.send_message("sup".to_string(), cx).unwrap();
3956                assert_eq!(
3957                    channel_messages(channel),
3958                    &[
3959                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3960                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3961                        ("user_a".to_string(), "sup".to_string(), true)
3962                    ]
3963                );
3964                task
3965            })
3966            .await
3967            .unwrap();
3968
3969        // Give client B a chance to reconnect.
3970        server.allow_connections();
3971        cx_b.foreground().advance_clock(Duration::from_secs(10));
3972
3973        // Verify that B sees the new messages upon reconnection, as well as the message client B
3974        // sent while offline.
3975        channel_b
3976            .condition(&cx_b, |channel, _| {
3977                channel_messages(channel)
3978                    == [
3979                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3980                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3981                        ("user_a".to_string(), "sup".to_string(), false),
3982                        ("user_b".to_string(), "can you see this?".to_string(), false),
3983                    ]
3984            })
3985            .await;
3986
3987        // Ensure client A and B can communicate normally after reconnection.
3988        channel_a
3989            .update(cx_a, |channel, cx| {
3990                channel.send_message("you online?".to_string(), cx).unwrap()
3991            })
3992            .await
3993            .unwrap();
3994        channel_b
3995            .condition(&cx_b, |channel, _| {
3996                channel_messages(channel)
3997                    == [
3998                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3999                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4000                        ("user_a".to_string(), "sup".to_string(), false),
4001                        ("user_b".to_string(), "can you see this?".to_string(), false),
4002                        ("user_a".to_string(), "you online?".to_string(), false),
4003                    ]
4004            })
4005            .await;
4006
4007        channel_b
4008            .update(cx_b, |channel, cx| {
4009                channel.send_message("yep".to_string(), cx).unwrap()
4010            })
4011            .await
4012            .unwrap();
4013        channel_a
4014            .condition(&cx_a, |channel, _| {
4015                channel_messages(channel)
4016                    == [
4017                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4018                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4019                        ("user_a".to_string(), "sup".to_string(), false),
4020                        ("user_b".to_string(), "can you see this?".to_string(), false),
4021                        ("user_a".to_string(), "you online?".to_string(), false),
4022                        ("user_b".to_string(), "yep".to_string(), false),
4023                    ]
4024            })
4025            .await;
4026    }
4027
    /// Checks that contact lists stay in sync across three connected clients.
    ///
    /// Client A opens a local project over `/a`, whose `.zed.toml` names
    /// `user_b` and `user_c` as collaborators. The test then waits for every
    /// client's `UserStore` to report the same contacts at each stage: once the
    /// worktree has been scanned, once client B has joined the shared project,
    /// and once client A has dropped the project.
    #[gpui::test(iterations = 10)]
    async fn test_contacts(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Create a worktree for client A whose `.zed.toml` lists B and C as
        // collaborators. The project itself is only shared further below.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Every client (A included) should now see `user_a` as a contact with
        // one project rooted at "a" and no guests.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Share the project from A and open it remotely from B.
        let project_id = project_a
            .update(cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // The handle is bound (not discarded) so the remote project stays
        // alive for the rest of the test.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // All three clients should now list `user_b` as a guest of A's project.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Wait until the host's project has registered B as a collaborator
        // before tearing the project down.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Dropping A's project should empty the contact list on every client.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        /// Flattens a user store's contacts into
        /// `(github login, [(first worktree root name, [guest github logins])])`
        /// so they can be compared against literals.
        ///
        /// Indexes `worktree_root_names[0]`, so it panics on a project that
        /// has no worktree root names.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
4169
4170    #[gpui::test(iterations = 100)]
4171    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4172        cx.foreground().forbid_parking();
4173        let max_peers = env::var("MAX_PEERS")
4174            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4175            .unwrap_or(5);
4176        let max_operations = env::var("OPERATIONS")
4177            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4178            .unwrap_or(10);
4179
4180        let rng = Arc::new(Mutex::new(rng));
4181
4182        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4183        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4184
4185        let fs = FakeFs::new(cx.background());
4186        fs.insert_tree(
4187            "/_collab",
4188            json!({
4189                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4190            }),
4191        )
4192        .await;
4193
4194        let operations = Rc::new(Cell::new(0));
4195        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4196        let mut clients = Vec::new();
4197
4198        let mut next_entity_id = 100000;
4199        let mut host_cx = TestAppContext::new(
4200            cx.foreground_platform(),
4201            cx.platform(),
4202            cx.foreground(),
4203            cx.background(),
4204            cx.font_cache(),
4205            cx.leak_detector(),
4206            next_entity_id,
4207        );
4208        let host = server.create_client(&mut host_cx, "host").await;
4209        let host_project = host_cx.update(|cx| {
4210            Project::local(
4211                host.client.clone(),
4212                host.user_store.clone(),
4213                Arc::new(LanguageRegistry::test()),
4214                fs.clone(),
4215                cx,
4216            )
4217        });
4218        let host_project_id = host_project
4219            .update(&mut host_cx, |p, _| p.next_remote_id())
4220            .await;
4221
4222        let (collab_worktree, _) = host_project
4223            .update(&mut host_cx, |project, cx| {
4224                project.find_or_create_local_worktree("/_collab", true, cx)
4225            })
4226            .await
4227            .unwrap();
4228        collab_worktree
4229            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4230            .await;
4231        host_project
4232            .update(&mut host_cx, |project, cx| project.share(cx))
4233            .await
4234            .unwrap();
4235
4236        clients.push(cx.foreground().spawn(host.simulate_host(
4237            host_project,
4238            language_server_config,
4239            operations.clone(),
4240            max_operations,
4241            rng.clone(),
4242            host_cx,
4243        )));
4244
4245        while operations.get() < max_operations {
4246            cx.background().simulate_random_delay().await;
4247            if clients.len() >= max_peers {
4248                break;
4249            } else if rng.lock().gen_bool(0.05) {
4250                operations.set(operations.get() + 1);
4251
4252                let guest_id = clients.len();
4253                log::info!("Adding guest {}", guest_id);
4254                next_entity_id += 100000;
4255                let mut guest_cx = TestAppContext::new(
4256                    cx.foreground_platform(),
4257                    cx.platform(),
4258                    cx.foreground(),
4259                    cx.background(),
4260                    cx.font_cache(),
4261                    cx.leak_detector(),
4262                    next_entity_id,
4263                );
4264                let guest = server
4265                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4266                    .await;
4267                let guest_project = Project::remote(
4268                    host_project_id,
4269                    guest.client.clone(),
4270                    guest.user_store.clone(),
4271                    guest_lang_registry.clone(),
4272                    FakeFs::new(cx.background()),
4273                    &mut guest_cx.to_async(),
4274                )
4275                .await
4276                .unwrap();
4277                clients.push(cx.foreground().spawn(guest.simulate_guest(
4278                    guest_id,
4279                    guest_project,
4280                    operations.clone(),
4281                    max_operations,
4282                    rng.clone(),
4283                    guest_cx,
4284                )));
4285
4286                log::info!("Guest {} added", guest_id);
4287            }
4288        }
4289
4290        let mut clients = futures::future::join_all(clients).await;
4291        cx.foreground().run_until_parked();
4292
4293        let (host_client, mut host_cx) = clients.remove(0);
4294        let host_project = host_client.project.as_ref().unwrap();
4295        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4296            project
4297                .worktrees(cx)
4298                .map(|worktree| {
4299                    let snapshot = worktree.read(cx).snapshot();
4300                    (snapshot.id(), snapshot)
4301                })
4302                .collect::<BTreeMap<_, _>>()
4303        });
4304
4305        host_client
4306            .project
4307            .as_ref()
4308            .unwrap()
4309            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4310
4311        for (guest_client, mut guest_cx) in clients.into_iter() {
4312            let guest_id = guest_client.client.id();
4313            let worktree_snapshots =
4314                guest_client
4315                    .project
4316                    .as_ref()
4317                    .unwrap()
4318                    .read_with(&guest_cx, |project, cx| {
4319                        project
4320                            .worktrees(cx)
4321                            .map(|worktree| {
4322                                let worktree = worktree.read(cx);
4323                                (worktree.id(), worktree.snapshot())
4324                            })
4325                            .collect::<BTreeMap<_, _>>()
4326                    });
4327
4328            assert_eq!(
4329                worktree_snapshots.keys().collect::<Vec<_>>(),
4330                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4331                "guest {} has different worktrees than the host",
4332                guest_id
4333            );
4334            for (id, host_snapshot) in &host_worktree_snapshots {
4335                let guest_snapshot = &worktree_snapshots[id];
4336                assert_eq!(
4337                    guest_snapshot.root_name(),
4338                    host_snapshot.root_name(),
4339                    "guest {} has different root name than the host for worktree {}",
4340                    guest_id,
4341                    id
4342                );
4343                assert_eq!(
4344                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4345                    host_snapshot.entries(false).collect::<Vec<_>>(),
4346                    "guest {} has different snapshot than the host for worktree {}",
4347                    guest_id,
4348                    id
4349                );
4350            }
4351
4352            guest_client
4353                .project
4354                .as_ref()
4355                .unwrap()
4356                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4357
4358            for guest_buffer in &guest_client.buffers {
4359                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4360                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4361                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4362                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4363                        guest_id, guest_client.peer_id, buffer_id
4364                    ))
4365                });
4366                let path = host_buffer
4367                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4368
4369                assert_eq!(
4370                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4371                    0,
4372                    "guest {}, buffer {}, path {:?} has deferred operations",
4373                    guest_id,
4374                    buffer_id,
4375                    path,
4376                );
4377                assert_eq!(
4378                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4379                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4380                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4381                    guest_id,
4382                    buffer_id,
4383                    path
4384                );
4385            }
4386
4387            guest_cx.update(|_| drop(guest_client));
4388        }
4389
4390        host_cx.update(|_| drop(host_client));
4391    }
4392
    /// Test harness bundling the `Server` under test with the pieces needed to
    /// connect in-memory clients to it and to control their connectivity.
    struct TestServer {
        /// Peer handed to `Server::new`; reset when the harness is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`; its `db` is used to
        /// create a user for each test client.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, used by `condition` to bracket its waits.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to
        /// `Server::new`; `condition` re-checks its predicate after each item.
        notifications: mpsc::UnboundedReceiver<()>,
        /// Kill handle for each user's most recent in-memory connection.
        /// `disconnect_client` removes (and thereby drops) the entry.
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// While `true`, attempts to establish a new connection fail.
        forbid_connections: Arc<AtomicBool>,
        /// Never read; presumably held so the test database outlives the
        /// server (verify against `TestDb`).
        _test_db: TestDb,
    }
4403
4404    impl TestServer {
4405        async fn start(
4406            foreground: Rc<executor::Foreground>,
4407            background: Arc<executor::Background>,
4408        ) -> Self {
4409            let test_db = TestDb::fake(background);
4410            let app_state = Self::build_app_state(&test_db).await;
4411            let peer = Peer::new();
4412            let notifications = mpsc::unbounded();
4413            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4414            Self {
4415                peer,
4416                app_state,
4417                server,
4418                foreground,
4419                notifications: notifications.1,
4420                connection_killers: Default::default(),
4421                forbid_connections: Default::default(),
4422                _test_db: test_db,
4423            }
4424        }
4425
4426        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4427            let http = FakeHttpClient::with_404_response();
4428            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4429            let client_name = name.to_string();
4430            let mut client = Client::new(http.clone());
4431            let server = self.server.clone();
4432            let connection_killers = self.connection_killers.clone();
4433            let forbid_connections = self.forbid_connections.clone();
4434            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4435
4436            Arc::get_mut(&mut client)
4437                .unwrap()
4438                .override_authenticate(move |cx| {
4439                    cx.spawn(|_| async move {
4440                        let access_token = "the-token".to_string();
4441                        Ok(Credentials {
4442                            user_id: user_id.0 as u64,
4443                            access_token,
4444                        })
4445                    })
4446                })
4447                .override_establish_connection(move |credentials, cx| {
4448                    assert_eq!(credentials.user_id, user_id.0 as u64);
4449                    assert_eq!(credentials.access_token, "the-token");
4450
4451                    let server = server.clone();
4452                    let connection_killers = connection_killers.clone();
4453                    let forbid_connections = forbid_connections.clone();
4454                    let client_name = client_name.clone();
4455                    let connection_id_tx = connection_id_tx.clone();
4456                    cx.spawn(move |cx| async move {
4457                        if forbid_connections.load(SeqCst) {
4458                            Err(EstablishConnectionError::other(anyhow!(
4459                                "server is forbidding connections"
4460                            )))
4461                        } else {
4462                            let (client_conn, server_conn, kill_conn) =
4463                                Connection::in_memory(cx.background());
4464                            connection_killers.lock().insert(user_id, kill_conn);
4465                            cx.background()
4466                                .spawn(server.handle_connection(
4467                                    server_conn,
4468                                    client_name,
4469                                    user_id,
4470                                    Some(connection_id_tx),
4471                                    cx.background(),
4472                                ))
4473                                .detach();
4474                            Ok(client_conn)
4475                        }
4476                    })
4477                });
4478
4479            client
4480                .authenticate_and_connect(&cx.to_async())
4481                .await
4482                .unwrap();
4483
4484            Channel::init(&client);
4485            Project::init(&client);
4486
4487            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4488            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4489
4490            let client = TestClient {
4491                client,
4492                peer_id,
4493                user_store,
4494                project: Default::default(),
4495                buffers: Default::default(),
4496            };
4497            client.wait_for_current_user(cx).await;
4498            client
4499        }
4500
4501        fn disconnect_client(&self, user_id: UserId) {
4502            self.connection_killers.lock().remove(&user_id);
4503        }
4504
4505        fn forbid_connections(&self) {
4506            self.forbid_connections.store(true, SeqCst);
4507        }
4508
4509        fn allow_connections(&self) {
4510            self.forbid_connections.store(false, SeqCst);
4511        }
4512
4513        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4514            let mut config = Config::default();
4515            config.session_secret = "a".repeat(32);
4516            config.database_url = test_db.url.clone();
4517            let github_client = github::AppClient::test();
4518            Arc::new(AppState {
4519                db: test_db.db().clone(),
4520                handlebars: Default::default(),
4521                auth_client: auth::build_client("", ""),
4522                repo_client: github::RepoClient::test(&github_client),
4523                github_client,
4524                config,
4525            })
4526        }
4527
4528        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4529            self.server.store.read()
4530        }
4531
4532        async fn condition<F>(&mut self, mut predicate: F)
4533        where
4534            F: FnMut(&Store) -> bool,
4535        {
4536            async_std::future::timeout(Duration::from_millis(500), async {
4537                while !(predicate)(&*self.server.store.read()) {
4538                    self.foreground.start_waiting();
4539                    self.notifications.next().await;
4540                    self.foreground.finish_waiting();
4541                }
4542            })
4543            .await
4544            .expect("condition timed out");
4545        }
4546    }
4547
    /// Resets the peer when the test server is dropped.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably this tears down any connections still
            // registered with the peer; confirm against `Peer::reset`.
            self.peer.reset();
        }
    }
4553
    /// A connected client as seen by the tests, together with the per-client
    /// state that the randomized simulation keeps around.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this
        /// client's connection.
        pub peer_id: PeerId,
        /// User store model created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// Project associated with this client, if any; `None` right after
        /// `create_client`.
        project: Option<ModelHandle<Project>>,
        /// Buffers tracked for this client; empty right after `create_client`.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
4561
    /// Lets a `TestClient` be used wherever an `Arc<Client>` is expected, so
    /// tests can call `Client` methods (and `.clone()` the `Arc`) directly.
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
4569
4570    impl TestClient {
4571        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4572            UserId::from_proto(
4573                self.user_store
4574                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4575            )
4576        }
4577
4578        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4579            let mut authed_user = self
4580                .user_store
4581                .read_with(cx, |user_store, _| user_store.watch_current_user());
4582            while authed_user.next().await.unwrap().is_none() {}
4583        }
4584
4585        fn simulate_host(
4586            mut self,
4587            project: ModelHandle<Project>,
4588            mut language_server_config: LanguageServerConfig,
4589            operations: Rc<Cell<usize>>,
4590            max_operations: usize,
4591            rng: Arc<Mutex<StdRng>>,
4592            mut cx: TestAppContext,
4593        ) -> impl Future<Output = (Self, TestAppContext)> {
4594            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4595
4596            // Set up a fake language server.
4597            language_server_config.set_fake_initializer({
4598                let rng = rng.clone();
4599                let files = files.clone();
4600                let project = project.downgrade();
4601                move |fake_server| {
4602                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4603                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4604                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4605                                range: lsp::Range::new(
4606                                    lsp::Position::new(0, 0),
4607                                    lsp::Position::new(0, 0),
4608                                ),
4609                                new_text: "the-new-text".to_string(),
4610                            })),
4611                            ..Default::default()
4612                        }]))
4613                    });
4614
4615                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4616                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4617                            lsp::CodeAction {
4618                                title: "the-code-action".to_string(),
4619                                ..Default::default()
4620                            },
4621                        )])
4622                    });
4623
4624                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4625                        |params, _| {
4626                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4627                                params.position,
4628                                params.position,
4629                            )))
4630                        },
4631                    );
4632
4633                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4634                        let files = files.clone();
4635                        let rng = rng.clone();
4636                        move |_, _| {
4637                            let files = files.lock();
4638                            let mut rng = rng.lock();
4639                            let count = rng.gen_range::<usize, _>(1..3);
4640                            let files = (0..count)
4641                                .map(|_| files.choose(&mut *rng).unwrap())
4642                                .collect::<Vec<_>>();
4643                            log::info!("LSP: Returning definitions in files {:?}", &files);
4644                            Some(lsp::GotoDefinitionResponse::Array(
4645                                files
4646                                    .into_iter()
4647                                    .map(|file| lsp::Location {
4648                                        uri: lsp::Url::from_file_path(file).unwrap(),
4649                                        range: Default::default(),
4650                                    })
4651                                    .collect(),
4652                            ))
4653                        }
4654                    });
4655
4656                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4657                        let rng = rng.clone();
4658                        let project = project.clone();
4659                        move |params, mut cx| {
4660                            if let Some(project) = project.upgrade(&cx) {
4661                                project.update(&mut cx, |project, cx| {
4662                                    let path = params
4663                                        .text_document_position_params
4664                                        .text_document
4665                                        .uri
4666                                        .to_file_path()
4667                                        .unwrap();
4668                                    let (worktree, relative_path) =
4669                                        project.find_local_worktree(&path, cx)?;
4670                                    let project_path =
4671                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4672                                    let buffer =
4673                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4674
4675                                    let mut highlights = Vec::new();
4676                                    let highlight_count = rng.lock().gen_range(1..=5);
4677                                    let mut prev_end = 0;
4678                                    for _ in 0..highlight_count {
4679                                        let range =
4680                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4681                                        let start = buffer
4682                                            .offset_to_point_utf16(range.start)
4683                                            .to_lsp_position();
4684                                        let end = buffer
4685                                            .offset_to_point_utf16(range.end)
4686                                            .to_lsp_position();
4687                                        highlights.push(lsp::DocumentHighlight {
4688                                            range: lsp::Range::new(start, end),
4689                                            kind: Some(lsp::DocumentHighlightKind::READ),
4690                                        });
4691                                        prev_end = range.end;
4692                                    }
4693                                    Some(highlights)
4694                                })
4695                            } else {
4696                                None
4697                            }
4698                        }
4699                    });
4700                }
4701            });
4702
4703            project.update(&mut cx, |project, _| {
4704                project.languages().add(Arc::new(Language::new(
4705                    LanguageConfig {
4706                        name: "Rust".into(),
4707                        path_suffixes: vec!["rs".to_string()],
4708                        language_server: Some(language_server_config),
4709                        ..Default::default()
4710                    },
4711                    None,
4712                )));
4713            });
4714
4715            async move {
4716                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4717                while operations.get() < max_operations {
4718                    operations.set(operations.get() + 1);
4719
4720                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4721                    match distribution {
4722                        0..=20 if !files.lock().is_empty() => {
4723                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4724                            let mut path = path.as_path();
4725                            while let Some(parent_path) = path.parent() {
4726                                path = parent_path;
4727                                if rng.lock().gen() {
4728                                    break;
4729                                }
4730                            }
4731
4732                            log::info!("Host: find/create local worktree {:?}", path);
4733                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4734                                project.find_or_create_local_worktree(path, true, cx)
4735                            });
4736                            let find_or_create_worktree = async move {
4737                                find_or_create_worktree.await.unwrap();
4738                            };
4739                            if rng.lock().gen() {
4740                                cx.background().spawn(find_or_create_worktree).detach();
4741                            } else {
4742                                find_or_create_worktree.await;
4743                            }
4744                        }
4745                        10..=80 if !files.lock().is_empty() => {
4746                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4747                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4748                                let (worktree, path) = project
4749                                    .update(&mut cx, |project, cx| {
4750                                        project.find_or_create_local_worktree(
4751                                            file.clone(),
4752                                            true,
4753                                            cx,
4754                                        )
4755                                    })
4756                                    .await
4757                                    .unwrap();
4758                                let project_path =
4759                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4760                                log::info!(
4761                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4762                                    file,
4763                                    project_path.0,
4764                                    project_path.1
4765                                );
4766                                let buffer = project
4767                                    .update(&mut cx, |project, cx| {
4768                                        project.open_buffer(project_path, cx)
4769                                    })
4770                                    .await
4771                                    .unwrap();
4772                                self.buffers.insert(buffer.clone());
4773                                buffer
4774                            } else {
4775                                self.buffers
4776                                    .iter()
4777                                    .choose(&mut *rng.lock())
4778                                    .unwrap()
4779                                    .clone()
4780                            };
4781
4782                            if rng.lock().gen_bool(0.1) {
4783                                cx.update(|cx| {
4784                                    log::info!(
4785                                        "Host: dropping buffer {:?}",
4786                                        buffer.read(cx).file().unwrap().full_path(cx)
4787                                    );
4788                                    self.buffers.remove(&buffer);
4789                                    drop(buffer);
4790                                });
4791                            } else {
4792                                buffer.update(&mut cx, |buffer, cx| {
4793                                    log::info!(
4794                                        "Host: updating buffer {:?} ({})",
4795                                        buffer.file().unwrap().full_path(cx),
4796                                        buffer.remote_id()
4797                                    );
4798                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4799                                });
4800                            }
4801                        }
4802                        _ => loop {
4803                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4804                            let mut path = PathBuf::new();
4805                            path.push("/");
4806                            for _ in 0..path_component_count {
4807                                let letter = rng.lock().gen_range(b'a'..=b'z');
4808                                path.push(std::str::from_utf8(&[letter]).unwrap());
4809                            }
4810                            path.set_extension("rs");
4811                            let parent_path = path.parent().unwrap();
4812
4813                            log::info!("Host: creating file {:?}", path,);
4814
4815                            if fs.create_dir(&parent_path).await.is_ok()
4816                                && fs.create_file(&path, Default::default()).await.is_ok()
4817                            {
4818                                files.lock().push(path);
4819                                break;
4820                            } else {
4821                                log::info!("Host: cannot create file");
4822                            }
4823                        },
4824                    }
4825
4826                    cx.background().simulate_random_delay().await;
4827                }
4828
4829                log::info!("Host done");
4830
4831                self.project = Some(project);
4832                (self, cx)
4833            }
4834        }
4835
4836        pub async fn simulate_guest(
4837            mut self,
4838            guest_id: usize,
4839            project: ModelHandle<Project>,
4840            operations: Rc<Cell<usize>>,
4841            max_operations: usize,
4842            rng: Arc<Mutex<StdRng>>,
4843            mut cx: TestAppContext,
4844        ) -> (Self, TestAppContext) {
4845            while operations.get() < max_operations {
4846                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4847                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4848                        project
4849                            .worktrees(&cx)
4850                            .filter(|worktree| {
4851                                let worktree = worktree.read(cx);
4852                                worktree.is_visible()
4853                                    && worktree.entries(false).any(|e| e.is_file())
4854                            })
4855                            .choose(&mut *rng.lock())
4856                    }) {
4857                        worktree
4858                    } else {
4859                        cx.background().simulate_random_delay().await;
4860                        continue;
4861                    };
4862
4863                    operations.set(operations.get() + 1);
4864                    let (worktree_root_name, project_path) =
4865                        worktree.read_with(&cx, |worktree, _| {
4866                            let entry = worktree
4867                                .entries(false)
4868                                .filter(|e| e.is_file())
4869                                .choose(&mut *rng.lock())
4870                                .unwrap();
4871                            (
4872                                worktree.root_name().to_string(),
4873                                (worktree.id(), entry.path.clone()),
4874                            )
4875                        });
4876                    log::info!(
4877                        "Guest {}: opening path {:?} in worktree {} ({})",
4878                        guest_id,
4879                        project_path.1,
4880                        project_path.0,
4881                        worktree_root_name,
4882                    );
4883                    let buffer = project
4884                        .update(&mut cx, |project, cx| {
4885                            project.open_buffer(project_path.clone(), cx)
4886                        })
4887                        .await
4888                        .unwrap();
4889                    log::info!(
4890                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4891                        guest_id,
4892                        project_path.1,
4893                        project_path.0,
4894                        worktree_root_name,
4895                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4896                    );
4897                    self.buffers.insert(buffer.clone());
4898                    buffer
4899                } else {
4900                    operations.set(operations.get() + 1);
4901
4902                    self.buffers
4903                        .iter()
4904                        .choose(&mut *rng.lock())
4905                        .unwrap()
4906                        .clone()
4907                };
4908
4909                let choice = rng.lock().gen_range(0..100);
4910                match choice {
4911                    0..=9 => {
4912                        cx.update(|cx| {
4913                            log::info!(
4914                                "Guest {}: dropping buffer {:?}",
4915                                guest_id,
4916                                buffer.read(cx).file().unwrap().full_path(cx)
4917                            );
4918                            self.buffers.remove(&buffer);
4919                            drop(buffer);
4920                        });
4921                    }
4922                    10..=19 => {
4923                        let completions = project.update(&mut cx, |project, cx| {
4924                            log::info!(
4925                                "Guest {}: requesting completions for buffer {} ({:?})",
4926                                guest_id,
4927                                buffer.read(cx).remote_id(),
4928                                buffer.read(cx).file().unwrap().full_path(cx)
4929                            );
4930                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4931                            project.completions(&buffer, offset, cx)
4932                        });
4933                        let completions = cx.background().spawn(async move {
4934                            completions.await.expect("completions request failed");
4935                        });
4936                        if rng.lock().gen_bool(0.3) {
4937                            log::info!("Guest {}: detaching completions request", guest_id);
4938                            completions.detach();
4939                        } else {
4940                            completions.await;
4941                        }
4942                    }
4943                    20..=29 => {
4944                        let code_actions = project.update(&mut cx, |project, cx| {
4945                            log::info!(
4946                                "Guest {}: requesting code actions for buffer {} ({:?})",
4947                                guest_id,
4948                                buffer.read(cx).remote_id(),
4949                                buffer.read(cx).file().unwrap().full_path(cx)
4950                            );
4951                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4952                            project.code_actions(&buffer, range, cx)
4953                        });
4954                        let code_actions = cx.background().spawn(async move {
4955                            code_actions.await.expect("code actions request failed");
4956                        });
4957                        if rng.lock().gen_bool(0.3) {
4958                            log::info!("Guest {}: detaching code actions request", guest_id);
4959                            code_actions.detach();
4960                        } else {
4961                            code_actions.await;
4962                        }
4963                    }
4964                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4965                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4966                            log::info!(
4967                                "Guest {}: saving buffer {} ({:?})",
4968                                guest_id,
4969                                buffer.remote_id(),
4970                                buffer.file().unwrap().full_path(cx)
4971                            );
4972                            (buffer.version(), buffer.save(cx))
4973                        });
4974                        let save = cx.background().spawn(async move {
4975                            let (saved_version, _) = save.await.expect("save request failed");
4976                            assert!(saved_version.observed_all(&requested_version));
4977                        });
4978                        if rng.lock().gen_bool(0.3) {
4979                            log::info!("Guest {}: detaching save request", guest_id);
4980                            save.detach();
4981                        } else {
4982                            save.await;
4983                        }
4984                    }
4985                    40..=44 => {
4986                        let prepare_rename = project.update(&mut cx, |project, cx| {
4987                            log::info!(
4988                                "Guest {}: preparing rename for buffer {} ({:?})",
4989                                guest_id,
4990                                buffer.read(cx).remote_id(),
4991                                buffer.read(cx).file().unwrap().full_path(cx)
4992                            );
4993                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4994                            project.prepare_rename(buffer, offset, cx)
4995                        });
4996                        let prepare_rename = cx.background().spawn(async move {
4997                            prepare_rename.await.expect("prepare rename request failed");
4998                        });
4999                        if rng.lock().gen_bool(0.3) {
5000                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5001                            prepare_rename.detach();
5002                        } else {
5003                            prepare_rename.await;
5004                        }
5005                    }
5006                    45..=49 => {
5007                        let definitions = project.update(&mut cx, |project, cx| {
5008                            log::info!(
5009                                "Guest {}: requesting definitions for buffer {} ({:?})",
5010                                guest_id,
5011                                buffer.read(cx).remote_id(),
5012                                buffer.read(cx).file().unwrap().full_path(cx)
5013                            );
5014                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5015                            project.definition(&buffer, offset, cx)
5016                        });
5017                        let definitions = cx.background().spawn(async move {
5018                            definitions.await.expect("definitions request failed")
5019                        });
5020                        if rng.lock().gen_bool(0.3) {
5021                            log::info!("Guest {}: detaching definitions request", guest_id);
5022                            definitions.detach();
5023                        } else {
5024                            self.buffers
5025                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5026                        }
5027                    }
5028                    50..=54 => {
5029                        let highlights = project.update(&mut cx, |project, cx| {
5030                            log::info!(
5031                                "Guest {}: requesting highlights for buffer {} ({:?})",
5032                                guest_id,
5033                                buffer.read(cx).remote_id(),
5034                                buffer.read(cx).file().unwrap().full_path(cx)
5035                            );
5036                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5037                            project.document_highlights(&buffer, offset, cx)
5038                        });
5039                        let highlights = cx.background().spawn(async move {
5040                            highlights.await.expect("highlights request failed");
5041                        });
5042                        if rng.lock().gen_bool(0.3) {
5043                            log::info!("Guest {}: detaching highlights request", guest_id);
5044                            highlights.detach();
5045                        } else {
5046                            highlights.await;
5047                        }
5048                    }
5049                    55..=59 => {
5050                        let search = project.update(&mut cx, |project, cx| {
5051                            let query = rng.lock().gen_range('a'..='z');
5052                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5053                            project.search(SearchQuery::text(query, false, false), cx)
5054                        });
5055                        let search = cx
5056                            .background()
5057                            .spawn(async move { search.await.expect("search request failed") });
5058                        if rng.lock().gen_bool(0.3) {
5059                            log::info!("Guest {}: detaching search request", guest_id);
5060                            search.detach();
5061                        } else {
5062                            self.buffers.extend(search.await.into_keys());
5063                        }
5064                    }
5065                    _ => {
5066                        buffer.update(&mut cx, |buffer, cx| {
5067                            log::info!(
5068                                "Guest {}: updating buffer {} ({:?})",
5069                                guest_id,
5070                                buffer.remote_id(),
5071                                buffer.file().unwrap().full_path(cx)
5072                            );
5073                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5074                        });
5075                    }
5076                }
5077                cx.background().simulate_random_delay().await;
5078            }
5079
5080            log::info!("Guest {} done", guest_id);
5081
5082            self.project = Some(project);
5083            (self, cx)
5084        }
5085    }
5086
5087    impl Drop for TestClient {
5088        fn drop(&mut self) {
5089            self.client.tear_down();
5090        }
5091    }
5092
5093    impl Executor for Arc<gpui::executor::Background> {
5094        type Timer = gpui::executor::Timer;
5095
5096        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5097            self.spawn(future).detach();
5098        }
5099
5100        fn timer(&self, duration: Duration) -> Self::Timer {
5101            self.as_ref().timer(duration)
5102        }
5103    }
5104
5105    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5106        channel
5107            .messages()
5108            .cursor::<()>()
5109            .map(|m| {
5110                (
5111                    m.sender.github_login.clone(),
5112                    m.body.clone(),
5113                    m.is_pending(),
5114                )
5115            })
5116            .collect()
5117    }
5118
5119    struct EmptyView;
5120
5121    impl gpui::Entity for EmptyView {
5122        type Event = ();
5123    }
5124
5125    impl gpui::View for EmptyView {
5126        fn ui_name() -> &'static str {
5127            "empty view"
5128        }
5129
5130        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5131            gpui::Element::boxed(gpui::elements::Empty)
5132        }
5133    }
5134}