rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::update_diagnostic_summary)
  87            .add_message_handler(Server::disk_based_diagnostics_updating)
  88            .add_message_handler(Server::disk_based_diagnostics_updated)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::get_channel_messages);
 116
 117        Arc::new(server)
 118    }
 119
 120    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 121    where
 122        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 123        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 124        M: EnvelopedMessage,
 125    {
 126        let prev_handler = self.handlers.insert(
 127            TypeId::of::<M>(),
 128            Box::new(move |server, envelope| {
 129                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 130                (handler)(server, *envelope).boxed()
 131            }),
 132        );
 133        if prev_handler.is_some() {
 134            panic!("registered a handler for the same message twice");
 135        }
 136        self
 137    }
 138
 139    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 140    where
 141        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 142        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 143        M: RequestMessage,
 144    {
 145        self.add_message_handler(move |server, envelope| {
 146            let receipt = envelope.receipt();
 147            let response = (handler)(server.clone(), envelope);
 148            async move {
 149                match response.await {
 150                    Ok(response) => {
 151                        server.peer.respond(receipt, response)?;
 152                        Ok(())
 153                    }
 154                    Err(error) => {
 155                        server.peer.respond_with_error(
 156                            receipt,
 157                            proto::Error {
 158                                message: error.to_string(),
 159                            },
 160                        )?;
 161                        Err(error)
 162                    }
 163                }
 164            }
 165        })
 166    }
 167
 168    pub fn handle_connection<E: Executor>(
 169        self: &Arc<Self>,
 170        connection: Connection,
 171        addr: String,
 172        user_id: UserId,
 173        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 174        executor: E,
 175    ) -> impl Future<Output = ()> {
 176        let mut this = self.clone();
 177        async move {
 178            let (connection_id, handle_io, mut incoming_rx) = this
 179                .peer
 180                .add_connection(connection, {
 181                    let executor = executor.clone();
 182                    move |duration| {
 183                        let timer = executor.timer(duration);
 184                        async move {
 185                            timer.await;
 186                        }
 187                    }
 188                })
 189                .await;
 190
 191            if let Some(send_connection_id) = send_connection_id.as_mut() {
 192                let _ = send_connection_id.send(connection_id).await;
 193            }
 194
 195            this.state_mut().add_connection(connection_id, user_id);
 196            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 197                log::error!("error updating contacts for {:?}: {}", user_id, err);
 198            }
 199
 200            let handle_io = handle_io.fuse();
 201            futures::pin_mut!(handle_io);
 202            loop {
 203                let next_message = incoming_rx.next().fuse();
 204                futures::pin_mut!(next_message);
 205                futures::select_biased! {
 206                    result = handle_io => {
 207                        if let Err(err) = result {
 208                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 209                        }
 210                        break;
 211                    }
 212                    message = next_message => {
 213                        if let Some(message) = message {
 214                            let start_time = Instant::now();
 215                            let type_name = message.payload_type_name();
 216                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 217                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 218                                let notifications = this.notifications.clone();
 219                                let is_background = message.is_background();
 220                                let handle_message = (handler)(this.clone(), message);
 221                                let handle_message = async move {
 222                                    if let Err(err) = handle_message.await {
 223                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 224                                    } else {
 225                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 226                                    }
 227                                    if let Some(mut notifications) = notifications {
 228                                        let _ = notifications.send(()).await;
 229                                    }
 230                                };
 231                                if is_background {
 232                                    executor.spawn_detached(handle_message);
 233                                } else {
 234                                    handle_message.await;
 235                                }
 236                            } else {
 237                                log::warn!("unhandled message: {}", type_name);
 238                            }
 239                        } else {
 240                            log::info!("rpc connection closed {:?}", addr);
 241                            break;
 242                        }
 243                    }
 244                }
 245            }
 246
 247            if let Err(err) = this.sign_out(connection_id).await {
 248                log::error!("error signing out connection {:?} - {:?}", addr, err);
 249            }
 250        }
 251    }
 252
 253    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 254        self.peer.disconnect(connection_id);
 255        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 256
 257        for (project_id, project) in removed_connection.hosted_projects {
 258            if let Some(share) = project.share {
 259                broadcast(
 260                    connection_id,
 261                    share.guests.keys().copied().collect(),
 262                    |conn_id| {
 263                        self.peer
 264                            .send(conn_id, proto::UnshareProject { project_id })
 265                    },
 266                )?;
 267            }
 268        }
 269
 270        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 271            broadcast(connection_id, peer_ids, |conn_id| {
 272                self.peer.send(
 273                    conn_id,
 274                    proto::RemoveProjectCollaborator {
 275                        project_id,
 276                        peer_id: connection_id.0,
 277                    },
 278                )
 279            })?;
 280        }
 281
 282        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 283        Ok(())
 284    }
 285
 286    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 287        Ok(proto::Ack {})
 288    }
 289
 290    async fn register_project(
 291        mut self: Arc<Server>,
 292        request: TypedEnvelope<proto::RegisterProject>,
 293    ) -> tide::Result<proto::RegisterProjectResponse> {
 294        let project_id = {
 295            let mut state = self.state_mut();
 296            let user_id = state.user_id_for_connection(request.sender_id)?;
 297            state.register_project(request.sender_id, user_id)
 298        };
 299        Ok(proto::RegisterProjectResponse { project_id })
 300    }
 301
 302    async fn unregister_project(
 303        mut self: Arc<Server>,
 304        request: TypedEnvelope<proto::UnregisterProject>,
 305    ) -> tide::Result<()> {
 306        let project = self
 307            .state_mut()
 308            .unregister_project(request.payload.project_id, request.sender_id)?;
 309        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 310        Ok(())
 311    }
 312
 313    async fn share_project(
 314        mut self: Arc<Server>,
 315        request: TypedEnvelope<proto::ShareProject>,
 316    ) -> tide::Result<proto::Ack> {
 317        self.state_mut()
 318            .share_project(request.payload.project_id, request.sender_id);
 319        Ok(proto::Ack {})
 320    }
 321
 322    async fn unshare_project(
 323        mut self: Arc<Server>,
 324        request: TypedEnvelope<proto::UnshareProject>,
 325    ) -> tide::Result<()> {
 326        let project_id = request.payload.project_id;
 327        let project = self
 328            .state_mut()
 329            .unshare_project(project_id, request.sender_id)?;
 330
 331        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 332            self.peer
 333                .send(conn_id, proto::UnshareProject { project_id })
 334        })?;
 335        self.update_contacts_for_users(&project.authorized_user_ids)?;
 336        Ok(())
 337    }
 338
 339    async fn join_project(
 340        mut self: Arc<Server>,
 341        request: TypedEnvelope<proto::JoinProject>,
 342    ) -> tide::Result<proto::JoinProjectResponse> {
 343        let project_id = request.payload.project_id;
 344
 345        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 346        let (response, connection_ids, contact_user_ids) = self
 347            .state_mut()
 348            .join_project(request.sender_id, user_id, project_id)
 349            .and_then(|joined| {
 350                let share = joined.project.share()?;
 351                let peer_count = share.guests.len();
 352                let mut collaborators = Vec::with_capacity(peer_count);
 353                collaborators.push(proto::Collaborator {
 354                    peer_id: joined.project.host_connection_id.0,
 355                    replica_id: 0,
 356                    user_id: joined.project.host_user_id.to_proto(),
 357                });
 358                let worktrees = share
 359                    .worktrees
 360                    .iter()
 361                    .filter_map(|(id, shared_worktree)| {
 362                        let worktree = joined.project.worktrees.get(&id)?;
 363                        Some(proto::Worktree {
 364                            id: *id,
 365                            root_name: worktree.root_name.clone(),
 366                            entries: shared_worktree.entries.values().cloned().collect(),
 367                            diagnostic_summaries: shared_worktree
 368                                .diagnostic_summaries
 369                                .values()
 370                                .cloned()
 371                                .collect(),
 372                            visible: worktree.visible,
 373                        })
 374                    })
 375                    .collect();
 376                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 377                    if *peer_conn_id != request.sender_id {
 378                        collaborators.push(proto::Collaborator {
 379                            peer_id: peer_conn_id.0,
 380                            replica_id: *peer_replica_id as u32,
 381                            user_id: peer_user_id.to_proto(),
 382                        });
 383                    }
 384                }
 385                let response = proto::JoinProjectResponse {
 386                    worktrees,
 387                    replica_id: joined.replica_id as u32,
 388                    collaborators,
 389                };
 390                let connection_ids = joined.project.connection_ids();
 391                let contact_user_ids = joined.project.authorized_user_ids();
 392                Ok((response, connection_ids, contact_user_ids))
 393            })?;
 394
 395        broadcast(request.sender_id, connection_ids, |conn_id| {
 396            self.peer.send(
 397                conn_id,
 398                proto::AddProjectCollaborator {
 399                    project_id,
 400                    collaborator: Some(proto::Collaborator {
 401                        peer_id: request.sender_id.0,
 402                        replica_id: response.replica_id,
 403                        user_id: user_id.to_proto(),
 404                    }),
 405                },
 406            )
 407        })?;
 408        self.update_contacts_for_users(&contact_user_ids)?;
 409        Ok(response)
 410    }
 411
 412    async fn leave_project(
 413        mut self: Arc<Server>,
 414        request: TypedEnvelope<proto::LeaveProject>,
 415    ) -> tide::Result<()> {
 416        let sender_id = request.sender_id;
 417        let project_id = request.payload.project_id;
 418        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 419
 420        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 421            self.peer.send(
 422                conn_id,
 423                proto::RemoveProjectCollaborator {
 424                    project_id,
 425                    peer_id: sender_id.0,
 426                },
 427            )
 428        })?;
 429        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 430
 431        Ok(())
 432    }
 433
 434    async fn register_worktree(
 435        mut self: Arc<Server>,
 436        request: TypedEnvelope<proto::RegisterWorktree>,
 437    ) -> tide::Result<proto::Ack> {
 438        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 439
 440        let mut contact_user_ids = HashSet::default();
 441        contact_user_ids.insert(host_user_id);
 442        for github_login in &request.payload.authorized_logins {
 443            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 444            contact_user_ids.insert(contact_user_id);
 445        }
 446
 447        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 448        let guest_connection_ids;
 449        {
 450            let mut state = self.state_mut();
 451            guest_connection_ids = state
 452                .read_project(request.payload.project_id, request.sender_id)?
 453                .guest_connection_ids();
 454            state.register_worktree(
 455                request.payload.project_id,
 456                request.payload.worktree_id,
 457                request.sender_id,
 458                Worktree {
 459                    authorized_user_ids: contact_user_ids.clone(),
 460                    root_name: request.payload.root_name.clone(),
 461                    visible: request.payload.visible,
 462                },
 463            )?;
 464        }
 465        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 466            self.peer
 467                .forward_send(request.sender_id, connection_id, request.payload.clone())
 468        })?;
 469        self.update_contacts_for_users(&contact_user_ids)?;
 470        Ok(proto::Ack {})
 471    }
 472
 473    async fn unregister_worktree(
 474        mut self: Arc<Server>,
 475        request: TypedEnvelope<proto::UnregisterWorktree>,
 476    ) -> tide::Result<()> {
 477        let project_id = request.payload.project_id;
 478        let worktree_id = request.payload.worktree_id;
 479        let (worktree, guest_connection_ids) =
 480            self.state_mut()
 481                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 482        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 483            self.peer.send(
 484                conn_id,
 485                proto::UnregisterWorktree {
 486                    project_id,
 487                    worktree_id,
 488                },
 489            )
 490        })?;
 491        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 492        Ok(())
 493    }
 494
 495    async fn update_worktree(
 496        mut self: Arc<Server>,
 497        request: TypedEnvelope<proto::UpdateWorktree>,
 498    ) -> tide::Result<proto::Ack> {
 499        let connection_ids = self.state_mut().update_worktree(
 500            request.sender_id,
 501            request.payload.project_id,
 502            request.payload.worktree_id,
 503            &request.payload.removed_entries,
 504            &request.payload.updated_entries,
 505        )?;
 506
 507        broadcast(request.sender_id, connection_ids, |connection_id| {
 508            self.peer
 509                .forward_send(request.sender_id, connection_id, request.payload.clone())
 510        })?;
 511
 512        Ok(proto::Ack {})
 513    }
 514
 515    async fn update_diagnostic_summary(
 516        mut self: Arc<Server>,
 517        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 518    ) -> tide::Result<()> {
 519        let summary = request
 520            .payload
 521            .summary
 522            .clone()
 523            .ok_or_else(|| anyhow!("invalid summary"))?;
 524        let receiver_ids = self.state_mut().update_diagnostic_summary(
 525            request.payload.project_id,
 526            request.payload.worktree_id,
 527            request.sender_id,
 528            summary,
 529        )?;
 530
 531        broadcast(request.sender_id, receiver_ids, |connection_id| {
 532            self.peer
 533                .forward_send(request.sender_id, connection_id, request.payload.clone())
 534        })?;
 535        Ok(())
 536    }
 537
 538    async fn disk_based_diagnostics_updating(
 539        self: Arc<Server>,
 540        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 541    ) -> tide::Result<()> {
 542        let receiver_ids = self
 543            .state()
 544            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 545        broadcast(request.sender_id, receiver_ids, |connection_id| {
 546            self.peer
 547                .forward_send(request.sender_id, connection_id, request.payload.clone())
 548        })?;
 549        Ok(())
 550    }
 551
 552    async fn disk_based_diagnostics_updated(
 553        self: Arc<Server>,
 554        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 555    ) -> tide::Result<()> {
 556        let receiver_ids = self
 557            .state()
 558            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 559        broadcast(request.sender_id, receiver_ids, |connection_id| {
 560            self.peer
 561                .forward_send(request.sender_id, connection_id, request.payload.clone())
 562        })?;
 563        Ok(())
 564    }
 565
 566    async fn forward_project_request<T>(
 567        self: Arc<Server>,
 568        request: TypedEnvelope<T>,
 569    ) -> tide::Result<T::Response>
 570    where
 571        T: EntityMessage + RequestMessage,
 572    {
 573        let host_connection_id = self
 574            .state()
 575            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 576            .host_connection_id;
 577        Ok(self
 578            .peer
 579            .forward_request(request.sender_id, host_connection_id, request.payload)
 580            .await?)
 581    }
 582
 583    async fn save_buffer(
 584        self: Arc<Server>,
 585        request: TypedEnvelope<proto::SaveBuffer>,
 586    ) -> tide::Result<proto::BufferSaved> {
 587        let host;
 588        let mut guests;
 589        {
 590            let state = self.state();
 591            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 592            host = project.host_connection_id;
 593            guests = project.guest_connection_ids()
 594        }
 595
 596        let response = self
 597            .peer
 598            .forward_request(request.sender_id, host, request.payload.clone())
 599            .await?;
 600
 601        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 602        broadcast(host, guests, |conn_id| {
 603            self.peer.forward_send(host, conn_id, response.clone())
 604        })?;
 605
 606        Ok(response)
 607    }
 608
 609    async fn update_buffer(
 610        self: Arc<Server>,
 611        request: TypedEnvelope<proto::UpdateBuffer>,
 612    ) -> tide::Result<proto::Ack> {
 613        let receiver_ids = self
 614            .state()
 615            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 616        broadcast(request.sender_id, receiver_ids, |connection_id| {
 617            self.peer
 618                .forward_send(request.sender_id, connection_id, request.payload.clone())
 619        })?;
 620        Ok(proto::Ack {})
 621    }
 622
 623    async fn update_buffer_file(
 624        self: Arc<Server>,
 625        request: TypedEnvelope<proto::UpdateBufferFile>,
 626    ) -> tide::Result<()> {
 627        let receiver_ids = self
 628            .state()
 629            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 630        broadcast(request.sender_id, receiver_ids, |connection_id| {
 631            self.peer
 632                .forward_send(request.sender_id, connection_id, request.payload.clone())
 633        })?;
 634        Ok(())
 635    }
 636
 637    async fn buffer_reloaded(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::BufferReloaded>,
 640    ) -> tide::Result<()> {
 641        let receiver_ids = self
 642            .state()
 643            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 644        broadcast(request.sender_id, receiver_ids, |connection_id| {
 645            self.peer
 646                .forward_send(request.sender_id, connection_id, request.payload.clone())
 647        })?;
 648        Ok(())
 649    }
 650
 651    async fn buffer_saved(
 652        self: Arc<Server>,
 653        request: TypedEnvelope<proto::BufferSaved>,
 654    ) -> tide::Result<()> {
 655        let receiver_ids = self
 656            .state()
 657            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 658        broadcast(request.sender_id, receiver_ids, |connection_id| {
 659            self.peer
 660                .forward_send(request.sender_id, connection_id, request.payload.clone())
 661        })?;
 662        Ok(())
 663    }
 664
 665    async fn get_channels(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::GetChannels>,
 668    ) -> tide::Result<proto::GetChannelsResponse> {
 669        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 670        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 671        Ok(proto::GetChannelsResponse {
 672            channels: channels
 673                .into_iter()
 674                .map(|chan| proto::Channel {
 675                    id: chan.id.to_proto(),
 676                    name: chan.name,
 677                })
 678                .collect(),
 679        })
 680    }
 681
 682    async fn get_users(
 683        self: Arc<Server>,
 684        request: TypedEnvelope<proto::GetUsers>,
 685    ) -> tide::Result<proto::GetUsersResponse> {
 686        let user_ids = request
 687            .payload
 688            .user_ids
 689            .into_iter()
 690            .map(UserId::from_proto)
 691            .collect();
 692        let users = self
 693            .app_state
 694            .db
 695            .get_users_by_ids(user_ids)
 696            .await?
 697            .into_iter()
 698            .map(|user| proto::User {
 699                id: user.id.to_proto(),
 700                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 701                github_login: user.github_login,
 702            })
 703            .collect();
 704        Ok(proto::GetUsersResponse { users })
 705    }
 706
 707    fn update_contacts_for_users<'a>(
 708        self: &Arc<Server>,
 709        user_ids: impl IntoIterator<Item = &'a UserId>,
 710    ) -> anyhow::Result<()> {
 711        let mut result = Ok(());
 712        let state = self.state();
 713        for user_id in user_ids {
 714            let contacts = state.contacts_for_user(*user_id);
 715            for connection_id in state.connection_ids_for_user(*user_id) {
 716                if let Err(error) = self.peer.send(
 717                    connection_id,
 718                    proto::UpdateContacts {
 719                        contacts: contacts.clone(),
 720                    },
 721                ) {
 722                    result = Err(error);
 723                }
 724            }
 725        }
 726        result
 727    }
 728
 729    async fn join_channel(
 730        mut self: Arc<Self>,
 731        request: TypedEnvelope<proto::JoinChannel>,
 732    ) -> tide::Result<proto::JoinChannelResponse> {
 733        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 734        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 735        if !self
 736            .app_state
 737            .db
 738            .can_user_access_channel(user_id, channel_id)
 739            .await?
 740        {
 741            Err(anyhow!("access denied"))?;
 742        }
 743
 744        self.state_mut().join_channel(request.sender_id, channel_id);
 745        let messages = self
 746            .app_state
 747            .db
 748            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 749            .await?
 750            .into_iter()
 751            .map(|msg| proto::ChannelMessage {
 752                id: msg.id.to_proto(),
 753                body: msg.body,
 754                timestamp: msg.sent_at.unix_timestamp() as u64,
 755                sender_id: msg.sender_id.to_proto(),
 756                nonce: Some(msg.nonce.as_u128().into()),
 757            })
 758            .collect::<Vec<_>>();
 759        Ok(proto::JoinChannelResponse {
 760            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 761            messages,
 762        })
 763    }
 764
 765    async fn leave_channel(
 766        mut self: Arc<Self>,
 767        request: TypedEnvelope<proto::LeaveChannel>,
 768    ) -> tide::Result<()> {
 769        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 770        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 771        if !self
 772            .app_state
 773            .db
 774            .can_user_access_channel(user_id, channel_id)
 775            .await?
 776        {
 777            Err(anyhow!("access denied"))?;
 778        }
 779
 780        self.state_mut()
 781            .leave_channel(request.sender_id, channel_id);
 782
 783        Ok(())
 784    }
 785
 786    async fn send_channel_message(
 787        self: Arc<Self>,
 788        request: TypedEnvelope<proto::SendChannelMessage>,
 789    ) -> tide::Result<proto::SendChannelMessageResponse> {
 790        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 791        let user_id;
 792        let connection_ids;
 793        {
 794            let state = self.state();
 795            user_id = state.user_id_for_connection(request.sender_id)?;
 796            connection_ids = state.channel_connection_ids(channel_id)?;
 797        }
 798
 799        // Validate the message body.
 800        let body = request.payload.body.trim().to_string();
 801        if body.len() > MAX_MESSAGE_LEN {
 802            return Err(anyhow!("message is too long"))?;
 803        }
 804        if body.is_empty() {
 805            return Err(anyhow!("message can't be blank"))?;
 806        }
 807
 808        let timestamp = OffsetDateTime::now_utc();
 809        let nonce = request
 810            .payload
 811            .nonce
 812            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 813
 814        let message_id = self
 815            .app_state
 816            .db
 817            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 818            .await?
 819            .to_proto();
 820        let message = proto::ChannelMessage {
 821            sender_id: user_id.to_proto(),
 822            id: message_id,
 823            body,
 824            timestamp: timestamp.unix_timestamp() as u64,
 825            nonce: Some(nonce),
 826        };
 827        broadcast(request.sender_id, connection_ids, |conn_id| {
 828            self.peer.send(
 829                conn_id,
 830                proto::ChannelMessageSent {
 831                    channel_id: channel_id.to_proto(),
 832                    message: Some(message.clone()),
 833                },
 834            )
 835        })?;
 836        Ok(proto::SendChannelMessageResponse {
 837            message: Some(message),
 838        })
 839    }
 840
 841    async fn get_channel_messages(
 842        self: Arc<Self>,
 843        request: TypedEnvelope<proto::GetChannelMessages>,
 844    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 845        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 846        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 847        if !self
 848            .app_state
 849            .db
 850            .can_user_access_channel(user_id, channel_id)
 851            .await?
 852        {
 853            Err(anyhow!("access denied"))?;
 854        }
 855
 856        let messages = self
 857            .app_state
 858            .db
 859            .get_channel_messages(
 860                channel_id,
 861                MESSAGE_COUNT_PER_PAGE,
 862                Some(MessageId::from_proto(request.payload.before_message_id)),
 863            )
 864            .await?
 865            .into_iter()
 866            .map(|msg| proto::ChannelMessage {
 867                id: msg.id.to_proto(),
 868                body: msg.body,
 869                timestamp: msg.sent_at.unix_timestamp() as u64,
 870                sender_id: msg.sender_id.to_proto(),
 871                nonce: Some(msg.nonce.as_u128().into()),
 872            })
 873            .collect::<Vec<_>>();
 874
 875        Ok(proto::GetChannelMessagesResponse {
 876            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 877            messages,
 878        })
 879    }
 880
 881    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 882        self.store.read()
 883    }
 884
 885    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 886        self.store.write()
 887    }
 888}
 889
 890impl Executor for RealExecutor {
 891    type Timer = Timer;
 892
 893    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 894        task::spawn(future);
 895    }
 896
 897    fn timer(&self, duration: Duration) -> Self::Timer {
 898        Timer::after(duration)
 899    }
 900}
 901
 902fn broadcast<F>(
 903    sender_id: ConnectionId,
 904    receiver_ids: Vec<ConnectionId>,
 905    mut f: F,
 906) -> anyhow::Result<()>
 907where
 908    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 909{
 910    let mut result = Ok(());
 911    for receiver_id in receiver_ids {
 912        if receiver_id != sender_id {
 913            if let Err(error) = f(receiver_id) {
 914                if result.is_ok() {
 915                    result = Err(error);
 916                }
 917            }
 918        }
 919    }
 920    result
 921}
 922
 923pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 924    let server = Server::new(app.state().clone(), rpc.clone(), None);
 925    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 926        let server = server.clone();
 927        async move {
 928            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 929
 930            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 931            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 932            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 933            let client_protocol_version: Option<u32> = request
 934                .header("X-Zed-Protocol-Version")
 935                .and_then(|v| v.as_str().parse().ok());
 936
 937            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 938                return Ok(Response::new(StatusCode::UpgradeRequired));
 939            }
 940
 941            let header = match request.header("Sec-Websocket-Key") {
 942                Some(h) => h.as_str(),
 943                None => return Err(anyhow!("expected sec-websocket-key"))?,
 944            };
 945
 946            let user_id = process_auth_header(&request).await?;
 947
 948            let mut response = Response::new(StatusCode::SwitchingProtocols);
 949            response.insert_header(UPGRADE, "websocket");
 950            response.insert_header(CONNECTION, "Upgrade");
 951            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 952            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 953            response.insert_header("Sec-Websocket-Version", "13");
 954
 955            let http_res: &mut tide::http::Response = response.as_mut();
 956            let upgrade_receiver = http_res.recv_upgrade().await;
 957            let addr = request.remote().unwrap_or("unknown").to_string();
 958            task::spawn(async move {
 959                if let Some(stream) = upgrade_receiver.await {
 960                    server
 961                        .handle_connection(
 962                            Connection::new(
 963                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 964                            ),
 965                            addr,
 966                            user_id,
 967                            None,
 968                            RealExecutor,
 969                        )
 970                        .await;
 971                }
 972            });
 973
 974            Ok(response)
 975        }
 976    });
 977}
 978
 979fn header_contains_ignore_case<T>(
 980    request: &tide::Request<T>,
 981    header_name: HeaderName,
 982    value: &str,
 983) -> bool {
 984    request
 985        .header(header_name)
 986        .map(|h| {
 987            h.as_str()
 988                .split(',')
 989                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 990        })
 991        .unwrap_or(false)
 992}
 993
 994#[cfg(test)]
 995mod tests {
 996    use super::*;
 997    use crate::{
 998        auth,
 999        db::{tests::TestDb, UserId},
1000        github, AppState, Config,
1001    };
1002    use ::rpc::Peer;
1003    use client::{
1004        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1005        EstablishConnectionError, UserStore,
1006    };
1007    use collections::BTreeMap;
1008    use editor::{
1009        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1010        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1011    };
1012    use gpui::{executor, ModelHandle, TestAppContext};
1013    use language::{
1014        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1015        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1016    };
1017    use lsp;
1018    use parking_lot::Mutex;
1019    use postage::{barrier, watch};
1020    use project::{
1021        fs::{FakeFs, Fs as _},
1022        search::SearchQuery,
1023        worktree::WorktreeHandle,
1024        DiagnosticSummary, Project, ProjectPath,
1025    };
1026    use rand::prelude::*;
1027    use rpc::PeerId;
1028    use serde_json::json;
1029    use sqlx::types::time::OffsetDateTime;
1030    use std::{
1031        cell::Cell,
1032        env,
1033        ops::Deref,
1034        path::{Path, PathBuf},
1035        rc::Rc,
1036        sync::{
1037            atomic::{AtomicBool, Ordering::SeqCst},
1038            Arc,
1039        },
1040        time::Duration,
1041    };
1042    use workspace::{Settings, Workspace, WorkspaceParams};
1043
1044    #[cfg(test)]
1045    #[ctor::ctor]
1046    fn init_logger() {
1047        if std::env::var("RUST_LOG").is_ok() {
1048            env_logger::init();
1049        }
1050    }
1051
1052    #[gpui::test(iterations = 10)]
1053    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1054        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1055        let lang_registry = Arc::new(LanguageRegistry::test());
1056        let fs = FakeFs::new(cx_a.background());
1057        cx_a.foreground().forbid_parking();
1058
1059        // Connect to a server as 2 clients.
1060        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1061        let client_a = server.create_client(cx_a, "user_a").await;
1062        let client_b = server.create_client(cx_b, "user_b").await;
1063
1064        // Share a project as client A
1065        fs.insert_tree(
1066            "/a",
1067            json!({
1068                ".zed.toml": r#"collaborators = ["user_b"]"#,
1069                "a.txt": "a-contents",
1070                "b.txt": "b-contents",
1071            }),
1072        )
1073        .await;
1074        let project_a = cx_a.update(|cx| {
1075            Project::local(
1076                client_a.clone(),
1077                client_a.user_store.clone(),
1078                lang_registry.clone(),
1079                fs.clone(),
1080                cx,
1081            )
1082        });
1083        let (worktree_a, _) = project_a
1084            .update(cx_a, |p, cx| {
1085                p.find_or_create_local_worktree("/a", true, cx)
1086            })
1087            .await
1088            .unwrap();
1089        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1090        worktree_a
1091            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1092            .await;
1093        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1094        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1095
1096        // Join that project as client B
1097        let project_b = Project::remote(
1098            project_id,
1099            client_b.clone(),
1100            client_b.user_store.clone(),
1101            lang_registry.clone(),
1102            fs.clone(),
1103            &mut cx_b.to_async(),
1104        )
1105        .await
1106        .unwrap();
1107
1108        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1109            assert_eq!(
1110                project
1111                    .collaborators()
1112                    .get(&client_a.peer_id)
1113                    .unwrap()
1114                    .user
1115                    .github_login,
1116                "user_a"
1117            );
1118            project.replica_id()
1119        });
1120        project_a
1121            .condition(&cx_a, |tree, _| {
1122                tree.collaborators()
1123                    .get(&client_b.peer_id)
1124                    .map_or(false, |collaborator| {
1125                        collaborator.replica_id == replica_id_b
1126                            && collaborator.user.github_login == "user_b"
1127                    })
1128            })
1129            .await;
1130
1131        // Open the same file as client B and client A.
1132        let buffer_b = project_b
1133            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1134            .await
1135            .unwrap();
1136        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1137        buffer_b.read_with(cx_b, |buf, cx| {
1138            assert_eq!(buf.read(cx).text(), "b-contents")
1139        });
1140        project_a.read_with(cx_a, |project, cx| {
1141            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1142        });
1143        let buffer_a = project_a
1144            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1145            .await
1146            .unwrap();
1147
1148        let editor_b = cx_b.add_view(window_b, |cx| {
1149            Editor::for_buffer(
1150                buffer_b,
1151                None,
1152                watch::channel_with(Settings::test(cx)).1,
1153                cx,
1154            )
1155        });
1156
1157        // TODO
1158        // // Create a selection set as client B and see that selection set as client A.
1159        // buffer_a
1160        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1161        //     .await;
1162
1163        // Edit the buffer as client B and see that edit as client A.
1164        editor_b.update(cx_b, |editor, cx| {
1165            editor.handle_input(&Input("ok, ".into()), cx)
1166        });
1167        buffer_a
1168            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1169            .await;
1170
1171        // TODO
1172        // // Remove the selection set as client B, see those selections disappear as client A.
1173        cx_b.update(move |_| drop(editor_b));
1174        // buffer_a
1175        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1176        //     .await;
1177
1178        // Dropping the client B's project removes client B from client A's collaborators.
1179        cx_b.update(move |_| drop(project_b));
1180        project_a
1181            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1182            .await;
1183    }
1184
1185    #[gpui::test(iterations = 10)]
1186    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1187        let lang_registry = Arc::new(LanguageRegistry::test());
1188        let fs = FakeFs::new(cx_a.background());
1189        cx_a.foreground().forbid_parking();
1190
1191        // Connect to a server as 2 clients.
1192        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1193        let client_a = server.create_client(cx_a, "user_a").await;
1194        let client_b = server.create_client(cx_b, "user_b").await;
1195
1196        // Share a project as client A
1197        fs.insert_tree(
1198            "/a",
1199            json!({
1200                ".zed.toml": r#"collaborators = ["user_b"]"#,
1201                "a.txt": "a-contents",
1202                "b.txt": "b-contents",
1203            }),
1204        )
1205        .await;
1206        let project_a = cx_a.update(|cx| {
1207            Project::local(
1208                client_a.clone(),
1209                client_a.user_store.clone(),
1210                lang_registry.clone(),
1211                fs.clone(),
1212                cx,
1213            )
1214        });
1215        let (worktree_a, _) = project_a
1216            .update(cx_a, |p, cx| {
1217                p.find_or_create_local_worktree("/a", true, cx)
1218            })
1219            .await
1220            .unwrap();
1221        worktree_a
1222            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1223            .await;
1224        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1225        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1226        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1227        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1228
1229        // Join that project as client B
1230        let project_b = Project::remote(
1231            project_id,
1232            client_b.clone(),
1233            client_b.user_store.clone(),
1234            lang_registry.clone(),
1235            fs.clone(),
1236            &mut cx_b.to_async(),
1237        )
1238        .await
1239        .unwrap();
1240        project_b
1241            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1242            .await
1243            .unwrap();
1244
1245        // Unshare the project as client A
1246        project_a
1247            .update(cx_a, |project, cx| project.unshare(cx))
1248            .await
1249            .unwrap();
1250        project_b
1251            .condition(cx_b, |project, _| project.is_read_only())
1252            .await;
1253        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1254        cx_b.update(|_| {
1255            drop(project_b);
1256        });
1257
1258        // Share the project again and ensure guests can still join.
1259        project_a
1260            .update(cx_a, |project, cx| project.share(cx))
1261            .await
1262            .unwrap();
1263        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1264
1265        let project_b2 = Project::remote(
1266            project_id,
1267            client_b.clone(),
1268            client_b.user_store.clone(),
1269            lang_registry.clone(),
1270            fs.clone(),
1271            &mut cx_b.to_async(),
1272        )
1273        .await
1274        .unwrap();
1275        project_b2
1276            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1277            .await
1278            .unwrap();
1279    }
1280
1281    #[gpui::test(iterations = 10)]
1282    async fn test_propagate_saves_and_fs_changes(
1283        cx_a: &mut TestAppContext,
1284        cx_b: &mut TestAppContext,
1285        cx_c: &mut TestAppContext,
1286    ) {
1287        let lang_registry = Arc::new(LanguageRegistry::test());
1288        let fs = FakeFs::new(cx_a.background());
1289        cx_a.foreground().forbid_parking();
1290
1291        // Connect to a server as 3 clients.
1292        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1293        let client_a = server.create_client(cx_a, "user_a").await;
1294        let client_b = server.create_client(cx_b, "user_b").await;
1295        let client_c = server.create_client(cx_c, "user_c").await;
1296
1297        // Share a worktree as client A.
1298        fs.insert_tree(
1299            "/a",
1300            json!({
1301                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1302                "file1": "",
1303                "file2": ""
1304            }),
1305        )
1306        .await;
1307        let project_a = cx_a.update(|cx| {
1308            Project::local(
1309                client_a.clone(),
1310                client_a.user_store.clone(),
1311                lang_registry.clone(),
1312                fs.clone(),
1313                cx,
1314            )
1315        });
1316        let (worktree_a, _) = project_a
1317            .update(cx_a, |p, cx| {
1318                p.find_or_create_local_worktree("/a", true, cx)
1319            })
1320            .await
1321            .unwrap();
1322        worktree_a
1323            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1324            .await;
1325        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1326        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1327        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1328
1329        // Join that worktree as clients B and C.
1330        let project_b = Project::remote(
1331            project_id,
1332            client_b.clone(),
1333            client_b.user_store.clone(),
1334            lang_registry.clone(),
1335            fs.clone(),
1336            &mut cx_b.to_async(),
1337        )
1338        .await
1339        .unwrap();
1340        let project_c = Project::remote(
1341            project_id,
1342            client_c.clone(),
1343            client_c.user_store.clone(),
1344            lang_registry.clone(),
1345            fs.clone(),
1346            &mut cx_c.to_async(),
1347        )
1348        .await
1349        .unwrap();
1350        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1351        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1352
1353        // Open and edit a buffer as both guests B and C.
1354        let buffer_b = project_b
1355            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1356            .await
1357            .unwrap();
1358        let buffer_c = project_c
1359            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1360            .await
1361            .unwrap();
1362        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1363        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1364
1365        // Open and edit that buffer as the host.
1366        let buffer_a = project_a
1367            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1368            .await
1369            .unwrap();
1370
1371        buffer_a
1372            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1373            .await;
1374        buffer_a.update(cx_a, |buf, cx| {
1375            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1376        });
1377
1378        // Wait for edits to propagate
1379        buffer_a
1380            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1381            .await;
1382        buffer_b
1383            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1384            .await;
1385        buffer_c
1386            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1387            .await;
1388
1389        // Edit the buffer as the host and concurrently save as guest B.
1390        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1391        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1392        save_b.await.unwrap();
1393        assert_eq!(
1394            fs.load("/a/file1".as_ref()).await.unwrap(),
1395            "hi-a, i-am-c, i-am-b, i-am-a"
1396        );
1397        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1398        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1399        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1400
1401        worktree_a.flush_fs_events(cx_a).await;
1402
1403        // Make changes on host's file system, see those changes on guest worktrees.
1404        fs.rename(
1405            "/a/file1".as_ref(),
1406            "/a/file1-renamed".as_ref(),
1407            Default::default(),
1408        )
1409        .await
1410        .unwrap();
1411
1412        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1413            .await
1414            .unwrap();
1415        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1416
1417        worktree_a
1418            .condition(&cx_a, |tree, _| {
1419                tree.paths()
1420                    .map(|p| p.to_string_lossy())
1421                    .collect::<Vec<_>>()
1422                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1423            })
1424            .await;
1425        worktree_b
1426            .condition(&cx_b, |tree, _| {
1427                tree.paths()
1428                    .map(|p| p.to_string_lossy())
1429                    .collect::<Vec<_>>()
1430                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1431            })
1432            .await;
1433        worktree_c
1434            .condition(&cx_c, |tree, _| {
1435                tree.paths()
1436                    .map(|p| p.to_string_lossy())
1437                    .collect::<Vec<_>>()
1438                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1439            })
1440            .await;
1441
1442        // Ensure buffer files are updated as well.
1443        buffer_a
1444            .condition(&cx_a, |buf, _| {
1445                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1446            })
1447            .await;
1448        buffer_b
1449            .condition(&cx_b, |buf, _| {
1450                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1451            })
1452            .await;
1453        buffer_c
1454            .condition(&cx_c, |buf, _| {
1455                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1456            })
1457            .await;
1458    }
1459
1460    #[gpui::test(iterations = 10)]
1461    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1462        cx_a.foreground().forbid_parking();
1463        let lang_registry = Arc::new(LanguageRegistry::test());
1464        let fs = FakeFs::new(cx_a.background());
1465
1466        // Connect to a server as 2 clients.
1467        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1468        let client_a = server.create_client(cx_a, "user_a").await;
1469        let client_b = server.create_client(cx_b, "user_b").await;
1470
1471        // Share a project as client A
1472        fs.insert_tree(
1473            "/dir",
1474            json!({
1475                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1476                "a.txt": "a-contents",
1477            }),
1478        )
1479        .await;
1480
1481        let project_a = cx_a.update(|cx| {
1482            Project::local(
1483                client_a.clone(),
1484                client_a.user_store.clone(),
1485                lang_registry.clone(),
1486                fs.clone(),
1487                cx,
1488            )
1489        });
1490        let (worktree_a, _) = project_a
1491            .update(cx_a, |p, cx| {
1492                p.find_or_create_local_worktree("/dir", true, cx)
1493            })
1494            .await
1495            .unwrap();
1496        worktree_a
1497            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1498            .await;
1499        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1500        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1501        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1502
1503        // Join that project as client B
1504        let project_b = Project::remote(
1505            project_id,
1506            client_b.clone(),
1507            client_b.user_store.clone(),
1508            lang_registry.clone(),
1509            fs.clone(),
1510            &mut cx_b.to_async(),
1511        )
1512        .await
1513        .unwrap();
1514
1515        // Open a buffer as client B
1516        let buffer_b = project_b
1517            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1518            .await
1519            .unwrap();
1520
1521        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1522        buffer_b.read_with(cx_b, |buf, _| {
1523            assert!(buf.is_dirty());
1524            assert!(!buf.has_conflict());
1525        });
1526
1527        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1528        buffer_b
1529            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1530            .await;
1531        buffer_b.read_with(cx_b, |buf, _| {
1532            assert!(!buf.has_conflict());
1533        });
1534
1535        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1536        buffer_b.read_with(cx_b, |buf, _| {
1537            assert!(buf.is_dirty());
1538            assert!(!buf.has_conflict());
1539        });
1540    }
1541
1542    #[gpui::test(iterations = 10)]
1543    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1544        cx_a.foreground().forbid_parking();
1545        let lang_registry = Arc::new(LanguageRegistry::test());
1546        let fs = FakeFs::new(cx_a.background());
1547
1548        // Connect to a server as 2 clients.
1549        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1550        let client_a = server.create_client(cx_a, "user_a").await;
1551        let client_b = server.create_client(cx_b, "user_b").await;
1552
1553        // Share a project as client A
1554        fs.insert_tree(
1555            "/dir",
1556            json!({
1557                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1558                "a.txt": "a-contents",
1559            }),
1560        )
1561        .await;
1562
1563        let project_a = cx_a.update(|cx| {
1564            Project::local(
1565                client_a.clone(),
1566                client_a.user_store.clone(),
1567                lang_registry.clone(),
1568                fs.clone(),
1569                cx,
1570            )
1571        });
1572        let (worktree_a, _) = project_a
1573            .update(cx_a, |p, cx| {
1574                p.find_or_create_local_worktree("/dir", true, cx)
1575            })
1576            .await
1577            .unwrap();
1578        worktree_a
1579            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1580            .await;
1581        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1582        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1583        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1584
1585        // Join that project as client B
1586        let project_b = Project::remote(
1587            project_id,
1588            client_b.clone(),
1589            client_b.user_store.clone(),
1590            lang_registry.clone(),
1591            fs.clone(),
1592            &mut cx_b.to_async(),
1593        )
1594        .await
1595        .unwrap();
1596        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1597
1598        // Open a buffer as client B
1599        let buffer_b = project_b
1600            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1601            .await
1602            .unwrap();
1603        buffer_b.read_with(cx_b, |buf, _| {
1604            assert!(!buf.is_dirty());
1605            assert!(!buf.has_conflict());
1606        });
1607
1608        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1609            .await
1610            .unwrap();
1611        buffer_b
1612            .condition(&cx_b, |buf, _| {
1613                buf.text() == "new contents" && !buf.is_dirty()
1614            })
1615            .await;
1616        buffer_b.read_with(cx_b, |buf, _| {
1617            assert!(!buf.has_conflict());
1618        });
1619    }
1620
1621    #[gpui::test(iterations = 10)]
1622    async fn test_editing_while_guest_opens_buffer(
1623        cx_a: &mut TestAppContext,
1624        cx_b: &mut TestAppContext,
1625    ) {
1626        cx_a.foreground().forbid_parking();
1627        let lang_registry = Arc::new(LanguageRegistry::test());
1628        let fs = FakeFs::new(cx_a.background());
1629
1630        // Connect to a server as 2 clients.
1631        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1632        let client_a = server.create_client(cx_a, "user_a").await;
1633        let client_b = server.create_client(cx_b, "user_b").await;
1634
1635        // Share a project as client A
1636        fs.insert_tree(
1637            "/dir",
1638            json!({
1639                ".zed.toml": r#"collaborators = ["user_b"]"#,
1640                "a.txt": "a-contents",
1641            }),
1642        )
1643        .await;
1644        let project_a = cx_a.update(|cx| {
1645            Project::local(
1646                client_a.clone(),
1647                client_a.user_store.clone(),
1648                lang_registry.clone(),
1649                fs.clone(),
1650                cx,
1651            )
1652        });
1653        let (worktree_a, _) = project_a
1654            .update(cx_a, |p, cx| {
1655                p.find_or_create_local_worktree("/dir", true, cx)
1656            })
1657            .await
1658            .unwrap();
1659        worktree_a
1660            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1661            .await;
1662        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1663        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1664        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1665
1666        // Join that project as client B
1667        let project_b = Project::remote(
1668            project_id,
1669            client_b.clone(),
1670            client_b.user_store.clone(),
1671            lang_registry.clone(),
1672            fs.clone(),
1673            &mut cx_b.to_async(),
1674        )
1675        .await
1676        .unwrap();
1677
1678        // Open a buffer as client A
1679        let buffer_a = project_a
1680            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1681            .await
1682            .unwrap();
1683
1684        // Start opening the same buffer as client B
1685        let buffer_b = cx_b
1686            .background()
1687            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1688
1689        // Edit the buffer as client A while client B is still opening it.
1690        cx_b.background().simulate_random_delay().await;
1691        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1692        cx_b.background().simulate_random_delay().await;
1693        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1694
1695        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1696        let buffer_b = buffer_b.await.unwrap();
1697        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1698    }
1699
1700    #[gpui::test(iterations = 10)]
1701    async fn test_leaving_worktree_while_opening_buffer(
1702        cx_a: &mut TestAppContext,
1703        cx_b: &mut TestAppContext,
1704    ) {
1705        cx_a.foreground().forbid_parking();
1706        let lang_registry = Arc::new(LanguageRegistry::test());
1707        let fs = FakeFs::new(cx_a.background());
1708
1709        // Connect to a server as 2 clients.
1710        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1711        let client_a = server.create_client(cx_a, "user_a").await;
1712        let client_b = server.create_client(cx_b, "user_b").await;
1713
1714        // Share a project as client A
1715        fs.insert_tree(
1716            "/dir",
1717            json!({
1718                ".zed.toml": r#"collaborators = ["user_b"]"#,
1719                "a.txt": "a-contents",
1720            }),
1721        )
1722        .await;
1723        let project_a = cx_a.update(|cx| {
1724            Project::local(
1725                client_a.clone(),
1726                client_a.user_store.clone(),
1727                lang_registry.clone(),
1728                fs.clone(),
1729                cx,
1730            )
1731        });
1732        let (worktree_a, _) = project_a
1733            .update(cx_a, |p, cx| {
1734                p.find_or_create_local_worktree("/dir", true, cx)
1735            })
1736            .await
1737            .unwrap();
1738        worktree_a
1739            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1740            .await;
1741        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1742        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1743        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1744
1745        // Join that project as client B
1746        let project_b = Project::remote(
1747            project_id,
1748            client_b.clone(),
1749            client_b.user_store.clone(),
1750            lang_registry.clone(),
1751            fs.clone(),
1752            &mut cx_b.to_async(),
1753        )
1754        .await
1755        .unwrap();
1756
1757        // See that a guest has joined as client A.
1758        project_a
1759            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1760            .await;
1761
1762        // Begin opening a buffer as client B, but leave the project before the open completes.
1763        let buffer_b = cx_b
1764            .background()
1765            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1766        cx_b.update(|_| drop(project_b));
1767        drop(buffer_b);
1768
1769        // See that the guest has left.
1770        project_a
1771            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1772            .await;
1773    }
1774
1775    #[gpui::test(iterations = 10)]
1776    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1777        cx_a.foreground().forbid_parking();
1778        let lang_registry = Arc::new(LanguageRegistry::test());
1779        let fs = FakeFs::new(cx_a.background());
1780
1781        // Connect to a server as 2 clients.
1782        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1783        let client_a = server.create_client(cx_a, "user_a").await;
1784        let client_b = server.create_client(cx_b, "user_b").await;
1785
1786        // Share a project as client A
1787        fs.insert_tree(
1788            "/a",
1789            json!({
1790                ".zed.toml": r#"collaborators = ["user_b"]"#,
1791                "a.txt": "a-contents",
1792                "b.txt": "b-contents",
1793            }),
1794        )
1795        .await;
1796        let project_a = cx_a.update(|cx| {
1797            Project::local(
1798                client_a.clone(),
1799                client_a.user_store.clone(),
1800                lang_registry.clone(),
1801                fs.clone(),
1802                cx,
1803            )
1804        });
1805        let (worktree_a, _) = project_a
1806            .update(cx_a, |p, cx| {
1807                p.find_or_create_local_worktree("/a", true, cx)
1808            })
1809            .await
1810            .unwrap();
1811        worktree_a
1812            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1813            .await;
1814        let project_id = project_a
1815            .update(cx_a, |project, _| project.next_remote_id())
1816            .await;
1817        project_a
1818            .update(cx_a, |project, cx| project.share(cx))
1819            .await
1820            .unwrap();
1821
1822        // Join that project as client B
1823        let _project_b = Project::remote(
1824            project_id,
1825            client_b.clone(),
1826            client_b.user_store.clone(),
1827            lang_registry.clone(),
1828            fs.clone(),
1829            &mut cx_b.to_async(),
1830        )
1831        .await
1832        .unwrap();
1833
1834        // Client A sees that a guest has joined.
1835        project_a
1836            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1837            .await;
1838
1839        // Drop client B's connection and ensure client A observes client B leaving the project.
1840        client_b.disconnect(&cx_b.to_async()).unwrap();
1841        project_a
1842            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1843            .await;
1844
1845        // Rejoin the project as client B
1846        let _project_b = Project::remote(
1847            project_id,
1848            client_b.clone(),
1849            client_b.user_store.clone(),
1850            lang_registry.clone(),
1851            fs.clone(),
1852            &mut cx_b.to_async(),
1853        )
1854        .await
1855        .unwrap();
1856
1857        // Client A sees that a guest has re-joined.
1858        project_a
1859            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1860            .await;
1861
1862        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1863        client_b.wait_for_current_user(cx_b).await;
1864        server.disconnect_client(client_b.current_user_id(cx_b));
1865        cx_a.foreground().advance_clock(Duration::from_secs(3));
1866        project_a
1867            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1868            .await;
1869    }
1870
1871    #[gpui::test(iterations = 10)]
1872    async fn test_collaborating_with_diagnostics(
1873        cx_a: &mut TestAppContext,
1874        cx_b: &mut TestAppContext,
1875    ) {
1876        cx_a.foreground().forbid_parking();
1877        let mut lang_registry = Arc::new(LanguageRegistry::test());
1878        let fs = FakeFs::new(cx_a.background());
1879
1880        // Set up a fake language server.
1881        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1882        Arc::get_mut(&mut lang_registry)
1883            .unwrap()
1884            .add(Arc::new(Language::new(
1885                LanguageConfig {
1886                    name: "Rust".into(),
1887                    path_suffixes: vec!["rs".to_string()],
1888                    language_server: Some(language_server_config),
1889                    ..Default::default()
1890                },
1891                Some(tree_sitter_rust::language()),
1892            )));
1893
1894        // Connect to a server as 2 clients.
1895        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1896        let client_a = server.create_client(cx_a, "user_a").await;
1897        let client_b = server.create_client(cx_b, "user_b").await;
1898
1899        // Share a project as client A
1900        fs.insert_tree(
1901            "/a",
1902            json!({
1903                ".zed.toml": r#"collaborators = ["user_b"]"#,
1904                "a.rs": "let one = two",
1905                "other.rs": "",
1906            }),
1907        )
1908        .await;
1909        let project_a = cx_a.update(|cx| {
1910            Project::local(
1911                client_a.clone(),
1912                client_a.user_store.clone(),
1913                lang_registry.clone(),
1914                fs.clone(),
1915                cx,
1916            )
1917        });
1918        let (worktree_a, _) = project_a
1919            .update(cx_a, |p, cx| {
1920                p.find_or_create_local_worktree("/a", true, cx)
1921            })
1922            .await
1923            .unwrap();
1924        worktree_a
1925            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1926            .await;
1927        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1928        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1929        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1930
1931        // Cause the language server to start.
1932        let _ = cx_a
1933            .background()
1934            .spawn(project_a.update(cx_a, |project, cx| {
1935                project.open_buffer(
1936                    ProjectPath {
1937                        worktree_id,
1938                        path: Path::new("other.rs").into(),
1939                    },
1940                    cx,
1941                )
1942            }))
1943            .await
1944            .unwrap();
1945
1946        // Simulate a language server reporting errors for a file.
1947        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1948        fake_language_server
1949            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1950            .await;
1951        fake_language_server
1952            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1953                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1954                version: None,
1955                diagnostics: vec![lsp::Diagnostic {
1956                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1957                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1958                    message: "message 1".to_string(),
1959                    ..Default::default()
1960                }],
1961            })
1962            .await;
1963
1964        // Wait for server to see the diagnostics update.
1965        server
1966            .condition(|store| {
1967                let worktree = store
1968                    .project(project_id)
1969                    .unwrap()
1970                    .share
1971                    .as_ref()
1972                    .unwrap()
1973                    .worktrees
1974                    .get(&worktree_id.to_proto())
1975                    .unwrap();
1976
1977                !worktree.diagnostic_summaries.is_empty()
1978            })
1979            .await;
1980
1981        // Join the worktree as client B.
1982        let project_b = Project::remote(
1983            project_id,
1984            client_b.clone(),
1985            client_b.user_store.clone(),
1986            lang_registry.clone(),
1987            fs.clone(),
1988            &mut cx_b.to_async(),
1989        )
1990        .await
1991        .unwrap();
1992
1993        project_b.read_with(cx_b, |project, cx| {
1994            assert_eq!(
1995                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1996                &[(
1997                    ProjectPath {
1998                        worktree_id,
1999                        path: Arc::from(Path::new("a.rs")),
2000                    },
2001                    DiagnosticSummary {
2002                        error_count: 1,
2003                        warning_count: 0,
2004                        ..Default::default()
2005                    },
2006                )]
2007            )
2008        });
2009
2010        // Simulate a language server reporting more errors for a file.
2011        fake_language_server
2012            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2013                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2014                version: None,
2015                diagnostics: vec![
2016                    lsp::Diagnostic {
2017                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2018                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2019                        message: "message 1".to_string(),
2020                        ..Default::default()
2021                    },
2022                    lsp::Diagnostic {
2023                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2024                        range: lsp::Range::new(
2025                            lsp::Position::new(0, 10),
2026                            lsp::Position::new(0, 13),
2027                        ),
2028                        message: "message 2".to_string(),
2029                        ..Default::default()
2030                    },
2031                ],
2032            })
2033            .await;
2034
2035        // Client b gets the updated summaries
2036        project_b
2037            .condition(&cx_b, |project, cx| {
2038                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2039                    == &[(
2040                        ProjectPath {
2041                            worktree_id,
2042                            path: Arc::from(Path::new("a.rs")),
2043                        },
2044                        DiagnosticSummary {
2045                            error_count: 1,
2046                            warning_count: 1,
2047                            ..Default::default()
2048                        },
2049                    )]
2050            })
2051            .await;
2052
2053        // Open the file with the errors on client B. They should be present.
2054        let buffer_b = cx_b
2055            .background()
2056            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2057            .await
2058            .unwrap();
2059
2060        buffer_b.read_with(cx_b, |buffer, _| {
2061            assert_eq!(
2062                buffer
2063                    .snapshot()
2064                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2065                    .map(|entry| entry)
2066                    .collect::<Vec<_>>(),
2067                &[
2068                    DiagnosticEntry {
2069                        range: Point::new(0, 4)..Point::new(0, 7),
2070                        diagnostic: Diagnostic {
2071                            group_id: 0,
2072                            message: "message 1".to_string(),
2073                            severity: lsp::DiagnosticSeverity::ERROR,
2074                            is_primary: true,
2075                            ..Default::default()
2076                        }
2077                    },
2078                    DiagnosticEntry {
2079                        range: Point::new(0, 10)..Point::new(0, 13),
2080                        diagnostic: Diagnostic {
2081                            group_id: 1,
2082                            severity: lsp::DiagnosticSeverity::WARNING,
2083                            message: "message 2".to_string(),
2084                            is_primary: true,
2085                            ..Default::default()
2086                        }
2087                    }
2088                ]
2089            );
2090        });
2091    }
2092
2093    #[gpui::test(iterations = 10)]
2094    async fn test_collaborating_with_completion(
2095        cx_a: &mut TestAppContext,
2096        cx_b: &mut TestAppContext,
2097    ) {
2098        cx_a.foreground().forbid_parking();
2099        let mut lang_registry = Arc::new(LanguageRegistry::test());
2100        let fs = FakeFs::new(cx_a.background());
2101
2102        // Set up a fake language server.
2103        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2104        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2105            completion_provider: Some(lsp::CompletionOptions {
2106                trigger_characters: Some(vec![".".to_string()]),
2107                ..Default::default()
2108            }),
2109            ..Default::default()
2110        });
2111        Arc::get_mut(&mut lang_registry)
2112            .unwrap()
2113            .add(Arc::new(Language::new(
2114                LanguageConfig {
2115                    name: "Rust".into(),
2116                    path_suffixes: vec!["rs".to_string()],
2117                    language_server: Some(language_server_config),
2118                    ..Default::default()
2119                },
2120                Some(tree_sitter_rust::language()),
2121            )));
2122
2123        // Connect to a server as 2 clients.
2124        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2125        let client_a = server.create_client(cx_a, "user_a").await;
2126        let client_b = server.create_client(cx_b, "user_b").await;
2127
2128        // Share a project as client A
2129        fs.insert_tree(
2130            "/a",
2131            json!({
2132                ".zed.toml": r#"collaborators = ["user_b"]"#,
2133                "main.rs": "fn main() { a }",
2134                "other.rs": "",
2135            }),
2136        )
2137        .await;
2138        let project_a = cx_a.update(|cx| {
2139            Project::local(
2140                client_a.clone(),
2141                client_a.user_store.clone(),
2142                lang_registry.clone(),
2143                fs.clone(),
2144                cx,
2145            )
2146        });
2147        let (worktree_a, _) = project_a
2148            .update(cx_a, |p, cx| {
2149                p.find_or_create_local_worktree("/a", true, cx)
2150            })
2151            .await
2152            .unwrap();
2153        worktree_a
2154            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2155            .await;
2156        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2157        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2158        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2159
2160        // Join the worktree as client B.
2161        let project_b = Project::remote(
2162            project_id,
2163            client_b.clone(),
2164            client_b.user_store.clone(),
2165            lang_registry.clone(),
2166            fs.clone(),
2167            &mut cx_b.to_async(),
2168        )
2169        .await
2170        .unwrap();
2171
2172        // Open a file in an editor as the guest.
2173        let buffer_b = project_b
2174            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2175            .await
2176            .unwrap();
2177        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2178        let editor_b = cx_b.add_view(window_b, |cx| {
2179            Editor::for_buffer(
2180                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2181                Some(project_b.clone()),
2182                watch::channel_with(Settings::test(cx)).1,
2183                cx,
2184            )
2185        });
2186
2187        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2188        buffer_b
2189            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2190            .await;
2191
2192        // Type a completion trigger character as the guest.
2193        editor_b.update(cx_b, |editor, cx| {
2194            editor.select_ranges([13..13], None, cx);
2195            editor.handle_input(&Input(".".into()), cx);
2196            cx.focus(&editor_b);
2197        });
2198
2199        // Receive a completion request as the host's language server.
2200        // Return some completions from the host's language server.
2201        cx_a.foreground().start_waiting();
2202        fake_language_server
2203            .handle_request::<lsp::request::Completion, _>(|params, _| {
2204                assert_eq!(
2205                    params.text_document_position.text_document.uri,
2206                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2207                );
2208                assert_eq!(
2209                    params.text_document_position.position,
2210                    lsp::Position::new(0, 14),
2211                );
2212
2213                Some(lsp::CompletionResponse::Array(vec![
2214                    lsp::CompletionItem {
2215                        label: "first_method(…)".into(),
2216                        detail: Some("fn(&mut self, B) -> C".into()),
2217                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2218                            new_text: "first_method($1)".to_string(),
2219                            range: lsp::Range::new(
2220                                lsp::Position::new(0, 14),
2221                                lsp::Position::new(0, 14),
2222                            ),
2223                        })),
2224                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2225                        ..Default::default()
2226                    },
2227                    lsp::CompletionItem {
2228                        label: "second_method(…)".into(),
2229                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2230                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231                            new_text: "second_method()".to_string(),
2232                            range: lsp::Range::new(
2233                                lsp::Position::new(0, 14),
2234                                lsp::Position::new(0, 14),
2235                            ),
2236                        })),
2237                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2238                        ..Default::default()
2239                    },
2240                ]))
2241            })
2242            .next()
2243            .await
2244            .unwrap();
2245        cx_a.foreground().finish_waiting();
2246
2247        // Open the buffer on the host.
2248        let buffer_a = project_a
2249            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2250            .await
2251            .unwrap();
2252        buffer_a
2253            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2254            .await;
2255
2256        // Confirm a completion on the guest.
2257        editor_b
2258            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2259            .await;
2260        editor_b.update(cx_b, |editor, cx| {
2261            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2262            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2263        });
2264
2265        // Return a resolved completion from the host's language server.
2266        // The resolved completion has an additional text edit.
2267        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2268            |params, _| {
2269                assert_eq!(params.label, "first_method(…)");
2270                lsp::CompletionItem {
2271                    label: "first_method(…)".into(),
2272                    detail: Some("fn(&mut self, B) -> C".into()),
2273                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2274                        new_text: "first_method($1)".to_string(),
2275                        range: lsp::Range::new(
2276                            lsp::Position::new(0, 14),
2277                            lsp::Position::new(0, 14),
2278                        ),
2279                    })),
2280                    additional_text_edits: Some(vec![lsp::TextEdit {
2281                        new_text: "use d::SomeTrait;\n".to_string(),
2282                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2283                    }]),
2284                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2285                    ..Default::default()
2286                }
2287            },
2288        );
2289
2290        // The additional edit is applied.
2291        buffer_a
2292            .condition(&cx_a, |buffer, _| {
2293                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2294            })
2295            .await;
2296        buffer_b
2297            .condition(&cx_b, |buffer, _| {
2298                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2299            })
2300            .await;
2301    }
2302
2303    #[gpui::test(iterations = 10)]
2304    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2305        cx_a.foreground().forbid_parking();
2306        let mut lang_registry = Arc::new(LanguageRegistry::test());
2307        let fs = FakeFs::new(cx_a.background());
2308
2309        // Set up a fake language server.
2310        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2311        Arc::get_mut(&mut lang_registry)
2312            .unwrap()
2313            .add(Arc::new(Language::new(
2314                LanguageConfig {
2315                    name: "Rust".into(),
2316                    path_suffixes: vec!["rs".to_string()],
2317                    language_server: Some(language_server_config),
2318                    ..Default::default()
2319                },
2320                Some(tree_sitter_rust::language()),
2321            )));
2322
2323        // Connect to a server as 2 clients.
2324        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2325        let client_a = server.create_client(cx_a, "user_a").await;
2326        let client_b = server.create_client(cx_b, "user_b").await;
2327
2328        // Share a project as client A
2329        fs.insert_tree(
2330            "/a",
2331            json!({
2332                ".zed.toml": r#"collaborators = ["user_b"]"#,
2333                "a.rs": "let one = two",
2334            }),
2335        )
2336        .await;
2337        let project_a = cx_a.update(|cx| {
2338            Project::local(
2339                client_a.clone(),
2340                client_a.user_store.clone(),
2341                lang_registry.clone(),
2342                fs.clone(),
2343                cx,
2344            )
2345        });
2346        let (worktree_a, _) = project_a
2347            .update(cx_a, |p, cx| {
2348                p.find_or_create_local_worktree("/a", true, cx)
2349            })
2350            .await
2351            .unwrap();
2352        worktree_a
2353            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2354            .await;
2355        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2356        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2357        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2358
2359        // Join the worktree as client B.
2360        let project_b = Project::remote(
2361            project_id,
2362            client_b.clone(),
2363            client_b.user_store.clone(),
2364            lang_registry.clone(),
2365            fs.clone(),
2366            &mut cx_b.to_async(),
2367        )
2368        .await
2369        .unwrap();
2370
2371        let buffer_b = cx_b
2372            .background()
2373            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2374            .await
2375            .unwrap();
2376
2377        let format = project_b.update(cx_b, |project, cx| {
2378            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2379        });
2380
2381        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2382        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2383            Some(vec![
2384                lsp::TextEdit {
2385                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2386                    new_text: "h".to_string(),
2387                },
2388                lsp::TextEdit {
2389                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2390                    new_text: "y".to_string(),
2391                },
2392            ])
2393        });
2394
2395        format.await.unwrap();
2396        assert_eq!(
2397            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2398            "let honey = two"
2399        );
2400    }
2401
2402    #[gpui::test(iterations = 10)]
2403    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2404        cx_a.foreground().forbid_parking();
2405        let mut lang_registry = Arc::new(LanguageRegistry::test());
2406        let fs = FakeFs::new(cx_a.background());
2407        fs.insert_tree(
2408            "/root-1",
2409            json!({
2410                ".zed.toml": r#"collaborators = ["user_b"]"#,
2411                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2412            }),
2413        )
2414        .await;
2415        fs.insert_tree(
2416            "/root-2",
2417            json!({
2418                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2419            }),
2420        )
2421        .await;
2422
2423        // Set up a fake language server.
2424        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2425        Arc::get_mut(&mut lang_registry)
2426            .unwrap()
2427            .add(Arc::new(Language::new(
2428                LanguageConfig {
2429                    name: "Rust".into(),
2430                    path_suffixes: vec!["rs".to_string()],
2431                    language_server: Some(language_server_config),
2432                    ..Default::default()
2433                },
2434                Some(tree_sitter_rust::language()),
2435            )));
2436
2437        // Connect to a server as 2 clients.
2438        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2439        let client_a = server.create_client(cx_a, "user_a").await;
2440        let client_b = server.create_client(cx_b, "user_b").await;
2441
2442        // Share a project as client A
2443        let project_a = cx_a.update(|cx| {
2444            Project::local(
2445                client_a.clone(),
2446                client_a.user_store.clone(),
2447                lang_registry.clone(),
2448                fs.clone(),
2449                cx,
2450            )
2451        });
2452        let (worktree_a, _) = project_a
2453            .update(cx_a, |p, cx| {
2454                p.find_or_create_local_worktree("/root-1", true, cx)
2455            })
2456            .await
2457            .unwrap();
2458        worktree_a
2459            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2460            .await;
2461        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2462        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2463        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2464
2465        // Join the worktree as client B.
2466        let project_b = Project::remote(
2467            project_id,
2468            client_b.clone(),
2469            client_b.user_store.clone(),
2470            lang_registry.clone(),
2471            fs.clone(),
2472            &mut cx_b.to_async(),
2473        )
2474        .await
2475        .unwrap();
2476
2477        // Open the file on client B.
2478        let buffer_b = cx_b
2479            .background()
2480            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2481            .await
2482            .unwrap();
2483
2484        // Request the definition of a symbol as the guest.
2485        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2486
2487        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2488        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2489            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2490                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2491                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2492            )))
2493        });
2494
2495        let definitions_1 = definitions_1.await.unwrap();
2496        cx_b.read(|cx| {
2497            assert_eq!(definitions_1.len(), 1);
2498            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2499            let target_buffer = definitions_1[0].buffer.read(cx);
2500            assert_eq!(
2501                target_buffer.text(),
2502                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2503            );
2504            assert_eq!(
2505                definitions_1[0].range.to_point(target_buffer),
2506                Point::new(0, 6)..Point::new(0, 9)
2507            );
2508        });
2509
2510        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2511        // the previous call to `definition`.
2512        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2513        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2514            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2515                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2516                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2517            )))
2518        });
2519
2520        let definitions_2 = definitions_2.await.unwrap();
2521        cx_b.read(|cx| {
2522            assert_eq!(definitions_2.len(), 1);
2523            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2524            let target_buffer = definitions_2[0].buffer.read(cx);
2525            assert_eq!(
2526                target_buffer.text(),
2527                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2528            );
2529            assert_eq!(
2530                definitions_2[0].range.to_point(target_buffer),
2531                Point::new(1, 6)..Point::new(1, 11)
2532            );
2533        });
2534        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2535    }
2536
2537    #[gpui::test(iterations = 10)]
2538    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2539        cx_a.foreground().forbid_parking();
2540        let mut lang_registry = Arc::new(LanguageRegistry::test());
2541        let fs = FakeFs::new(cx_a.background());
2542        fs.insert_tree(
2543            "/root-1",
2544            json!({
2545                ".zed.toml": r#"collaborators = ["user_b"]"#,
2546                "one.rs": "const ONE: usize = 1;",
2547                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2548            }),
2549        )
2550        .await;
2551        fs.insert_tree(
2552            "/root-2",
2553            json!({
2554                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2555            }),
2556        )
2557        .await;
2558
2559        // Set up a fake language server.
2560        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2561        Arc::get_mut(&mut lang_registry)
2562            .unwrap()
2563            .add(Arc::new(Language::new(
2564                LanguageConfig {
2565                    name: "Rust".into(),
2566                    path_suffixes: vec!["rs".to_string()],
2567                    language_server: Some(language_server_config),
2568                    ..Default::default()
2569                },
2570                Some(tree_sitter_rust::language()),
2571            )));
2572
2573        // Connect to a server as 2 clients.
2574        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2575        let client_a = server.create_client(cx_a, "user_a").await;
2576        let client_b = server.create_client(cx_b, "user_b").await;
2577
2578        // Share a project as client A
2579        let project_a = cx_a.update(|cx| {
2580            Project::local(
2581                client_a.clone(),
2582                client_a.user_store.clone(),
2583                lang_registry.clone(),
2584                fs.clone(),
2585                cx,
2586            )
2587        });
2588        let (worktree_a, _) = project_a
2589            .update(cx_a, |p, cx| {
2590                p.find_or_create_local_worktree("/root-1", true, cx)
2591            })
2592            .await
2593            .unwrap();
2594        worktree_a
2595            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2596            .await;
2597        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2598        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2599        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2600
2601        // Join the worktree as client B.
2602        let project_b = Project::remote(
2603            project_id,
2604            client_b.clone(),
2605            client_b.user_store.clone(),
2606            lang_registry.clone(),
2607            fs.clone(),
2608            &mut cx_b.to_async(),
2609        )
2610        .await
2611        .unwrap();
2612
2613        // Open the file on client B.
2614        let buffer_b = cx_b
2615            .background()
2616            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2617            .await
2618            .unwrap();
2619
2620        // Request references to a symbol as the guest.
2621        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2622
2623        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2624        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2625            assert_eq!(
2626                params.text_document_position.text_document.uri.as_str(),
2627                "file:///root-1/one.rs"
2628            );
2629            Some(vec![
2630                lsp::Location {
2631                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2632                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2633                },
2634                lsp::Location {
2635                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2636                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2637                },
2638                lsp::Location {
2639                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2640                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2641                },
2642            ])
2643        });
2644
2645        let references = references.await.unwrap();
2646        cx_b.read(|cx| {
2647            assert_eq!(references.len(), 3);
2648            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2649
2650            let two_buffer = references[0].buffer.read(cx);
2651            let three_buffer = references[2].buffer.read(cx);
2652            assert_eq!(
2653                two_buffer.file().unwrap().path().as_ref(),
2654                Path::new("two.rs")
2655            );
2656            assert_eq!(references[1].buffer, references[0].buffer);
2657            assert_eq!(
2658                three_buffer.file().unwrap().full_path(cx),
2659                Path::new("three.rs")
2660            );
2661
2662            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2663            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2664            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2665        });
2666    }
2667
2668    #[gpui::test(iterations = 10)]
2669    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2670        cx_a.foreground().forbid_parking();
2671        let lang_registry = Arc::new(LanguageRegistry::test());
2672        let fs = FakeFs::new(cx_a.background());
2673        fs.insert_tree(
2674            "/root-1",
2675            json!({
2676                ".zed.toml": r#"collaborators = ["user_b"]"#,
2677                "a": "hello world",
2678                "b": "goodnight moon",
2679                "c": "a world of goo",
2680                "d": "world champion of clown world",
2681            }),
2682        )
2683        .await;
2684        fs.insert_tree(
2685            "/root-2",
2686            json!({
2687                "e": "disney world is fun",
2688            }),
2689        )
2690        .await;
2691
2692        // Connect to a server as 2 clients.
2693        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2694        let client_a = server.create_client(cx_a, "user_a").await;
2695        let client_b = server.create_client(cx_b, "user_b").await;
2696
2697        // Share a project as client A
2698        let project_a = cx_a.update(|cx| {
2699            Project::local(
2700                client_a.clone(),
2701                client_a.user_store.clone(),
2702                lang_registry.clone(),
2703                fs.clone(),
2704                cx,
2705            )
2706        });
2707        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2708
2709        let (worktree_1, _) = project_a
2710            .update(cx_a, |p, cx| {
2711                p.find_or_create_local_worktree("/root-1", true, cx)
2712            })
2713            .await
2714            .unwrap();
2715        worktree_1
2716            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2717            .await;
2718        let (worktree_2, _) = project_a
2719            .update(cx_a, |p, cx| {
2720                p.find_or_create_local_worktree("/root-2", true, cx)
2721            })
2722            .await
2723            .unwrap();
2724        worktree_2
2725            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2726            .await;
2727
2728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2729
2730        // Join the worktree as client B.
2731        let project_b = Project::remote(
2732            project_id,
2733            client_b.clone(),
2734            client_b.user_store.clone(),
2735            lang_registry.clone(),
2736            fs.clone(),
2737            &mut cx_b.to_async(),
2738        )
2739        .await
2740        .unwrap();
2741
2742        let results = project_b
2743            .update(cx_b, |project, cx| {
2744                project.search(SearchQuery::text("world", false, false), cx)
2745            })
2746            .await
2747            .unwrap();
2748
2749        let mut ranges_by_path = results
2750            .into_iter()
2751            .map(|(buffer, ranges)| {
2752                buffer.read_with(cx_b, |buffer, cx| {
2753                    let path = buffer.file().unwrap().full_path(cx);
2754                    let offset_ranges = ranges
2755                        .into_iter()
2756                        .map(|range| range.to_offset(buffer))
2757                        .collect::<Vec<_>>();
2758                    (path, offset_ranges)
2759                })
2760            })
2761            .collect::<Vec<_>>();
2762        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2763
2764        assert_eq!(
2765            ranges_by_path,
2766            &[
2767                (PathBuf::from("root-1/a"), vec![6..11]),
2768                (PathBuf::from("root-1/c"), vec![2..7]),
2769                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2770                (PathBuf::from("root-2/e"), vec![7..12]),
2771            ]
2772        );
2773    }
2774
2775    #[gpui::test(iterations = 10)]
2776    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2777        cx_a.foreground().forbid_parking();
2778        let lang_registry = Arc::new(LanguageRegistry::test());
2779        let fs = FakeFs::new(cx_a.background());
2780        fs.insert_tree(
2781            "/root-1",
2782            json!({
2783                ".zed.toml": r#"collaborators = ["user_b"]"#,
2784                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2785            }),
2786        )
2787        .await;
2788
2789        // Set up a fake language server.
2790        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2791        lang_registry.add(Arc::new(Language::new(
2792            LanguageConfig {
2793                name: "Rust".into(),
2794                path_suffixes: vec!["rs".to_string()],
2795                language_server: Some(language_server_config),
2796                ..Default::default()
2797            },
2798            Some(tree_sitter_rust::language()),
2799        )));
2800
2801        // Connect to a server as 2 clients.
2802        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2803        let client_a = server.create_client(cx_a, "user_a").await;
2804        let client_b = server.create_client(cx_b, "user_b").await;
2805
2806        // Share a project as client A
2807        let project_a = cx_a.update(|cx| {
2808            Project::local(
2809                client_a.clone(),
2810                client_a.user_store.clone(),
2811                lang_registry.clone(),
2812                fs.clone(),
2813                cx,
2814            )
2815        });
2816        let (worktree_a, _) = project_a
2817            .update(cx_a, |p, cx| {
2818                p.find_or_create_local_worktree("/root-1", true, cx)
2819            })
2820            .await
2821            .unwrap();
2822        worktree_a
2823            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2824            .await;
2825        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2826        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2827        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2828
2829        // Join the worktree as client B.
2830        let project_b = Project::remote(
2831            project_id,
2832            client_b.clone(),
2833            client_b.user_store.clone(),
2834            lang_registry.clone(),
2835            fs.clone(),
2836            &mut cx_b.to_async(),
2837        )
2838        .await
2839        .unwrap();
2840
2841        // Open the file on client B.
2842        let buffer_b = cx_b
2843            .background()
2844            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2845            .await
2846            .unwrap();
2847
2848        // Request document highlights as the guest.
2849        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2850
2851        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2852        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2853            |params, _| {
2854                assert_eq!(
2855                    params
2856                        .text_document_position_params
2857                        .text_document
2858                        .uri
2859                        .as_str(),
2860                    "file:///root-1/main.rs"
2861                );
2862                assert_eq!(
2863                    params.text_document_position_params.position,
2864                    lsp::Position::new(0, 34)
2865                );
2866                Some(vec![
2867                    lsp::DocumentHighlight {
2868                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2869                        range: lsp::Range::new(
2870                            lsp::Position::new(0, 10),
2871                            lsp::Position::new(0, 16),
2872                        ),
2873                    },
2874                    lsp::DocumentHighlight {
2875                        kind: Some(lsp::DocumentHighlightKind::READ),
2876                        range: lsp::Range::new(
2877                            lsp::Position::new(0, 32),
2878                            lsp::Position::new(0, 38),
2879                        ),
2880                    },
2881                    lsp::DocumentHighlight {
2882                        kind: Some(lsp::DocumentHighlightKind::READ),
2883                        range: lsp::Range::new(
2884                            lsp::Position::new(0, 41),
2885                            lsp::Position::new(0, 47),
2886                        ),
2887                    },
2888                ])
2889            },
2890        );
2891
2892        let highlights = highlights.await.unwrap();
2893        buffer_b.read_with(cx_b, |buffer, _| {
2894            let snapshot = buffer.snapshot();
2895
2896            let highlights = highlights
2897                .into_iter()
2898                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2899                .collect::<Vec<_>>();
2900            assert_eq!(
2901                highlights,
2902                &[
2903                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2904                    (lsp::DocumentHighlightKind::READ, 32..38),
2905                    (lsp::DocumentHighlightKind::READ, 41..47)
2906                ]
2907            )
2908        });
2909    }
2910
2911    #[gpui::test(iterations = 10)]
2912    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2913        cx_a.foreground().forbid_parking();
2914        let mut lang_registry = Arc::new(LanguageRegistry::test());
2915        let fs = FakeFs::new(cx_a.background());
2916        fs.insert_tree(
2917            "/code",
2918            json!({
2919                "crate-1": {
2920                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2921                    "one.rs": "const ONE: usize = 1;",
2922                },
2923                "crate-2": {
2924                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2925                },
2926                "private": {
2927                    "passwords.txt": "the-password",
2928                }
2929            }),
2930        )
2931        .await;
2932
2933        // Set up a fake language server.
2934        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2935        Arc::get_mut(&mut lang_registry)
2936            .unwrap()
2937            .add(Arc::new(Language::new(
2938                LanguageConfig {
2939                    name: "Rust".into(),
2940                    path_suffixes: vec!["rs".to_string()],
2941                    language_server: Some(language_server_config),
2942                    ..Default::default()
2943                },
2944                Some(tree_sitter_rust::language()),
2945            )));
2946
2947        // Connect to a server as 2 clients.
2948        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2949        let client_a = server.create_client(cx_a, "user_a").await;
2950        let client_b = server.create_client(cx_b, "user_b").await;
2951
2952        // Share a project as client A
2953        let project_a = cx_a.update(|cx| {
2954            Project::local(
2955                client_a.clone(),
2956                client_a.user_store.clone(),
2957                lang_registry.clone(),
2958                fs.clone(),
2959                cx,
2960            )
2961        });
2962        let (worktree_a, _) = project_a
2963            .update(cx_a, |p, cx| {
2964                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2965            })
2966            .await
2967            .unwrap();
2968        worktree_a
2969            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2970            .await;
2971        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2972        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2973        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2974
2975        // Join the worktree as client B.
2976        let project_b = Project::remote(
2977            project_id,
2978            client_b.clone(),
2979            client_b.user_store.clone(),
2980            lang_registry.clone(),
2981            fs.clone(),
2982            &mut cx_b.to_async(),
2983        )
2984        .await
2985        .unwrap();
2986
2987        // Cause the language server to start.
2988        let _buffer = cx_b
2989            .background()
2990            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2991            .await
2992            .unwrap();
2993
2994        // Request the definition of a symbol as the guest.
2995        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
2996        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2997        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
2998            #[allow(deprecated)]
2999            Some(vec![lsp::SymbolInformation {
3000                name: "TWO".into(),
3001                location: lsp::Location {
3002                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3003                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3004                },
3005                kind: lsp::SymbolKind::CONSTANT,
3006                tags: None,
3007                container_name: None,
3008                deprecated: None,
3009            }])
3010        });
3011
3012        let symbols = symbols.await.unwrap();
3013        assert_eq!(symbols.len(), 1);
3014        assert_eq!(symbols[0].name, "TWO");
3015
3016        // Open one of the returned symbols.
3017        let buffer_b_2 = project_b
3018            .update(cx_b, |project, cx| {
3019                project.open_buffer_for_symbol(&symbols[0], cx)
3020            })
3021            .await
3022            .unwrap();
3023        buffer_b_2.read_with(cx_b, |buffer, _| {
3024            assert_eq!(
3025                buffer.file().unwrap().path().as_ref(),
3026                Path::new("../crate-2/two.rs")
3027            );
3028        });
3029
3030        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3031        let mut fake_symbol = symbols[0].clone();
3032        fake_symbol.path = Path::new("/code/secrets").into();
3033        let error = project_b
3034            .update(cx_b, |project, cx| {
3035                project.open_buffer_for_symbol(&fake_symbol, cx)
3036            })
3037            .await
3038            .unwrap_err();
3039        assert!(error.to_string().contains("invalid symbol signature"));
3040    }
3041
3042    #[gpui::test(iterations = 10)]
3043    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3044        cx_a: &mut TestAppContext,
3045        cx_b: &mut TestAppContext,
3046        mut rng: StdRng,
3047    ) {
3048        cx_a.foreground().forbid_parking();
3049        let mut lang_registry = Arc::new(LanguageRegistry::test());
3050        let fs = FakeFs::new(cx_a.background());
3051        fs.insert_tree(
3052            "/root",
3053            json!({
3054                ".zed.toml": r#"collaborators = ["user_b"]"#,
3055                "a.rs": "const ONE: usize = b::TWO;",
3056                "b.rs": "const TWO: usize = 2",
3057            }),
3058        )
3059        .await;
3060
3061        // Set up a fake language server.
3062        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3063
3064        Arc::get_mut(&mut lang_registry)
3065            .unwrap()
3066            .add(Arc::new(Language::new(
3067                LanguageConfig {
3068                    name: "Rust".into(),
3069                    path_suffixes: vec!["rs".to_string()],
3070                    language_server: Some(language_server_config),
3071                    ..Default::default()
3072                },
3073                Some(tree_sitter_rust::language()),
3074            )));
3075
3076        // Connect to a server as 2 clients.
3077        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3078        let client_a = server.create_client(cx_a, "user_a").await;
3079        let client_b = server.create_client(cx_b, "user_b").await;
3080
3081        // Share a project as client A
3082        let project_a = cx_a.update(|cx| {
3083            Project::local(
3084                client_a.clone(),
3085                client_a.user_store.clone(),
3086                lang_registry.clone(),
3087                fs.clone(),
3088                cx,
3089            )
3090        });
3091
3092        let (worktree_a, _) = project_a
3093            .update(cx_a, |p, cx| {
3094                p.find_or_create_local_worktree("/root", true, cx)
3095            })
3096            .await
3097            .unwrap();
3098        worktree_a
3099            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3100            .await;
3101        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3102        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3103        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3104
3105        // Join the worktree as client B.
3106        let project_b = Project::remote(
3107            project_id,
3108            client_b.clone(),
3109            client_b.user_store.clone(),
3110            lang_registry.clone(),
3111            fs.clone(),
3112            &mut cx_b.to_async(),
3113        )
3114        .await
3115        .unwrap();
3116
3117        let buffer_b1 = cx_b
3118            .background()
3119            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3120            .await
3121            .unwrap();
3122
3123        let definitions;
3124        let buffer_b2;
3125        if rng.gen() {
3126            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3127            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3128        } else {
3129            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3130            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3131        }
3132
3133        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3134        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3135            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3136                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3137                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3138            )))
3139        });
3140
3141        let buffer_b2 = buffer_b2.await.unwrap();
3142        let definitions = definitions.await.unwrap();
3143        assert_eq!(definitions.len(), 1);
3144        assert_eq!(definitions[0].buffer, buffer_b2);
3145    }
3146
3147    #[gpui::test(iterations = 10)]
3148    async fn test_collaborating_with_code_actions(
3149        cx_a: &mut TestAppContext,
3150        cx_b: &mut TestAppContext,
3151    ) {
3152        cx_a.foreground().forbid_parking();
3153        let mut lang_registry = Arc::new(LanguageRegistry::test());
3154        let fs = FakeFs::new(cx_a.background());
3155        let mut path_openers_b = Vec::new();
3156        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3157
3158        // Set up a fake language server.
3159        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3160        Arc::get_mut(&mut lang_registry)
3161            .unwrap()
3162            .add(Arc::new(Language::new(
3163                LanguageConfig {
3164                    name: "Rust".into(),
3165                    path_suffixes: vec!["rs".to_string()],
3166                    language_server: Some(language_server_config),
3167                    ..Default::default()
3168                },
3169                Some(tree_sitter_rust::language()),
3170            )));
3171
3172        // Connect to a server as 2 clients.
3173        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3174        let client_a = server.create_client(cx_a, "user_a").await;
3175        let client_b = server.create_client(cx_b, "user_b").await;
3176
3177        // Share a project as client A
3178        fs.insert_tree(
3179            "/a",
3180            json!({
3181                ".zed.toml": r#"collaborators = ["user_b"]"#,
3182                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3183                "other.rs": "pub fn foo() -> usize { 4 }",
3184            }),
3185        )
3186        .await;
3187        let project_a = cx_a.update(|cx| {
3188            Project::local(
3189                client_a.clone(),
3190                client_a.user_store.clone(),
3191                lang_registry.clone(),
3192                fs.clone(),
3193                cx,
3194            )
3195        });
3196        let (worktree_a, _) = project_a
3197            .update(cx_a, |p, cx| {
3198                p.find_or_create_local_worktree("/a", true, cx)
3199            })
3200            .await
3201            .unwrap();
3202        worktree_a
3203            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3204            .await;
3205        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3206        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3207        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3208
3209        // Join the worktree as client B.
3210        let project_b = Project::remote(
3211            project_id,
3212            client_b.clone(),
3213            client_b.user_store.clone(),
3214            lang_registry.clone(),
3215            fs.clone(),
3216            &mut cx_b.to_async(),
3217        )
3218        .await
3219        .unwrap();
3220        let mut params = cx_b.update(WorkspaceParams::test);
3221        params.languages = lang_registry.clone();
3222        params.client = client_b.client.clone();
3223        params.user_store = client_b.user_store.clone();
3224        params.project = project_b;
3225        params.path_openers = path_openers_b.into();
3226
3227        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3228        let editor_b = workspace_b
3229            .update(cx_b, |workspace, cx| {
3230                workspace.open_path((worktree_id, "main.rs").into(), cx)
3231            })
3232            .await
3233            .unwrap()
3234            .downcast::<Editor>()
3235            .unwrap();
3236
3237        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3238        fake_language_server
3239            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3240                assert_eq!(
3241                    params.text_document.uri,
3242                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3243                );
3244                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3245                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3246                None
3247            })
3248            .next()
3249            .await;
3250
3251        // Move cursor to a location that contains code actions.
3252        editor_b.update(cx_b, |editor, cx| {
3253            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3254            cx.focus(&editor_b);
3255        });
3256
3257        fake_language_server
3258            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3259                assert_eq!(
3260                    params.text_document.uri,
3261                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3262                );
3263                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3264                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3265
3266                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3267                    lsp::CodeAction {
3268                        title: "Inline into all callers".to_string(),
3269                        edit: Some(lsp::WorkspaceEdit {
3270                            changes: Some(
3271                                [
3272                                    (
3273                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3274                                        vec![lsp::TextEdit::new(
3275                                            lsp::Range::new(
3276                                                lsp::Position::new(1, 22),
3277                                                lsp::Position::new(1, 34),
3278                                            ),
3279                                            "4".to_string(),
3280                                        )],
3281                                    ),
3282                                    (
3283                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3284                                        vec![lsp::TextEdit::new(
3285                                            lsp::Range::new(
3286                                                lsp::Position::new(0, 0),
3287                                                lsp::Position::new(0, 27),
3288                                            ),
3289                                            "".to_string(),
3290                                        )],
3291                                    ),
3292                                ]
3293                                .into_iter()
3294                                .collect(),
3295                            ),
3296                            ..Default::default()
3297                        }),
3298                        data: Some(json!({
3299                            "codeActionParams": {
3300                                "range": {
3301                                    "start": {"line": 1, "column": 31},
3302                                    "end": {"line": 1, "column": 31},
3303                                }
3304                            }
3305                        })),
3306                        ..Default::default()
3307                    },
3308                )])
3309            })
3310            .next()
3311            .await;
3312
3313        // Toggle code actions and wait for them to display.
3314        editor_b.update(cx_b, |editor, cx| {
3315            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3316        });
3317        editor_b
3318            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3319            .await;
3320
3321        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3322
3323        // Confirming the code action will trigger a resolve request.
3324        let confirm_action = workspace_b
3325            .update(cx_b, |workspace, cx| {
3326                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3327            })
3328            .unwrap();
3329        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3330            lsp::CodeAction {
3331                title: "Inline into all callers".to_string(),
3332                edit: Some(lsp::WorkspaceEdit {
3333                    changes: Some(
3334                        [
3335                            (
3336                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3337                                vec![lsp::TextEdit::new(
3338                                    lsp::Range::new(
3339                                        lsp::Position::new(1, 22),
3340                                        lsp::Position::new(1, 34),
3341                                    ),
3342                                    "4".to_string(),
3343                                )],
3344                            ),
3345                            (
3346                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3347                                vec![lsp::TextEdit::new(
3348                                    lsp::Range::new(
3349                                        lsp::Position::new(0, 0),
3350                                        lsp::Position::new(0, 27),
3351                                    ),
3352                                    "".to_string(),
3353                                )],
3354                            ),
3355                        ]
3356                        .into_iter()
3357                        .collect(),
3358                    ),
3359                    ..Default::default()
3360                }),
3361                ..Default::default()
3362            }
3363        });
3364
3365        // After the action is confirmed, an editor containing both modified files is opened.
3366        confirm_action.await.unwrap();
3367        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3368            workspace
3369                .active_item(cx)
3370                .unwrap()
3371                .downcast::<Editor>()
3372                .unwrap()
3373        });
3374        code_action_editor.update(cx_b, |editor, cx| {
3375            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3376            editor.undo(&Undo, cx);
3377            assert_eq!(
3378                editor.text(cx),
3379                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3380            );
3381            editor.redo(&Redo, cx);
3382            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3383        });
3384    }
3385
3386    #[gpui::test(iterations = 10)]
3387    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3388        cx_a.foreground().forbid_parking();
3389        let mut lang_registry = Arc::new(LanguageRegistry::test());
3390        let fs = FakeFs::new(cx_a.background());
3391        let mut path_openers_b = Vec::new();
3392        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3393
3394        // Set up a fake language server.
3395        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3396        Arc::get_mut(&mut lang_registry)
3397            .unwrap()
3398            .add(Arc::new(Language::new(
3399                LanguageConfig {
3400                    name: "Rust".into(),
3401                    path_suffixes: vec!["rs".to_string()],
3402                    language_server: Some(language_server_config),
3403                    ..Default::default()
3404                },
3405                Some(tree_sitter_rust::language()),
3406            )));
3407
3408        // Connect to a server as 2 clients.
3409        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3410        let client_a = server.create_client(cx_a, "user_a").await;
3411        let client_b = server.create_client(cx_b, "user_b").await;
3412
3413        // Share a project as client A
3414        fs.insert_tree(
3415            "/dir",
3416            json!({
3417                ".zed.toml": r#"collaborators = ["user_b"]"#,
3418                "one.rs": "const ONE: usize = 1;",
3419                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3420            }),
3421        )
3422        .await;
3423        let project_a = cx_a.update(|cx| {
3424            Project::local(
3425                client_a.clone(),
3426                client_a.user_store.clone(),
3427                lang_registry.clone(),
3428                fs.clone(),
3429                cx,
3430            )
3431        });
3432        let (worktree_a, _) = project_a
3433            .update(cx_a, |p, cx| {
3434                p.find_or_create_local_worktree("/dir", true, cx)
3435            })
3436            .await
3437            .unwrap();
3438        worktree_a
3439            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3440            .await;
3441        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3442        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3443        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3444
3445        // Join the worktree as client B.
3446        let project_b = Project::remote(
3447            project_id,
3448            client_b.clone(),
3449            client_b.user_store.clone(),
3450            lang_registry.clone(),
3451            fs.clone(),
3452            &mut cx_b.to_async(),
3453        )
3454        .await
3455        .unwrap();
3456        let mut params = cx_b.update(WorkspaceParams::test);
3457        params.languages = lang_registry.clone();
3458        params.client = client_b.client.clone();
3459        params.user_store = client_b.user_store.clone();
3460        params.project = project_b;
3461        params.path_openers = path_openers_b.into();
3462
3463        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3464        let editor_b = workspace_b
3465            .update(cx_b, |workspace, cx| {
3466                workspace.open_path((worktree_id, "one.rs").into(), cx)
3467            })
3468            .await
3469            .unwrap()
3470            .downcast::<Editor>()
3471            .unwrap();
3472        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3473
3474        // Move cursor to a location that can be renamed.
3475        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3476            editor.select_ranges([7..7], None, cx);
3477            editor.rename(&Rename, cx).unwrap()
3478        });
3479
3480        fake_language_server
3481            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3482                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3483                assert_eq!(params.position, lsp::Position::new(0, 7));
3484                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3485                    lsp::Position::new(0, 6),
3486                    lsp::Position::new(0, 9),
3487                )))
3488            })
3489            .next()
3490            .await
3491            .unwrap();
3492        prepare_rename.await.unwrap();
3493        editor_b.update(cx_b, |editor, cx| {
3494            let rename = editor.pending_rename().unwrap();
3495            let buffer = editor.buffer().read(cx).snapshot(cx);
3496            assert_eq!(
3497                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3498                6..9
3499            );
3500            rename.editor.update(cx, |rename_editor, cx| {
3501                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3502                    rename_buffer.edit([0..3], "THREE", cx);
3503                });
3504            });
3505        });
3506
3507        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3508            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3509        });
3510        fake_language_server
3511            .handle_request::<lsp::request::Rename, _>(|params, _| {
3512                assert_eq!(
3513                    params.text_document_position.text_document.uri.as_str(),
3514                    "file:///dir/one.rs"
3515                );
3516                assert_eq!(
3517                    params.text_document_position.position,
3518                    lsp::Position::new(0, 6)
3519                );
3520                assert_eq!(params.new_name, "THREE");
3521                Some(lsp::WorkspaceEdit {
3522                    changes: Some(
3523                        [
3524                            (
3525                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3526                                vec![lsp::TextEdit::new(
3527                                    lsp::Range::new(
3528                                        lsp::Position::new(0, 6),
3529                                        lsp::Position::new(0, 9),
3530                                    ),
3531                                    "THREE".to_string(),
3532                                )],
3533                            ),
3534                            (
3535                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3536                                vec![
3537                                    lsp::TextEdit::new(
3538                                        lsp::Range::new(
3539                                            lsp::Position::new(0, 24),
3540                                            lsp::Position::new(0, 27),
3541                                        ),
3542                                        "THREE".to_string(),
3543                                    ),
3544                                    lsp::TextEdit::new(
3545                                        lsp::Range::new(
3546                                            lsp::Position::new(0, 35),
3547                                            lsp::Position::new(0, 38),
3548                                        ),
3549                                        "THREE".to_string(),
3550                                    ),
3551                                ],
3552                            ),
3553                        ]
3554                        .into_iter()
3555                        .collect(),
3556                    ),
3557                    ..Default::default()
3558                })
3559            })
3560            .next()
3561            .await
3562            .unwrap();
3563        confirm_rename.await.unwrap();
3564
3565        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3566            workspace
3567                .active_item(cx)
3568                .unwrap()
3569                .downcast::<Editor>()
3570                .unwrap()
3571        });
3572        rename_editor.update(cx_b, |editor, cx| {
3573            assert_eq!(
3574                editor.text(cx),
3575                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3576            );
3577            editor.undo(&Undo, cx);
3578            assert_eq!(
3579                editor.text(cx),
3580                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3581            );
3582            editor.redo(&Redo, cx);
3583            assert_eq!(
3584                editor.text(cx),
3585                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3586            );
3587        });
3588
3589        // Ensure temporary rename edits cannot be undone/redone.
3590        editor_b.update(cx_b, |editor, cx| {
3591            editor.undo(&Undo, cx);
3592            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3593            editor.undo(&Undo, cx);
3594            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3595            editor.redo(&Redo, cx);
3596            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3597        })
3598    }
3599
3600    #[gpui::test(iterations = 10)]
3601    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3602        cx_a.foreground().forbid_parking();
3603
3604        // Connect to a server as 2 clients.
3605        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3606        let client_a = server.create_client(cx_a, "user_a").await;
3607        let client_b = server.create_client(cx_b, "user_b").await;
3608
3609        // Create an org that includes these 2 users.
3610        let db = &server.app_state.db;
3611        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3612        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3613            .await
3614            .unwrap();
3615        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3616            .await
3617            .unwrap();
3618
3619        // Create a channel that includes all the users.
3620        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3621        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3622            .await
3623            .unwrap();
3624        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3625            .await
3626            .unwrap();
3627        db.create_channel_message(
3628            channel_id,
3629            client_b.current_user_id(&cx_b),
3630            "hello A, it's B.",
3631            OffsetDateTime::now_utc(),
3632            1,
3633        )
3634        .await
3635        .unwrap();
3636
3637        let channels_a = cx_a
3638            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3639        channels_a
3640            .condition(cx_a, |list, _| list.available_channels().is_some())
3641            .await;
3642        channels_a.read_with(cx_a, |list, _| {
3643            assert_eq!(
3644                list.available_channels().unwrap(),
3645                &[ChannelDetails {
3646                    id: channel_id.to_proto(),
3647                    name: "test-channel".to_string()
3648                }]
3649            )
3650        });
3651        let channel_a = channels_a.update(cx_a, |this, cx| {
3652            this.get_channel(channel_id.to_proto(), cx).unwrap()
3653        });
3654        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3655        channel_a
3656            .condition(&cx_a, |channel, _| {
3657                channel_messages(channel)
3658                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3659            })
3660            .await;
3661
3662        let channels_b = cx_b
3663            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3664        channels_b
3665            .condition(cx_b, |list, _| list.available_channels().is_some())
3666            .await;
3667        channels_b.read_with(cx_b, |list, _| {
3668            assert_eq!(
3669                list.available_channels().unwrap(),
3670                &[ChannelDetails {
3671                    id: channel_id.to_proto(),
3672                    name: "test-channel".to_string()
3673                }]
3674            )
3675        });
3676
3677        let channel_b = channels_b.update(cx_b, |this, cx| {
3678            this.get_channel(channel_id.to_proto(), cx).unwrap()
3679        });
3680        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3681        channel_b
3682            .condition(&cx_b, |channel, _| {
3683                channel_messages(channel)
3684                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3685            })
3686            .await;
3687
3688        channel_a
3689            .update(cx_a, |channel, cx| {
3690                channel
3691                    .send_message("oh, hi B.".to_string(), cx)
3692                    .unwrap()
3693                    .detach();
3694                let task = channel.send_message("sup".to_string(), cx).unwrap();
3695                assert_eq!(
3696                    channel_messages(channel),
3697                    &[
3698                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3699                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3700                        ("user_a".to_string(), "sup".to_string(), true)
3701                    ]
3702                );
3703                task
3704            })
3705            .await
3706            .unwrap();
3707
3708        channel_b
3709            .condition(&cx_b, |channel, _| {
3710                channel_messages(channel)
3711                    == [
3712                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3713                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3714                        ("user_a".to_string(), "sup".to_string(), false),
3715                    ]
3716            })
3717            .await;
3718
3719        assert_eq!(
3720            server
3721                .state()
3722                .await
3723                .channel(channel_id)
3724                .unwrap()
3725                .connection_ids
3726                .len(),
3727            2
3728        );
3729        cx_b.update(|_| drop(channel_b));
3730        server
3731            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3732            .await;
3733
3734        cx_a.update(|_| drop(channel_a));
3735        server
3736            .condition(|state| state.channel(channel_id).is_none())
3737            .await;
3738    }
3739
3740    #[gpui::test(iterations = 10)]
3741    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3742        cx_a.foreground().forbid_parking();
3743
3744        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3745        let client_a = server.create_client(cx_a, "user_a").await;
3746
3747        let db = &server.app_state.db;
3748        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3749        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3750        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3751            .await
3752            .unwrap();
3753        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3754            .await
3755            .unwrap();
3756
3757        let channels_a = cx_a
3758            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3759        channels_a
3760            .condition(cx_a, |list, _| list.available_channels().is_some())
3761            .await;
3762        let channel_a = channels_a.update(cx_a, |this, cx| {
3763            this.get_channel(channel_id.to_proto(), cx).unwrap()
3764        });
3765
3766        // Messages aren't allowed to be too long.
3767        channel_a
3768            .update(cx_a, |channel, cx| {
3769                let long_body = "this is long.\n".repeat(1024);
3770                channel.send_message(long_body, cx).unwrap()
3771            })
3772            .await
3773            .unwrap_err();
3774
3775        // Messages aren't allowed to be blank.
3776        channel_a.update(cx_a, |channel, cx| {
3777            channel.send_message(String::new(), cx).unwrap_err()
3778        });
3779
3780        // Leading and trailing whitespace are trimmed.
3781        channel_a
3782            .update(cx_a, |channel, cx| {
3783                channel
3784                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3785                    .unwrap()
3786            })
3787            .await
3788            .unwrap();
3789        assert_eq!(
3790            db.get_channel_messages(channel_id, 10, None)
3791                .await
3792                .unwrap()
3793                .iter()
3794                .map(|m| &m.body)
3795                .collect::<Vec<_>>(),
3796            &["surrounded by whitespace"]
3797        );
3798    }
3799
3800    #[gpui::test(iterations = 10)]
3801    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3802        cx_a.foreground().forbid_parking();
3803
3804        // Connect to a server as 2 clients.
3805        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3806        let client_a = server.create_client(cx_a, "user_a").await;
3807        let client_b = server.create_client(cx_b, "user_b").await;
3808        let mut status_b = client_b.status();
3809
3810        // Create an org that includes these 2 users.
3811        let db = &server.app_state.db;
3812        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3813        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3814            .await
3815            .unwrap();
3816        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3817            .await
3818            .unwrap();
3819
3820        // Create a channel that includes all the users.
3821        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3822        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3823            .await
3824            .unwrap();
3825        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3826            .await
3827            .unwrap();
3828        db.create_channel_message(
3829            channel_id,
3830            client_b.current_user_id(&cx_b),
3831            "hello A, it's B.",
3832            OffsetDateTime::now_utc(),
3833            2,
3834        )
3835        .await
3836        .unwrap();
3837
3838        let channels_a = cx_a
3839            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3840        channels_a
3841            .condition(cx_a, |list, _| list.available_channels().is_some())
3842            .await;
3843
3844        channels_a.read_with(cx_a, |list, _| {
3845            assert_eq!(
3846                list.available_channels().unwrap(),
3847                &[ChannelDetails {
3848                    id: channel_id.to_proto(),
3849                    name: "test-channel".to_string()
3850                }]
3851            )
3852        });
3853        let channel_a = channels_a.update(cx_a, |this, cx| {
3854            this.get_channel(channel_id.to_proto(), cx).unwrap()
3855        });
3856        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3857        channel_a
3858            .condition(&cx_a, |channel, _| {
3859                channel_messages(channel)
3860                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3861            })
3862            .await;
3863
3864        let channels_b = cx_b
3865            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3866        channels_b
3867            .condition(cx_b, |list, _| list.available_channels().is_some())
3868            .await;
3869        channels_b.read_with(cx_b, |list, _| {
3870            assert_eq!(
3871                list.available_channels().unwrap(),
3872                &[ChannelDetails {
3873                    id: channel_id.to_proto(),
3874                    name: "test-channel".to_string()
3875                }]
3876            )
3877        });
3878
3879        let channel_b = channels_b.update(cx_b, |this, cx| {
3880            this.get_channel(channel_id.to_proto(), cx).unwrap()
3881        });
3882        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3883        channel_b
3884            .condition(&cx_b, |channel, _| {
3885                channel_messages(channel)
3886                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3887            })
3888            .await;
3889
3890        // Disconnect client B, ensuring we can still access its cached channel data.
3891        server.forbid_connections();
3892        server.disconnect_client(client_b.current_user_id(&cx_b));
3893        cx_b.foreground().advance_clock(Duration::from_secs(3));
3894        while !matches!(
3895            status_b.next().await,
3896            Some(client::Status::ReconnectionError { .. })
3897        ) {}
3898
3899        channels_b.read_with(cx_b, |channels, _| {
3900            assert_eq!(
3901                channels.available_channels().unwrap(),
3902                [ChannelDetails {
3903                    id: channel_id.to_proto(),
3904                    name: "test-channel".to_string()
3905                }]
3906            )
3907        });
3908        channel_b.read_with(cx_b, |channel, _| {
3909            assert_eq!(
3910                channel_messages(channel),
3911                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3912            )
3913        });
3914
3915        // Send a message from client B while it is disconnected.
3916        channel_b
3917            .update(cx_b, |channel, cx| {
3918                let task = channel
3919                    .send_message("can you see this?".to_string(), cx)
3920                    .unwrap();
3921                assert_eq!(
3922                    channel_messages(channel),
3923                    &[
3924                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3925                        ("user_b".to_string(), "can you see this?".to_string(), true)
3926                    ]
3927                );
3928                task
3929            })
3930            .await
3931            .unwrap_err();
3932
3933        // Send a message from client A while B is disconnected.
3934        channel_a
3935            .update(cx_a, |channel, cx| {
3936                channel
3937                    .send_message("oh, hi B.".to_string(), cx)
3938                    .unwrap()
3939                    .detach();
3940                let task = channel.send_message("sup".to_string(), cx).unwrap();
3941                assert_eq!(
3942                    channel_messages(channel),
3943                    &[
3944                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3945                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3946                        ("user_a".to_string(), "sup".to_string(), true)
3947                    ]
3948                );
3949                task
3950            })
3951            .await
3952            .unwrap();
3953
3954        // Give client B a chance to reconnect.
3955        server.allow_connections();
3956        cx_b.foreground().advance_clock(Duration::from_secs(10));
3957
3958        // Verify that B sees the new messages upon reconnection, as well as the message client B
3959        // sent while offline.
3960        channel_b
3961            .condition(&cx_b, |channel, _| {
3962                channel_messages(channel)
3963                    == [
3964                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3965                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3966                        ("user_a".to_string(), "sup".to_string(), false),
3967                        ("user_b".to_string(), "can you see this?".to_string(), false),
3968                    ]
3969            })
3970            .await;
3971
3972        // Ensure client A and B can communicate normally after reconnection.
3973        channel_a
3974            .update(cx_a, |channel, cx| {
3975                channel.send_message("you online?".to_string(), cx).unwrap()
3976            })
3977            .await
3978            .unwrap();
3979        channel_b
3980            .condition(&cx_b, |channel, _| {
3981                channel_messages(channel)
3982                    == [
3983                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3984                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3985                        ("user_a".to_string(), "sup".to_string(), false),
3986                        ("user_b".to_string(), "can you see this?".to_string(), false),
3987                        ("user_a".to_string(), "you online?".to_string(), false),
3988                    ]
3989            })
3990            .await;
3991
3992        channel_b
3993            .update(cx_b, |channel, cx| {
3994                channel.send_message("yep".to_string(), cx).unwrap()
3995            })
3996            .await
3997            .unwrap();
3998        channel_a
3999            .condition(&cx_a, |channel, _| {
4000                channel_messages(channel)
4001                    == [
4002                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4003                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4004                        ("user_a".to_string(), "sup".to_string(), false),
4005                        ("user_b".to_string(), "can you see this?".to_string(), false),
4006                        ("user_a".to_string(), "you online?".to_string(), false),
4007                        ("user_b".to_string(), "yep".to_string(), false),
4008                    ]
4009            })
4010            .await;
4011    }
4012
    /// Verifies that contact lists stay in sync across three connected clients as a project
    /// goes through its lifecycle.
    ///
    /// Client A owns a local project whose `.zed.toml` names `user_b` and `user_c` as
    /// collaborators. The test asserts, from the point of view of all three user stores, that:
    /// 1. once A's worktree exists, `user_a` is listed with project `a` and no guests;
    /// 2. once B opens the project remotely, `user_b` shows up as a guest of that project;
    /// 3. once A drops the project, the contact list becomes empty.
    #[gpui::test(iterations = 10)]
    async fn test_contacts(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan so the worktree's contents are known before asserting.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;

        // Phase 1: every client (the host included) sees user_a with project "a" and no guests.
        // Note that this happens before `share` is called below.
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
            })
            .await;

        // Share the project so that client B can open it remotely.
        let project_id = project_a
            .update(cx_a, |project, _| project.next_remote_id())
            .await;
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();

        // Kept alive (but otherwise unused) so that B remains a guest of the project.
        let _project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Phase 2: every client now sees user_b listed as a guest of project "a".
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| {
                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
            })
            .await;

        // Make sure the host has registered B as a collaborator before tearing the project down.
        project_a
            .condition(&cx_a, |project, _| {
                project.collaborators().contains_key(&client_b.peer_id)
            })
            .await;

        // Phase 3: dropping the host's project empties everyone's contact list.
        cx_a.update(move |_| drop(project_a));
        client_a
            .user_store
            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_b
            .user_store
            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
            .await;
        client_c
            .user_store
            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
            .await;

        // Flattens a user store's contacts into
        // `(github login, [(first worktree root name, [guest github logins])])` so that the
        // assertions above can compare against plain literals.
        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
            user_store
                .contacts()
                .iter()
                .map(|contact| {
                    let worktrees = contact
                        .projects
                        .iter()
                        .map(|p| {
                            (
                                // Only the first root name is compared; indexing panics if a
                                // project has no worktree roots.
                                p.worktree_root_names[0].as_str(),
                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
                            )
                        })
                        .collect();
                    (contact.user.github_login.as_str(), worktrees)
                })
                .collect()
        }
    }
4154
4155    #[gpui::test(iterations = 100)]
4156    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4157        cx.foreground().forbid_parking();
4158        let max_peers = env::var("MAX_PEERS")
4159            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4160            .unwrap_or(5);
4161        let max_operations = env::var("OPERATIONS")
4162            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4163            .unwrap_or(10);
4164
4165        let rng = Arc::new(Mutex::new(rng));
4166
4167        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4168        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4169
4170        let fs = FakeFs::new(cx.background());
4171        fs.insert_tree(
4172            "/_collab",
4173            json!({
4174                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4175            }),
4176        )
4177        .await;
4178
4179        let operations = Rc::new(Cell::new(0));
4180        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4181        let mut clients = Vec::new();
4182
4183        let mut next_entity_id = 100000;
4184        let mut host_cx = TestAppContext::new(
4185            cx.foreground_platform(),
4186            cx.platform(),
4187            cx.foreground(),
4188            cx.background(),
4189            cx.font_cache(),
4190            cx.leak_detector(),
4191            next_entity_id,
4192        );
4193        let host = server.create_client(&mut host_cx, "host").await;
4194        let host_project = host_cx.update(|cx| {
4195            Project::local(
4196                host.client.clone(),
4197                host.user_store.clone(),
4198                Arc::new(LanguageRegistry::test()),
4199                fs.clone(),
4200                cx,
4201            )
4202        });
4203        let host_project_id = host_project
4204            .update(&mut host_cx, |p, _| p.next_remote_id())
4205            .await;
4206
4207        let (collab_worktree, _) = host_project
4208            .update(&mut host_cx, |project, cx| {
4209                project.find_or_create_local_worktree("/_collab", true, cx)
4210            })
4211            .await
4212            .unwrap();
4213        collab_worktree
4214            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4215            .await;
4216        host_project
4217            .update(&mut host_cx, |project, cx| project.share(cx))
4218            .await
4219            .unwrap();
4220
4221        clients.push(cx.foreground().spawn(host.simulate_host(
4222            host_project,
4223            language_server_config,
4224            operations.clone(),
4225            max_operations,
4226            rng.clone(),
4227            host_cx,
4228        )));
4229
4230        while operations.get() < max_operations {
4231            cx.background().simulate_random_delay().await;
4232            if clients.len() >= max_peers {
4233                break;
4234            } else if rng.lock().gen_bool(0.05) {
4235                operations.set(operations.get() + 1);
4236
4237                let guest_id = clients.len();
4238                log::info!("Adding guest {}", guest_id);
4239                next_entity_id += 100000;
4240                let mut guest_cx = TestAppContext::new(
4241                    cx.foreground_platform(),
4242                    cx.platform(),
4243                    cx.foreground(),
4244                    cx.background(),
4245                    cx.font_cache(),
4246                    cx.leak_detector(),
4247                    next_entity_id,
4248                );
4249                let guest = server
4250                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4251                    .await;
4252                let guest_project = Project::remote(
4253                    host_project_id,
4254                    guest.client.clone(),
4255                    guest.user_store.clone(),
4256                    guest_lang_registry.clone(),
4257                    FakeFs::new(cx.background()),
4258                    &mut guest_cx.to_async(),
4259                )
4260                .await
4261                .unwrap();
4262                clients.push(cx.foreground().spawn(guest.simulate_guest(
4263                    guest_id,
4264                    guest_project,
4265                    operations.clone(),
4266                    max_operations,
4267                    rng.clone(),
4268                    guest_cx,
4269                )));
4270
4271                log::info!("Guest {} added", guest_id);
4272            }
4273        }
4274
4275        let mut clients = futures::future::join_all(clients).await;
4276        cx.foreground().run_until_parked();
4277
4278        let (host_client, mut host_cx) = clients.remove(0);
4279        let host_project = host_client.project.as_ref().unwrap();
4280        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4281            project
4282                .worktrees(cx)
4283                .map(|worktree| {
4284                    let snapshot = worktree.read(cx).snapshot();
4285                    (snapshot.id(), snapshot)
4286                })
4287                .collect::<BTreeMap<_, _>>()
4288        });
4289
4290        host_client
4291            .project
4292            .as_ref()
4293            .unwrap()
4294            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4295
4296        for (guest_client, mut guest_cx) in clients.into_iter() {
4297            let guest_id = guest_client.client.id();
4298            let worktree_snapshots =
4299                guest_client
4300                    .project
4301                    .as_ref()
4302                    .unwrap()
4303                    .read_with(&guest_cx, |project, cx| {
4304                        project
4305                            .worktrees(cx)
4306                            .map(|worktree| {
4307                                let worktree = worktree.read(cx);
4308                                (worktree.id(), worktree.snapshot())
4309                            })
4310                            .collect::<BTreeMap<_, _>>()
4311                    });
4312
4313            assert_eq!(
4314                worktree_snapshots.keys().collect::<Vec<_>>(),
4315                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4316                "guest {} has different worktrees than the host",
4317                guest_id
4318            );
4319            for (id, host_snapshot) in &host_worktree_snapshots {
4320                let guest_snapshot = &worktree_snapshots[id];
4321                assert_eq!(
4322                    guest_snapshot.root_name(),
4323                    host_snapshot.root_name(),
4324                    "guest {} has different root name than the host for worktree {}",
4325                    guest_id,
4326                    id
4327                );
4328                assert_eq!(
4329                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4330                    host_snapshot.entries(false).collect::<Vec<_>>(),
4331                    "guest {} has different snapshot than the host for worktree {}",
4332                    guest_id,
4333                    id
4334                );
4335            }
4336
4337            guest_client
4338                .project
4339                .as_ref()
4340                .unwrap()
4341                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4342
4343            for guest_buffer in &guest_client.buffers {
4344                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4345                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4346                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4347                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4348                        guest_id, guest_client.peer_id, buffer_id
4349                    ))
4350                });
4351                let path = host_buffer
4352                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4353
4354                assert_eq!(
4355                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4356                    0,
4357                    "guest {}, buffer {}, path {:?} has deferred operations",
4358                    guest_id,
4359                    buffer_id,
4360                    path,
4361                );
4362                assert_eq!(
4363                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4364                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4365                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4366                    guest_id,
4367                    buffer_id,
4368                    path
4369                );
4370            }
4371
4372            guest_cx.update(|_| drop(guest_client));
4373        }
4374
4375        host_cx.update(|_| drop(host_client));
4376    }
4377
    /// An in-process collaboration server plus the knobs the tests use to drive it:
    /// creating clients, severing their connections, and refusing new ones.
    struct TestServer {
        /// The peer handed to `Server::new`; it is reset when the `TestServer` is dropped.
        peer: Arc<Peer>,
        /// Application state built by `build_app_state`; its `db` is used to create test users.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, told when `condition` starts and stops waiting.
        foreground: Rc<executor::Foreground>,
        /// Receiving end of the channel whose sender is passed to `Server::new`;
        /// `condition` awaits it before re-checking its predicate.
        notifications: mpsc::UnboundedReceiver<()>,
        /// One entry per connected user, inserted when that user's connection is established.
        /// `disconnect_client` removes (and thereby drops) the entry.
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// While `true`, attempts to establish a connection fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Held for as long as the server exists; not read after construction.
        _test_db: TestDb,
    }
4388
4389    impl TestServer {
4390        async fn start(
4391            foreground: Rc<executor::Foreground>,
4392            background: Arc<executor::Background>,
4393        ) -> Self {
4394            let test_db = TestDb::fake(background);
4395            let app_state = Self::build_app_state(&test_db).await;
4396            let peer = Peer::new();
4397            let notifications = mpsc::unbounded();
4398            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4399            Self {
4400                peer,
4401                app_state,
4402                server,
4403                foreground,
4404                notifications: notifications.1,
4405                connection_killers: Default::default(),
4406                forbid_connections: Default::default(),
4407                _test_db: test_db,
4408            }
4409        }
4410
4411        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4412            let http = FakeHttpClient::with_404_response();
4413            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4414            let client_name = name.to_string();
4415            let mut client = Client::new(http.clone());
4416            let server = self.server.clone();
4417            let connection_killers = self.connection_killers.clone();
4418            let forbid_connections = self.forbid_connections.clone();
4419            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4420
4421            Arc::get_mut(&mut client)
4422                .unwrap()
4423                .override_authenticate(move |cx| {
4424                    cx.spawn(|_| async move {
4425                        let access_token = "the-token".to_string();
4426                        Ok(Credentials {
4427                            user_id: user_id.0 as u64,
4428                            access_token,
4429                        })
4430                    })
4431                })
4432                .override_establish_connection(move |credentials, cx| {
4433                    assert_eq!(credentials.user_id, user_id.0 as u64);
4434                    assert_eq!(credentials.access_token, "the-token");
4435
4436                    let server = server.clone();
4437                    let connection_killers = connection_killers.clone();
4438                    let forbid_connections = forbid_connections.clone();
4439                    let client_name = client_name.clone();
4440                    let connection_id_tx = connection_id_tx.clone();
4441                    cx.spawn(move |cx| async move {
4442                        if forbid_connections.load(SeqCst) {
4443                            Err(EstablishConnectionError::other(anyhow!(
4444                                "server is forbidding connections"
4445                            )))
4446                        } else {
4447                            let (client_conn, server_conn, kill_conn) =
4448                                Connection::in_memory(cx.background());
4449                            connection_killers.lock().insert(user_id, kill_conn);
4450                            cx.background()
4451                                .spawn(server.handle_connection(
4452                                    server_conn,
4453                                    client_name,
4454                                    user_id,
4455                                    Some(connection_id_tx),
4456                                    cx.background(),
4457                                ))
4458                                .detach();
4459                            Ok(client_conn)
4460                        }
4461                    })
4462                });
4463
4464            client
4465                .authenticate_and_connect(&cx.to_async())
4466                .await
4467                .unwrap();
4468
4469            Channel::init(&client);
4470            Project::init(&client);
4471
4472            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4473            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4474
4475            let client = TestClient {
4476                client,
4477                peer_id,
4478                user_store,
4479                project: Default::default(),
4480                buffers: Default::default(),
4481            };
4482            client.wait_for_current_user(cx).await;
4483            client
4484        }
4485
4486        fn disconnect_client(&self, user_id: UserId) {
4487            self.connection_killers.lock().remove(&user_id);
4488        }
4489
4490        fn forbid_connections(&self) {
4491            self.forbid_connections.store(true, SeqCst);
4492        }
4493
4494        fn allow_connections(&self) {
4495            self.forbid_connections.store(false, SeqCst);
4496        }
4497
4498        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4499            let mut config = Config::default();
4500            config.session_secret = "a".repeat(32);
4501            config.database_url = test_db.url.clone();
4502            let github_client = github::AppClient::test();
4503            Arc::new(AppState {
4504                db: test_db.db().clone(),
4505                handlebars: Default::default(),
4506                auth_client: auth::build_client("", ""),
4507                repo_client: github::RepoClient::test(&github_client),
4508                github_client,
4509                config,
4510            })
4511        }
4512
4513        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4514            self.server.store.read()
4515        }
4516
4517        async fn condition<F>(&mut self, mut predicate: F)
4518        where
4519            F: FnMut(&Store) -> bool,
4520        {
4521            async_std::future::timeout(Duration::from_millis(500), async {
4522                while !(predicate)(&*self.server.store.read()) {
4523                    self.foreground.start_waiting();
4524                    self.notifications.next().await;
4525                    self.foreground.finish_waiting();
4526                }
4527            })
4528            .await
4529            .expect("condition timed out");
4530        }
4531    }
4532
4533    impl Drop for TestServer {
4534        fn drop(&mut self) {
4535            self.peer.reset();
4536        }
4537    }
4538
    /// A client connected to a `TestServer`, bundled with the state the tests inspect.
    /// Dereferences to the underlying `Arc<Client>`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created for this client in `TestServer::create_client`.
        pub user_store: ModelHandle<UserStore>,
        /// Starts out as `None`; the randomized collaboration test expects it to be set by the
        /// time a simulation finishes.
        project: Option<ModelHandle<Project>>,
        /// Starts out empty; the randomized collaboration test compares each of these buffers
        /// against the host's copy.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
4546
4547    impl Deref for TestClient {
4548        type Target = Arc<Client>;
4549
4550        fn deref(&self) -> &Self::Target {
4551            &self.client
4552        }
4553    }
4554
4555    impl TestClient {
4556        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4557            UserId::from_proto(
4558                self.user_store
4559                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4560            )
4561        }
4562
4563        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4564            let mut authed_user = self
4565                .user_store
4566                .read_with(cx, |user_store, _| user_store.watch_current_user());
4567            while authed_user.next().await.unwrap().is_none() {}
4568        }
4569
4570        fn simulate_host(
4571            mut self,
4572            project: ModelHandle<Project>,
4573            mut language_server_config: LanguageServerConfig,
4574            operations: Rc<Cell<usize>>,
4575            max_operations: usize,
4576            rng: Arc<Mutex<StdRng>>,
4577            mut cx: TestAppContext,
4578        ) -> impl Future<Output = (Self, TestAppContext)> {
4579            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4580
4581            // Set up a fake language server.
4582            language_server_config.set_fake_initializer({
4583                let rng = rng.clone();
4584                let files = files.clone();
4585                let project = project.downgrade();
4586                move |fake_server| {
4587                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4588                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4589                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4590                                range: lsp::Range::new(
4591                                    lsp::Position::new(0, 0),
4592                                    lsp::Position::new(0, 0),
4593                                ),
4594                                new_text: "the-new-text".to_string(),
4595                            })),
4596                            ..Default::default()
4597                        }]))
4598                    });
4599
4600                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4601                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4602                            lsp::CodeAction {
4603                                title: "the-code-action".to_string(),
4604                                ..Default::default()
4605                            },
4606                        )])
4607                    });
4608
4609                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4610                        |params, _| {
4611                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4612                                params.position,
4613                                params.position,
4614                            )))
4615                        },
4616                    );
4617
4618                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4619                        let files = files.clone();
4620                        let rng = rng.clone();
4621                        move |_, _| {
4622                            let files = files.lock();
4623                            let mut rng = rng.lock();
4624                            let count = rng.gen_range::<usize, _>(1..3);
4625                            let files = (0..count)
4626                                .map(|_| files.choose(&mut *rng).unwrap())
4627                                .collect::<Vec<_>>();
4628                            log::info!("LSP: Returning definitions in files {:?}", &files);
4629                            Some(lsp::GotoDefinitionResponse::Array(
4630                                files
4631                                    .into_iter()
4632                                    .map(|file| lsp::Location {
4633                                        uri: lsp::Url::from_file_path(file).unwrap(),
4634                                        range: Default::default(),
4635                                    })
4636                                    .collect(),
4637                            ))
4638                        }
4639                    });
4640
4641                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4642                        let rng = rng.clone();
4643                        let project = project.clone();
4644                        move |params, mut cx| {
4645                            if let Some(project) = project.upgrade(&cx) {
4646                                project.update(&mut cx, |project, cx| {
4647                                    let path = params
4648                                        .text_document_position_params
4649                                        .text_document
4650                                        .uri
4651                                        .to_file_path()
4652                                        .unwrap();
4653                                    let (worktree, relative_path) =
4654                                        project.find_local_worktree(&path, cx)?;
4655                                    let project_path =
4656                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4657                                    let buffer =
4658                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4659
4660                                    let mut highlights = Vec::new();
4661                                    let highlight_count = rng.lock().gen_range(1..=5);
4662                                    let mut prev_end = 0;
4663                                    for _ in 0..highlight_count {
4664                                        let range =
4665                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4666                                        let start = buffer
4667                                            .offset_to_point_utf16(range.start)
4668                                            .to_lsp_position();
4669                                        let end = buffer
4670                                            .offset_to_point_utf16(range.end)
4671                                            .to_lsp_position();
4672                                        highlights.push(lsp::DocumentHighlight {
4673                                            range: lsp::Range::new(start, end),
4674                                            kind: Some(lsp::DocumentHighlightKind::READ),
4675                                        });
4676                                        prev_end = range.end;
4677                                    }
4678                                    Some(highlights)
4679                                })
4680                            } else {
4681                                None
4682                            }
4683                        }
4684                    });
4685                }
4686            });
4687
4688            project.update(&mut cx, |project, _| {
4689                project.languages().add(Arc::new(Language::new(
4690                    LanguageConfig {
4691                        name: "Rust".into(),
4692                        path_suffixes: vec!["rs".to_string()],
4693                        language_server: Some(language_server_config),
4694                        ..Default::default()
4695                    },
4696                    None,
4697                )));
4698            });
4699
4700            async move {
4701                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4702                while operations.get() < max_operations {
4703                    operations.set(operations.get() + 1);
4704
4705                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4706                    match distribution {
4707                        0..=20 if !files.lock().is_empty() => {
4708                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4709                            let mut path = path.as_path();
4710                            while let Some(parent_path) = path.parent() {
4711                                path = parent_path;
4712                                if rng.lock().gen() {
4713                                    break;
4714                                }
4715                            }
4716
4717                            log::info!("Host: find/create local worktree {:?}", path);
4718                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4719                                project.find_or_create_local_worktree(path, true, cx)
4720                            });
4721                            let find_or_create_worktree = async move {
4722                                find_or_create_worktree.await.unwrap();
4723                            };
4724                            if rng.lock().gen() {
4725                                cx.background().spawn(find_or_create_worktree).detach();
4726                            } else {
4727                                find_or_create_worktree.await;
4728                            }
4729                        }
4730                        10..=80 if !files.lock().is_empty() => {
4731                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4732                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4733                                let (worktree, path) = project
4734                                    .update(&mut cx, |project, cx| {
4735                                        project.find_or_create_local_worktree(
4736                                            file.clone(),
4737                                            true,
4738                                            cx,
4739                                        )
4740                                    })
4741                                    .await
4742                                    .unwrap();
4743                                let project_path =
4744                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4745                                log::info!(
4746                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4747                                    file,
4748                                    project_path.0,
4749                                    project_path.1
4750                                );
4751                                let buffer = project
4752                                    .update(&mut cx, |project, cx| {
4753                                        project.open_buffer(project_path, cx)
4754                                    })
4755                                    .await
4756                                    .unwrap();
4757                                self.buffers.insert(buffer.clone());
4758                                buffer
4759                            } else {
4760                                self.buffers
4761                                    .iter()
4762                                    .choose(&mut *rng.lock())
4763                                    .unwrap()
4764                                    .clone()
4765                            };
4766
4767                            if rng.lock().gen_bool(0.1) {
4768                                cx.update(|cx| {
4769                                    log::info!(
4770                                        "Host: dropping buffer {:?}",
4771                                        buffer.read(cx).file().unwrap().full_path(cx)
4772                                    );
4773                                    self.buffers.remove(&buffer);
4774                                    drop(buffer);
4775                                });
4776                            } else {
4777                                buffer.update(&mut cx, |buffer, cx| {
4778                                    log::info!(
4779                                        "Host: updating buffer {:?} ({})",
4780                                        buffer.file().unwrap().full_path(cx),
4781                                        buffer.remote_id()
4782                                    );
4783                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4784                                });
4785                            }
4786                        }
4787                        _ => loop {
4788                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4789                            let mut path = PathBuf::new();
4790                            path.push("/");
4791                            for _ in 0..path_component_count {
4792                                let letter = rng.lock().gen_range(b'a'..=b'z');
4793                                path.push(std::str::from_utf8(&[letter]).unwrap());
4794                            }
4795                            path.set_extension("rs");
4796                            let parent_path = path.parent().unwrap();
4797
4798                            log::info!("Host: creating file {:?}", path,);
4799
4800                            if fs.create_dir(&parent_path).await.is_ok()
4801                                && fs.create_file(&path, Default::default()).await.is_ok()
4802                            {
4803                                files.lock().push(path);
4804                                break;
4805                            } else {
4806                                log::info!("Host: cannot create file");
4807                            }
4808                        },
4809                    }
4810
4811                    cx.background().simulate_random_delay().await;
4812                }
4813
4814                log::info!("Host done");
4815
4816                self.project = Some(project);
4817                (self, cx)
4818            }
4819        }
4820
4821        pub async fn simulate_guest(
4822            mut self,
4823            guest_id: usize,
4824            project: ModelHandle<Project>,
4825            operations: Rc<Cell<usize>>,
4826            max_operations: usize,
4827            rng: Arc<Mutex<StdRng>>,
4828            mut cx: TestAppContext,
4829        ) -> (Self, TestAppContext) {
4830            while operations.get() < max_operations {
4831                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4832                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4833                        project
4834                            .worktrees(&cx)
4835                            .filter(|worktree| {
4836                                let worktree = worktree.read(cx);
4837                                worktree.is_visible()
4838                                    && worktree.entries(false).any(|e| e.is_file())
4839                            })
4840                            .choose(&mut *rng.lock())
4841                    }) {
4842                        worktree
4843                    } else {
4844                        cx.background().simulate_random_delay().await;
4845                        continue;
4846                    };
4847
4848                    operations.set(operations.get() + 1);
4849                    let (worktree_root_name, project_path) =
4850                        worktree.read_with(&cx, |worktree, _| {
4851                            let entry = worktree
4852                                .entries(false)
4853                                .filter(|e| e.is_file())
4854                                .choose(&mut *rng.lock())
4855                                .unwrap();
4856                            (
4857                                worktree.root_name().to_string(),
4858                                (worktree.id(), entry.path.clone()),
4859                            )
4860                        });
4861                    log::info!(
4862                        "Guest {}: opening path {:?} in worktree {} ({})",
4863                        guest_id,
4864                        project_path.1,
4865                        project_path.0,
4866                        worktree_root_name,
4867                    );
4868                    let buffer = project
4869                        .update(&mut cx, |project, cx| {
4870                            project.open_buffer(project_path.clone(), cx)
4871                        })
4872                        .await
4873                        .unwrap();
4874                    log::info!(
4875                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4876                        guest_id,
4877                        project_path.1,
4878                        project_path.0,
4879                        worktree_root_name,
4880                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4881                    );
4882                    self.buffers.insert(buffer.clone());
4883                    buffer
4884                } else {
4885                    operations.set(operations.get() + 1);
4886
4887                    self.buffers
4888                        .iter()
4889                        .choose(&mut *rng.lock())
4890                        .unwrap()
4891                        .clone()
4892                };
4893
4894                let choice = rng.lock().gen_range(0..100);
4895                match choice {
4896                    0..=9 => {
4897                        cx.update(|cx| {
4898                            log::info!(
4899                                "Guest {}: dropping buffer {:?}",
4900                                guest_id,
4901                                buffer.read(cx).file().unwrap().full_path(cx)
4902                            );
4903                            self.buffers.remove(&buffer);
4904                            drop(buffer);
4905                        });
4906                    }
4907                    10..=19 => {
4908                        let completions = project.update(&mut cx, |project, cx| {
4909                            log::info!(
4910                                "Guest {}: requesting completions for buffer {} ({:?})",
4911                                guest_id,
4912                                buffer.read(cx).remote_id(),
4913                                buffer.read(cx).file().unwrap().full_path(cx)
4914                            );
4915                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4916                            project.completions(&buffer, offset, cx)
4917                        });
4918                        let completions = cx.background().spawn(async move {
4919                            completions.await.expect("completions request failed");
4920                        });
4921                        if rng.lock().gen_bool(0.3) {
4922                            log::info!("Guest {}: detaching completions request", guest_id);
4923                            completions.detach();
4924                        } else {
4925                            completions.await;
4926                        }
4927                    }
4928                    20..=29 => {
4929                        let code_actions = project.update(&mut cx, |project, cx| {
4930                            log::info!(
4931                                "Guest {}: requesting code actions for buffer {} ({:?})",
4932                                guest_id,
4933                                buffer.read(cx).remote_id(),
4934                                buffer.read(cx).file().unwrap().full_path(cx)
4935                            );
4936                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4937                            project.code_actions(&buffer, range, cx)
4938                        });
4939                        let code_actions = cx.background().spawn(async move {
4940                            code_actions.await.expect("code actions request failed");
4941                        });
4942                        if rng.lock().gen_bool(0.3) {
4943                            log::info!("Guest {}: detaching code actions request", guest_id);
4944                            code_actions.detach();
4945                        } else {
4946                            code_actions.await;
4947                        }
4948                    }
4949                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4950                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4951                            log::info!(
4952                                "Guest {}: saving buffer {} ({:?})",
4953                                guest_id,
4954                                buffer.remote_id(),
4955                                buffer.file().unwrap().full_path(cx)
4956                            );
4957                            (buffer.version(), buffer.save(cx))
4958                        });
4959                        let save = cx.background().spawn(async move {
4960                            let (saved_version, _) = save.await.expect("save request failed");
4961                            assert!(saved_version.observed_all(&requested_version));
4962                        });
4963                        if rng.lock().gen_bool(0.3) {
4964                            log::info!("Guest {}: detaching save request", guest_id);
4965                            save.detach();
4966                        } else {
4967                            save.await;
4968                        }
4969                    }
4970                    40..=44 => {
4971                        let prepare_rename = project.update(&mut cx, |project, cx| {
4972                            log::info!(
4973                                "Guest {}: preparing rename for buffer {} ({:?})",
4974                                guest_id,
4975                                buffer.read(cx).remote_id(),
4976                                buffer.read(cx).file().unwrap().full_path(cx)
4977                            );
4978                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4979                            project.prepare_rename(buffer, offset, cx)
4980                        });
4981                        let prepare_rename = cx.background().spawn(async move {
4982                            prepare_rename.await.expect("prepare rename request failed");
4983                        });
4984                        if rng.lock().gen_bool(0.3) {
4985                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4986                            prepare_rename.detach();
4987                        } else {
4988                            prepare_rename.await;
4989                        }
4990                    }
4991                    45..=49 => {
4992                        let definitions = project.update(&mut cx, |project, cx| {
4993                            log::info!(
4994                                "Guest {}: requesting definitions for buffer {} ({:?})",
4995                                guest_id,
4996                                buffer.read(cx).remote_id(),
4997                                buffer.read(cx).file().unwrap().full_path(cx)
4998                            );
4999                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5000                            project.definition(&buffer, offset, cx)
5001                        });
5002                        let definitions = cx.background().spawn(async move {
5003                            definitions.await.expect("definitions request failed")
5004                        });
5005                        if rng.lock().gen_bool(0.3) {
5006                            log::info!("Guest {}: detaching definitions request", guest_id);
5007                            definitions.detach();
5008                        } else {
5009                            self.buffers
5010                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5011                        }
5012                    }
5013                    50..=54 => {
5014                        let highlights = project.update(&mut cx, |project, cx| {
5015                            log::info!(
5016                                "Guest {}: requesting highlights for buffer {} ({:?})",
5017                                guest_id,
5018                                buffer.read(cx).remote_id(),
5019                                buffer.read(cx).file().unwrap().full_path(cx)
5020                            );
5021                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5022                            project.document_highlights(&buffer, offset, cx)
5023                        });
5024                        let highlights = cx.background().spawn(async move {
5025                            highlights.await.expect("highlights request failed");
5026                        });
5027                        if rng.lock().gen_bool(0.3) {
5028                            log::info!("Guest {}: detaching highlights request", guest_id);
5029                            highlights.detach();
5030                        } else {
5031                            highlights.await;
5032                        }
5033                    }
5034                    55..=59 => {
5035                        let search = project.update(&mut cx, |project, cx| {
5036                            let query = rng.lock().gen_range('a'..='z');
5037                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5038                            project.search(SearchQuery::text(query, false, false), cx)
5039                        });
5040                        let search = cx
5041                            .background()
5042                            .spawn(async move { search.await.expect("search request failed") });
5043                        if rng.lock().gen_bool(0.3) {
5044                            log::info!("Guest {}: detaching search request", guest_id);
5045                            search.detach();
5046                        } else {
5047                            self.buffers.extend(search.await.into_keys());
5048                        }
5049                    }
5050                    _ => {
5051                        buffer.update(&mut cx, |buffer, cx| {
5052                            log::info!(
5053                                "Guest {}: updating buffer {} ({:?})",
5054                                guest_id,
5055                                buffer.remote_id(),
5056                                buffer.file().unwrap().full_path(cx)
5057                            );
5058                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5059                        });
5060                    }
5061                }
5062                cx.background().simulate_random_delay().await;
5063            }
5064
5065            log::info!("Guest {} done", guest_id);
5066
5067            self.project = Some(project);
5068            (self, cx)
5069        }
5070    }
5071
5072    impl Drop for TestClient {
5073        fn drop(&mut self) {
5074            self.client.tear_down();
5075        }
5076    }
5077
5078    impl Executor for Arc<gpui::executor::Background> {
5079        type Timer = gpui::executor::Timer;
5080
5081        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5082            self.spawn(future).detach();
5083        }
5084
5085        fn timer(&self, duration: Duration) -> Self::Timer {
5086            self.as_ref().timer(duration)
5087        }
5088    }
5089
5090    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5091        channel
5092            .messages()
5093            .cursor::<()>()
5094            .map(|m| {
5095                (
5096                    m.sender.github_login.clone(),
5097                    m.body.clone(),
5098                    m.is_pending(),
5099                )
5100            })
5101            .collect()
5102    }
5103
5104    struct EmptyView;
5105
5106    impl gpui::Entity for EmptyView {
5107        type Event = ();
5108    }
5109
5110    impl gpui::View for EmptyView {
5111        fn ui_name() -> &'static str {
5112            "empty view"
5113        }
5114
5115        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5116            gpui::Element::boxed(gpui::elements::Empty)
5117        }
5118    }
5119}