rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::update_diagnostic_summary)
  87            .add_message_handler(Server::disk_based_diagnostics_updating)
  88            .add_message_handler(Server::disk_based_diagnostics_updated)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_message_handler(Server::close_buffer)
 106            .add_request_handler(Server::update_buffer)
 107            .add_message_handler(Server::update_buffer_file)
 108            .add_message_handler(Server::buffer_reloaded)
 109            .add_message_handler(Server::buffer_saved)
 110            .add_request_handler(Server::save_buffer)
 111            .add_request_handler(Server::get_channels)
 112            .add_request_handler(Server::get_users)
 113            .add_request_handler(Server::join_channel)
 114            .add_message_handler(Server::leave_channel)
 115            .add_request_handler(Server::send_channel_message)
 116            .add_request_handler(Server::get_channel_messages);
 117
 118        Arc::new(server)
 119    }
 120
 121    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 122    where
 123        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 124        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 125        M: EnvelopedMessage,
 126    {
 127        let prev_handler = self.handlers.insert(
 128            TypeId::of::<M>(),
 129            Box::new(move |server, envelope| {
 130                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 131                (handler)(server, *envelope).boxed()
 132            }),
 133        );
 134        if prev_handler.is_some() {
 135            panic!("registered a handler for the same message twice");
 136        }
 137        self
 138    }
 139
 140    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 141    where
 142        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 143        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 144        M: RequestMessage,
 145    {
 146        self.add_message_handler(move |server, envelope| {
 147            let receipt = envelope.receipt();
 148            let response = (handler)(server.clone(), envelope);
 149            async move {
 150                match response.await {
 151                    Ok(response) => {
 152                        server.peer.respond(receipt, response)?;
 153                        Ok(())
 154                    }
 155                    Err(error) => {
 156                        server.peer.respond_with_error(
 157                            receipt,
 158                            proto::Error {
 159                                message: error.to_string(),
 160                            },
 161                        )?;
 162                        Err(error)
 163                    }
 164                }
 165            }
 166        })
 167    }
 168
 169    pub fn handle_connection<E: Executor>(
 170        self: &Arc<Self>,
 171        connection: Connection,
 172        addr: String,
 173        user_id: UserId,
 174        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 175        executor: E,
 176    ) -> impl Future<Output = ()> {
 177        let mut this = self.clone();
 178        async move {
 179            let (connection_id, handle_io, mut incoming_rx) = this
 180                .peer
 181                .add_connection(connection, {
 182                    let executor = executor.clone();
 183                    move |duration| {
 184                        let timer = executor.timer(duration);
 185                        async move {
 186                            timer.await;
 187                        }
 188                    }
 189                })
 190                .await;
 191
 192            if let Some(send_connection_id) = send_connection_id.as_mut() {
 193                let _ = send_connection_id.send(connection_id).await;
 194            }
 195
 196            this.state_mut().add_connection(connection_id, user_id);
 197            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 198                log::error!("error updating contacts for {:?}: {}", user_id, err);
 199            }
 200
 201            let handle_io = handle_io.fuse();
 202            futures::pin_mut!(handle_io);
 203            loop {
 204                let next_message = incoming_rx.next().fuse();
 205                futures::pin_mut!(next_message);
 206                futures::select_biased! {
 207                    result = handle_io => {
 208                        if let Err(err) = result {
 209                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 210                        }
 211                        break;
 212                    }
 213                    message = next_message => {
 214                        if let Some(message) = message {
 215                            let start_time = Instant::now();
 216                            let type_name = message.payload_type_name();
 217                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 218                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 219                                let notifications = this.notifications.clone();
 220                                let is_background = message.is_background();
 221                                let handle_message = (handler)(this.clone(), message);
 222                                let handle_message = async move {
 223                                    if let Err(err) = handle_message.await {
 224                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 225                                    } else {
 226                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 227                                    }
 228                                    if let Some(mut notifications) = notifications {
 229                                        let _ = notifications.send(()).await;
 230                                    }
 231                                };
 232                                if is_background {
 233                                    executor.spawn_detached(handle_message);
 234                                } else {
 235                                    handle_message.await;
 236                                }
 237                            } else {
 238                                log::warn!("unhandled message: {}", type_name);
 239                            }
 240                        } else {
 241                            log::info!("rpc connection closed {:?}", addr);
 242                            break;
 243                        }
 244                    }
 245                }
 246            }
 247
 248            if let Err(err) = this.sign_out(connection_id).await {
 249                log::error!("error signing out connection {:?} - {:?}", addr, err);
 250            }
 251        }
 252    }
 253
 254    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 255        self.peer.disconnect(connection_id);
 256        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 257
 258        for (project_id, project) in removed_connection.hosted_projects {
 259            if let Some(share) = project.share {
 260                broadcast(
 261                    connection_id,
 262                    share.guests.keys().copied().collect(),
 263                    |conn_id| {
 264                        self.peer
 265                            .send(conn_id, proto::UnshareProject { project_id })
 266                    },
 267                )?;
 268            }
 269        }
 270
 271        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 272            broadcast(connection_id, peer_ids, |conn_id| {
 273                self.peer.send(
 274                    conn_id,
 275                    proto::RemoveProjectCollaborator {
 276                        project_id,
 277                        peer_id: connection_id.0,
 278                    },
 279                )
 280            })?;
 281        }
 282
 283        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 284        Ok(())
 285    }
 286
 287    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn register_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::RegisterProject>,
 294    ) -> tide::Result<proto::RegisterProjectResponse> {
 295        let project_id = {
 296            let mut state = self.state_mut();
 297            let user_id = state.user_id_for_connection(request.sender_id)?;
 298            state.register_project(request.sender_id, user_id)
 299        };
 300        Ok(proto::RegisterProjectResponse { project_id })
 301    }
 302
 303    async fn unregister_project(
 304        mut self: Arc<Server>,
 305        request: TypedEnvelope<proto::UnregisterProject>,
 306    ) -> tide::Result<()> {
 307        let project = self
 308            .state_mut()
 309            .unregister_project(request.payload.project_id, request.sender_id)?;
 310        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 311        Ok(())
 312    }
 313
 314    async fn share_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::ShareProject>,
 317    ) -> tide::Result<proto::Ack> {
 318        self.state_mut()
 319            .share_project(request.payload.project_id, request.sender_id);
 320        Ok(proto::Ack {})
 321    }
 322
 323    async fn unshare_project(
 324        mut self: Arc<Server>,
 325        request: TypedEnvelope<proto::UnshareProject>,
 326    ) -> tide::Result<()> {
 327        let project_id = request.payload.project_id;
 328        let project = self
 329            .state_mut()
 330            .unshare_project(project_id, request.sender_id)?;
 331
 332        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 333            self.peer
 334                .send(conn_id, proto::UnshareProject { project_id })
 335        })?;
 336        self.update_contacts_for_users(&project.authorized_user_ids)?;
 337        Ok(())
 338    }
 339
 340    async fn join_project(
 341        mut self: Arc<Server>,
 342        request: TypedEnvelope<proto::JoinProject>,
 343    ) -> tide::Result<proto::JoinProjectResponse> {
 344        let project_id = request.payload.project_id;
 345
 346        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 347        let (response, connection_ids, contact_user_ids) = self
 348            .state_mut()
 349            .join_project(request.sender_id, user_id, project_id)
 350            .and_then(|joined| {
 351                let share = joined.project.share()?;
 352                let peer_count = share.guests.len();
 353                let mut collaborators = Vec::with_capacity(peer_count);
 354                collaborators.push(proto::Collaborator {
 355                    peer_id: joined.project.host_connection_id.0,
 356                    replica_id: 0,
 357                    user_id: joined.project.host_user_id.to_proto(),
 358                });
 359                let worktrees = share
 360                    .worktrees
 361                    .iter()
 362                    .filter_map(|(id, shared_worktree)| {
 363                        let worktree = joined.project.worktrees.get(&id)?;
 364                        Some(proto::Worktree {
 365                            id: *id,
 366                            root_name: worktree.root_name.clone(),
 367                            entries: shared_worktree.entries.values().cloned().collect(),
 368                            diagnostic_summaries: shared_worktree
 369                                .diagnostic_summaries
 370                                .values()
 371                                .cloned()
 372                                .collect(),
 373                            visible: worktree.visible,
 374                        })
 375                    })
 376                    .collect();
 377                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 378                    if *peer_conn_id != request.sender_id {
 379                        collaborators.push(proto::Collaborator {
 380                            peer_id: peer_conn_id.0,
 381                            replica_id: *peer_replica_id as u32,
 382                            user_id: peer_user_id.to_proto(),
 383                        });
 384                    }
 385                }
 386                let response = proto::JoinProjectResponse {
 387                    worktrees,
 388                    replica_id: joined.replica_id as u32,
 389                    collaborators,
 390                };
 391                let connection_ids = joined.project.connection_ids();
 392                let contact_user_ids = joined.project.authorized_user_ids();
 393                Ok((response, connection_ids, contact_user_ids))
 394            })?;
 395
 396        broadcast(request.sender_id, connection_ids, |conn_id| {
 397            self.peer.send(
 398                conn_id,
 399                proto::AddProjectCollaborator {
 400                    project_id,
 401                    collaborator: Some(proto::Collaborator {
 402                        peer_id: request.sender_id.0,
 403                        replica_id: response.replica_id,
 404                        user_id: user_id.to_proto(),
 405                    }),
 406                },
 407            )
 408        })?;
 409        self.update_contacts_for_users(&contact_user_ids)?;
 410        Ok(response)
 411    }
 412
 413    async fn leave_project(
 414        mut self: Arc<Server>,
 415        request: TypedEnvelope<proto::LeaveProject>,
 416    ) -> tide::Result<()> {
 417        let sender_id = request.sender_id;
 418        let project_id = request.payload.project_id;
 419        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 420
 421        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 422            self.peer.send(
 423                conn_id,
 424                proto::RemoveProjectCollaborator {
 425                    project_id,
 426                    peer_id: sender_id.0,
 427                },
 428            )
 429        })?;
 430        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 431
 432        Ok(())
 433    }
 434
 435    async fn register_worktree(
 436        mut self: Arc<Server>,
 437        request: TypedEnvelope<proto::RegisterWorktree>,
 438    ) -> tide::Result<proto::Ack> {
 439        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 440
 441        let mut contact_user_ids = HashSet::default();
 442        contact_user_ids.insert(host_user_id);
 443        for github_login in &request.payload.authorized_logins {
 444            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 445            contact_user_ids.insert(contact_user_id);
 446        }
 447
 448        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 449        let guest_connection_ids;
 450        {
 451            let mut state = self.state_mut();
 452            guest_connection_ids = state
 453                .read_project(request.payload.project_id, request.sender_id)?
 454                .guest_connection_ids();
 455            state.register_worktree(
 456                request.payload.project_id,
 457                request.payload.worktree_id,
 458                request.sender_id,
 459                Worktree {
 460                    authorized_user_ids: contact_user_ids.clone(),
 461                    root_name: request.payload.root_name.clone(),
 462                    visible: request.payload.visible,
 463                },
 464            )?;
 465        }
 466        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 467            self.peer
 468                .forward_send(request.sender_id, connection_id, request.payload.clone())
 469        })?;
 470        self.update_contacts_for_users(&contact_user_ids)?;
 471        Ok(proto::Ack {})
 472    }
 473
 474    async fn unregister_worktree(
 475        mut self: Arc<Server>,
 476        request: TypedEnvelope<proto::UnregisterWorktree>,
 477    ) -> tide::Result<()> {
 478        let project_id = request.payload.project_id;
 479        let worktree_id = request.payload.worktree_id;
 480        let (worktree, guest_connection_ids) =
 481            self.state_mut()
 482                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 483        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 484            self.peer.send(
 485                conn_id,
 486                proto::UnregisterWorktree {
 487                    project_id,
 488                    worktree_id,
 489                },
 490            )
 491        })?;
 492        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 493        Ok(())
 494    }
 495
 496    async fn update_worktree(
 497        mut self: Arc<Server>,
 498        request: TypedEnvelope<proto::UpdateWorktree>,
 499    ) -> tide::Result<proto::Ack> {
 500        let connection_ids = self.state_mut().update_worktree(
 501            request.sender_id,
 502            request.payload.project_id,
 503            request.payload.worktree_id,
 504            &request.payload.removed_entries,
 505            &request.payload.updated_entries,
 506        )?;
 507
 508        broadcast(request.sender_id, connection_ids, |connection_id| {
 509            self.peer
 510                .forward_send(request.sender_id, connection_id, request.payload.clone())
 511        })?;
 512
 513        Ok(proto::Ack {})
 514    }
 515
 516    async fn update_diagnostic_summary(
 517        mut self: Arc<Server>,
 518        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 519    ) -> tide::Result<()> {
 520        let summary = request
 521            .payload
 522            .summary
 523            .clone()
 524            .ok_or_else(|| anyhow!("invalid summary"))?;
 525        let receiver_ids = self.state_mut().update_diagnostic_summary(
 526            request.payload.project_id,
 527            request.payload.worktree_id,
 528            request.sender_id,
 529            summary,
 530        )?;
 531
 532        broadcast(request.sender_id, receiver_ids, |connection_id| {
 533            self.peer
 534                .forward_send(request.sender_id, connection_id, request.payload.clone())
 535        })?;
 536        Ok(())
 537    }
 538
 539    async fn disk_based_diagnostics_updating(
 540        self: Arc<Server>,
 541        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 542    ) -> tide::Result<()> {
 543        let receiver_ids = self
 544            .state()
 545            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 546        broadcast(request.sender_id, receiver_ids, |connection_id| {
 547            self.peer
 548                .forward_send(request.sender_id, connection_id, request.payload.clone())
 549        })?;
 550        Ok(())
 551    }
 552
 553    async fn disk_based_diagnostics_updated(
 554        self: Arc<Server>,
 555        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 556    ) -> tide::Result<()> {
 557        let receiver_ids = self
 558            .state()
 559            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 560        broadcast(request.sender_id, receiver_ids, |connection_id| {
 561            self.peer
 562                .forward_send(request.sender_id, connection_id, request.payload.clone())
 563        })?;
 564        Ok(())
 565    }
 566
 567    async fn forward_project_request<T>(
 568        self: Arc<Server>,
 569        request: TypedEnvelope<T>,
 570    ) -> tide::Result<T::Response>
 571    where
 572        T: EntityMessage + RequestMessage,
 573    {
 574        let host_connection_id = self
 575            .state()
 576            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 577            .host_connection_id;
 578        Ok(self
 579            .peer
 580            .forward_request(request.sender_id, host_connection_id, request.payload)
 581            .await?)
 582    }
 583
 584    async fn close_buffer(
 585        self: Arc<Server>,
 586        request: TypedEnvelope<proto::CloseBuffer>,
 587    ) -> tide::Result<()> {
 588        let host_connection_id = self
 589            .state()
 590            .read_project(request.payload.project_id, request.sender_id)?
 591            .host_connection_id;
 592        self.peer
 593            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 594        Ok(())
 595    }
 596
 597    async fn save_buffer(
 598        self: Arc<Server>,
 599        request: TypedEnvelope<proto::SaveBuffer>,
 600    ) -> tide::Result<proto::BufferSaved> {
 601        let host;
 602        let mut guests;
 603        {
 604            let state = self.state();
 605            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 606            host = project.host_connection_id;
 607            guests = project.guest_connection_ids()
 608        }
 609
 610        let response = self
 611            .peer
 612            .forward_request(request.sender_id, host, request.payload.clone())
 613            .await?;
 614
 615        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 616        broadcast(host, guests, |conn_id| {
 617            self.peer.forward_send(host, conn_id, response.clone())
 618        })?;
 619
 620        Ok(response)
 621    }
 622
 623    async fn update_buffer(
 624        self: Arc<Server>,
 625        request: TypedEnvelope<proto::UpdateBuffer>,
 626    ) -> tide::Result<proto::Ack> {
 627        let receiver_ids = self
 628            .state()
 629            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 630        broadcast(request.sender_id, receiver_ids, |connection_id| {
 631            self.peer
 632                .forward_send(request.sender_id, connection_id, request.payload.clone())
 633        })?;
 634        Ok(proto::Ack {})
 635    }
 636
 637    async fn update_buffer_file(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::UpdateBufferFile>,
 640    ) -> tide::Result<()> {
 641        let receiver_ids = self
 642            .state()
 643            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 644        broadcast(request.sender_id, receiver_ids, |connection_id| {
 645            self.peer
 646                .forward_send(request.sender_id, connection_id, request.payload.clone())
 647        })?;
 648        Ok(())
 649    }
 650
 651    async fn buffer_reloaded(
 652        self: Arc<Server>,
 653        request: TypedEnvelope<proto::BufferReloaded>,
 654    ) -> tide::Result<()> {
 655        let receiver_ids = self
 656            .state()
 657            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 658        broadcast(request.sender_id, receiver_ids, |connection_id| {
 659            self.peer
 660                .forward_send(request.sender_id, connection_id, request.payload.clone())
 661        })?;
 662        Ok(())
 663    }
 664
 665    async fn buffer_saved(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::BufferSaved>,
 668    ) -> tide::Result<()> {
 669        let receiver_ids = self
 670            .state()
 671            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 672        broadcast(request.sender_id, receiver_ids, |connection_id| {
 673            self.peer
 674                .forward_send(request.sender_id, connection_id, request.payload.clone())
 675        })?;
 676        Ok(())
 677    }
 678
 679    async fn get_channels(
 680        self: Arc<Server>,
 681        request: TypedEnvelope<proto::GetChannels>,
 682    ) -> tide::Result<proto::GetChannelsResponse> {
 683        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 684        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 685        Ok(proto::GetChannelsResponse {
 686            channels: channels
 687                .into_iter()
 688                .map(|chan| proto::Channel {
 689                    id: chan.id.to_proto(),
 690                    name: chan.name,
 691                })
 692                .collect(),
 693        })
 694    }
 695
 696    async fn get_users(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetUsers>,
 699    ) -> tide::Result<proto::GetUsersResponse> {
 700        let user_ids = request
 701            .payload
 702            .user_ids
 703            .into_iter()
 704            .map(UserId::from_proto)
 705            .collect();
 706        let users = self
 707            .app_state
 708            .db
 709            .get_users_by_ids(user_ids)
 710            .await?
 711            .into_iter()
 712            .map(|user| proto::User {
 713                id: user.id.to_proto(),
 714                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 715                github_login: user.github_login,
 716            })
 717            .collect();
 718        Ok(proto::GetUsersResponse { users })
 719    }
 720
 721    fn update_contacts_for_users<'a>(
 722        self: &Arc<Server>,
 723        user_ids: impl IntoIterator<Item = &'a UserId>,
 724    ) -> anyhow::Result<()> {
 725        let mut result = Ok(());
 726        let state = self.state();
 727        for user_id in user_ids {
 728            let contacts = state.contacts_for_user(*user_id);
 729            for connection_id in state.connection_ids_for_user(*user_id) {
 730                if let Err(error) = self.peer.send(
 731                    connection_id,
 732                    proto::UpdateContacts {
 733                        contacts: contacts.clone(),
 734                    },
 735                ) {
 736                    result = Err(error);
 737                }
 738            }
 739        }
 740        result
 741    }
 742
 743    async fn join_channel(
 744        mut self: Arc<Self>,
 745        request: TypedEnvelope<proto::JoinChannel>,
 746    ) -> tide::Result<proto::JoinChannelResponse> {
 747        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 748        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 749        if !self
 750            .app_state
 751            .db
 752            .can_user_access_channel(user_id, channel_id)
 753            .await?
 754        {
 755            Err(anyhow!("access denied"))?;
 756        }
 757
 758        self.state_mut().join_channel(request.sender_id, channel_id);
 759        let messages = self
 760            .app_state
 761            .db
 762            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 763            .await?
 764            .into_iter()
 765            .map(|msg| proto::ChannelMessage {
 766                id: msg.id.to_proto(),
 767                body: msg.body,
 768                timestamp: msg.sent_at.unix_timestamp() as u64,
 769                sender_id: msg.sender_id.to_proto(),
 770                nonce: Some(msg.nonce.as_u128().into()),
 771            })
 772            .collect::<Vec<_>>();
 773        Ok(proto::JoinChannelResponse {
 774            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 775            messages,
 776        })
 777    }
 778
 779    async fn leave_channel(
 780        mut self: Arc<Self>,
 781        request: TypedEnvelope<proto::LeaveChannel>,
 782    ) -> tide::Result<()> {
 783        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 784        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 785        if !self
 786            .app_state
 787            .db
 788            .can_user_access_channel(user_id, channel_id)
 789            .await?
 790        {
 791            Err(anyhow!("access denied"))?;
 792        }
 793
 794        self.state_mut()
 795            .leave_channel(request.sender_id, channel_id);
 796
 797        Ok(())
 798    }
 799
 800    async fn send_channel_message(
 801        self: Arc<Self>,
 802        request: TypedEnvelope<proto::SendChannelMessage>,
 803    ) -> tide::Result<proto::SendChannelMessageResponse> {
 804        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 805        let user_id;
 806        let connection_ids;
 807        {
 808            let state = self.state();
 809            user_id = state.user_id_for_connection(request.sender_id)?;
 810            connection_ids = state.channel_connection_ids(channel_id)?;
 811        }
 812
 813        // Validate the message body.
 814        let body = request.payload.body.trim().to_string();
 815        if body.len() > MAX_MESSAGE_LEN {
 816            return Err(anyhow!("message is too long"))?;
 817        }
 818        if body.is_empty() {
 819            return Err(anyhow!("message can't be blank"))?;
 820        }
 821
 822        let timestamp = OffsetDateTime::now_utc();
 823        let nonce = request
 824            .payload
 825            .nonce
 826            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 827
 828        let message_id = self
 829            .app_state
 830            .db
 831            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 832            .await?
 833            .to_proto();
 834        let message = proto::ChannelMessage {
 835            sender_id: user_id.to_proto(),
 836            id: message_id,
 837            body,
 838            timestamp: timestamp.unix_timestamp() as u64,
 839            nonce: Some(nonce),
 840        };
 841        broadcast(request.sender_id, connection_ids, |conn_id| {
 842            self.peer.send(
 843                conn_id,
 844                proto::ChannelMessageSent {
 845                    channel_id: channel_id.to_proto(),
 846                    message: Some(message.clone()),
 847                },
 848            )
 849        })?;
 850        Ok(proto::SendChannelMessageResponse {
 851            message: Some(message),
 852        })
 853    }
 854
 855    async fn get_channel_messages(
 856        self: Arc<Self>,
 857        request: TypedEnvelope<proto::GetChannelMessages>,
 858    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 859        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 860        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 861        if !self
 862            .app_state
 863            .db
 864            .can_user_access_channel(user_id, channel_id)
 865            .await?
 866        {
 867            Err(anyhow!("access denied"))?;
 868        }
 869
 870        let messages = self
 871            .app_state
 872            .db
 873            .get_channel_messages(
 874                channel_id,
 875                MESSAGE_COUNT_PER_PAGE,
 876                Some(MessageId::from_proto(request.payload.before_message_id)),
 877            )
 878            .await?
 879            .into_iter()
 880            .map(|msg| proto::ChannelMessage {
 881                id: msg.id.to_proto(),
 882                body: msg.body,
 883                timestamp: msg.sent_at.unix_timestamp() as u64,
 884                sender_id: msg.sender_id.to_proto(),
 885                nonce: Some(msg.nonce.as_u128().into()),
 886            })
 887            .collect::<Vec<_>>();
 888
 889        Ok(proto::GetChannelMessagesResponse {
 890            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 891            messages,
 892        })
 893    }
 894
 895    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 896        self.store.read()
 897    }
 898
 899    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 900        self.store.write()
 901    }
 902}
 903
 904impl Executor for RealExecutor {
 905    type Timer = Timer;
 906
 907    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 908        task::spawn(future);
 909    }
 910
 911    fn timer(&self, duration: Duration) -> Self::Timer {
 912        Timer::after(duration)
 913    }
 914}
 915
 916fn broadcast<F>(
 917    sender_id: ConnectionId,
 918    receiver_ids: Vec<ConnectionId>,
 919    mut f: F,
 920) -> anyhow::Result<()>
 921where
 922    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 923{
 924    let mut result = Ok(());
 925    for receiver_id in receiver_ids {
 926        if receiver_id != sender_id {
 927            if let Err(error) = f(receiver_id) {
 928                if result.is_ok() {
 929                    result = Err(error);
 930                }
 931            }
 932        }
 933    }
 934    result
 935}
 936
 937pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 938    let server = Server::new(app.state().clone(), rpc.clone(), None);
 939    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 940        let server = server.clone();
 941        async move {
 942            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 943
 944            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 945            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 946            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 947            let client_protocol_version: Option<u32> = request
 948                .header("X-Zed-Protocol-Version")
 949                .and_then(|v| v.as_str().parse().ok());
 950
 951            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 952                return Ok(Response::new(StatusCode::UpgradeRequired));
 953            }
 954
 955            let header = match request.header("Sec-Websocket-Key") {
 956                Some(h) => h.as_str(),
 957                None => return Err(anyhow!("expected sec-websocket-key"))?,
 958            };
 959
 960            let user_id = process_auth_header(&request).await?;
 961
 962            let mut response = Response::new(StatusCode::SwitchingProtocols);
 963            response.insert_header(UPGRADE, "websocket");
 964            response.insert_header(CONNECTION, "Upgrade");
 965            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 966            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 967            response.insert_header("Sec-Websocket-Version", "13");
 968
 969            let http_res: &mut tide::http::Response = response.as_mut();
 970            let upgrade_receiver = http_res.recv_upgrade().await;
 971            let addr = request.remote().unwrap_or("unknown").to_string();
 972            task::spawn(async move {
 973                if let Some(stream) = upgrade_receiver.await {
 974                    server
 975                        .handle_connection(
 976                            Connection::new(
 977                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 978                            ),
 979                            addr,
 980                            user_id,
 981                            None,
 982                            RealExecutor,
 983                        )
 984                        .await;
 985                }
 986            });
 987
 988            Ok(response)
 989        }
 990    });
 991}
 992
 993fn header_contains_ignore_case<T>(
 994    request: &tide::Request<T>,
 995    header_name: HeaderName,
 996    value: &str,
 997) -> bool {
 998    request
 999        .header(header_name)
1000        .map(|h| {
1001            h.as_str()
1002                .split(',')
1003                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1004        })
1005        .unwrap_or(false)
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010    use super::*;
1011    use crate::{
1012        auth,
1013        db::{tests::TestDb, UserId},
1014        github, AppState, Config,
1015    };
1016    use ::rpc::Peer;
1017    use client::{
1018        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1019        EstablishConnectionError, UserStore,
1020    };
1021    use collections::BTreeMap;
1022    use editor::{
1023        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
1024        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
1025    };
1026    use gpui::{executor, ModelHandle, TestAppContext};
1027    use language::{
1028        tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language, LanguageConfig,
1029        LanguageRegistry, LanguageServerConfig, Point, ToLspPosition,
1030    };
1031    use lsp;
1032    use parking_lot::Mutex;
1033    use postage::{barrier, watch};
1034    use project::{
1035        fs::{FakeFs, Fs as _},
1036        search::SearchQuery,
1037        DiagnosticSummary, Project, ProjectPath,
1038    };
1039    use rand::prelude::*;
1040    use rpc::PeerId;
1041    use serde_json::json;
1042    use sqlx::types::time::OffsetDateTime;
1043    use std::{
1044        cell::Cell,
1045        env,
1046        ops::Deref,
1047        path::{Path, PathBuf},
1048        rc::Rc,
1049        sync::{
1050            atomic::{AtomicBool, Ordering::SeqCst},
1051            Arc,
1052        },
1053        time::Duration,
1054    };
1055    use workspace::{Settings, Workspace, WorkspaceParams};
1056
1057    #[cfg(test)]
1058    #[ctor::ctor]
1059    fn init_logger() {
1060        if std::env::var("RUST_LOG").is_ok() {
1061            env_logger::init();
1062        }
1063    }
1064
1065    #[gpui::test(iterations = 10)]
1066    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1067        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1068        let lang_registry = Arc::new(LanguageRegistry::new());
1069        let fs = FakeFs::new(cx_a.background());
1070        cx_a.foreground().forbid_parking();
1071
1072        // Connect to a server as 2 clients.
1073        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1074        let client_a = server.create_client(cx_a, "user_a").await;
1075        let client_b = server.create_client(cx_b, "user_b").await;
1076
1077        // Share a project as client A
1078        fs.insert_tree(
1079            "/a",
1080            json!({
1081                ".zed.toml": r#"collaborators = ["user_b"]"#,
1082                "a.txt": "a-contents",
1083                "b.txt": "b-contents",
1084            }),
1085        )
1086        .await;
1087        let project_a = cx_a.update(|cx| {
1088            Project::local(
1089                client_a.clone(),
1090                client_a.user_store.clone(),
1091                lang_registry.clone(),
1092                fs.clone(),
1093                cx,
1094            )
1095        });
1096        let (worktree_a, _) = project_a
1097            .update(cx_a, |p, cx| {
1098                p.find_or_create_local_worktree("/a", true, cx)
1099            })
1100            .await
1101            .unwrap();
1102        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1103        worktree_a
1104            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1105            .await;
1106        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1107        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1108
1109        // Join that project as client B
1110        let project_b = Project::remote(
1111            project_id,
1112            client_b.clone(),
1113            client_b.user_store.clone(),
1114            lang_registry.clone(),
1115            fs.clone(),
1116            &mut cx_b.to_async(),
1117        )
1118        .await
1119        .unwrap();
1120
1121        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1122            assert_eq!(
1123                project
1124                    .collaborators()
1125                    .get(&client_a.peer_id)
1126                    .unwrap()
1127                    .user
1128                    .github_login,
1129                "user_a"
1130            );
1131            project.replica_id()
1132        });
1133        project_a
1134            .condition(&cx_a, |tree, _| {
1135                tree.collaborators()
1136                    .get(&client_b.peer_id)
1137                    .map_or(false, |collaborator| {
1138                        collaborator.replica_id == replica_id_b
1139                            && collaborator.user.github_login == "user_b"
1140                    })
1141            })
1142            .await;
1143
1144        // Open the same file as client B and client A.
1145        let buffer_b = project_b
1146            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1147            .await
1148            .unwrap();
1149        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1150        buffer_b.read_with(cx_b, |buf, cx| {
1151            assert_eq!(buf.read(cx).text(), "b-contents")
1152        });
1153        project_a.read_with(cx_a, |project, cx| {
1154            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1155        });
1156        let buffer_a = project_a
1157            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1158            .await
1159            .unwrap();
1160
1161        let editor_b = cx_b.add_view(window_b, |cx| {
1162            Editor::for_buffer(
1163                buffer_b,
1164                None,
1165                watch::channel_with(Settings::test(cx)).1,
1166                cx,
1167            )
1168        });
1169
1170        // TODO
1171        // // Create a selection set as client B and see that selection set as client A.
1172        // buffer_a
1173        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1174        //     .await;
1175
1176        // Edit the buffer as client B and see that edit as client A.
1177        editor_b.update(cx_b, |editor, cx| {
1178            editor.handle_input(&Input("ok, ".into()), cx)
1179        });
1180        buffer_a
1181            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1182            .await;
1183
1184        // TODO
1185        // // Remove the selection set as client B, see those selections disappear as client A.
1186        cx_b.update(move |_| drop(editor_b));
1187        // buffer_a
1188        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1189        //     .await;
1190
1191        // Dropping the client B's project removes client B from client A's collaborators.
1192        cx_b.update(move |_| drop(project_b));
1193        project_a
1194            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1195            .await;
1196    }
1197
1198    #[gpui::test(iterations = 10)]
1199    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1200        let lang_registry = Arc::new(LanguageRegistry::new());
1201        let fs = FakeFs::new(cx_a.background());
1202        cx_a.foreground().forbid_parking();
1203
1204        // Connect to a server as 2 clients.
1205        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1206        let client_a = server.create_client(cx_a, "user_a").await;
1207        let client_b = server.create_client(cx_b, "user_b").await;
1208
1209        // Share a project as client A
1210        fs.insert_tree(
1211            "/a",
1212            json!({
1213                ".zed.toml": r#"collaborators = ["user_b"]"#,
1214                "a.txt": "a-contents",
1215                "b.txt": "b-contents",
1216            }),
1217        )
1218        .await;
1219        let project_a = cx_a.update(|cx| {
1220            Project::local(
1221                client_a.clone(),
1222                client_a.user_store.clone(),
1223                lang_registry.clone(),
1224                fs.clone(),
1225                cx,
1226            )
1227        });
1228        let (worktree_a, _) = project_a
1229            .update(cx_a, |p, cx| {
1230                p.find_or_create_local_worktree("/a", true, cx)
1231            })
1232            .await
1233            .unwrap();
1234        worktree_a
1235            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1236            .await;
1237        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1238        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1239        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1240        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1241
1242        // Join that project as client B
1243        let project_b = Project::remote(
1244            project_id,
1245            client_b.clone(),
1246            client_b.user_store.clone(),
1247            lang_registry.clone(),
1248            fs.clone(),
1249            &mut cx_b.to_async(),
1250        )
1251        .await
1252        .unwrap();
1253        project_b
1254            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1255            .await
1256            .unwrap();
1257
1258        // Unshare the project as client A
1259        project_a
1260            .update(cx_a, |project, cx| project.unshare(cx))
1261            .await
1262            .unwrap();
1263        project_b
1264            .condition(cx_b, |project, _| project.is_read_only())
1265            .await;
1266        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1267        cx_b.update(|_| {
1268            drop(project_b);
1269        });
1270
1271        // Share the project again and ensure guests can still join.
1272        project_a
1273            .update(cx_a, |project, cx| project.share(cx))
1274            .await
1275            .unwrap();
1276        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1277
1278        let project_b2 = Project::remote(
1279            project_id,
1280            client_b.clone(),
1281            client_b.user_store.clone(),
1282            lang_registry.clone(),
1283            fs.clone(),
1284            &mut cx_b.to_async(),
1285        )
1286        .await
1287        .unwrap();
1288        project_b2
1289            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1290            .await
1291            .unwrap();
1292    }
1293
1294    #[gpui::test(iterations = 10)]
1295    async fn test_propagate_saves_and_fs_changes(
1296        cx_a: &mut TestAppContext,
1297        cx_b: &mut TestAppContext,
1298        cx_c: &mut TestAppContext,
1299    ) {
1300        let lang_registry = Arc::new(LanguageRegistry::new());
1301        let fs = FakeFs::new(cx_a.background());
1302        cx_a.foreground().forbid_parking();
1303
1304        // Connect to a server as 3 clients.
1305        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1306        let client_a = server.create_client(cx_a, "user_a").await;
1307        let client_b = server.create_client(cx_b, "user_b").await;
1308        let client_c = server.create_client(cx_c, "user_c").await;
1309
1310        // Share a worktree as client A.
1311        fs.insert_tree(
1312            "/a",
1313            json!({
1314                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1315                "file1": "",
1316                "file2": ""
1317            }),
1318        )
1319        .await;
1320        let project_a = cx_a.update(|cx| {
1321            Project::local(
1322                client_a.clone(),
1323                client_a.user_store.clone(),
1324                lang_registry.clone(),
1325                fs.clone(),
1326                cx,
1327            )
1328        });
1329        let (worktree_a, _) = project_a
1330            .update(cx_a, |p, cx| {
1331                p.find_or_create_local_worktree("/a", true, cx)
1332            })
1333            .await
1334            .unwrap();
1335        worktree_a
1336            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1337            .await;
1338        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1339        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1340        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1341
1342        // Join that worktree as clients B and C.
1343        let project_b = Project::remote(
1344            project_id,
1345            client_b.clone(),
1346            client_b.user_store.clone(),
1347            lang_registry.clone(),
1348            fs.clone(),
1349            &mut cx_b.to_async(),
1350        )
1351        .await
1352        .unwrap();
1353        let project_c = Project::remote(
1354            project_id,
1355            client_c.clone(),
1356            client_c.user_store.clone(),
1357            lang_registry.clone(),
1358            fs.clone(),
1359            &mut cx_c.to_async(),
1360        )
1361        .await
1362        .unwrap();
1363        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1364        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1365
1366        // Open and edit a buffer as both guests B and C.
1367        let buffer_b = project_b
1368            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1369            .await
1370            .unwrap();
1371        let buffer_c = project_c
1372            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1373            .await
1374            .unwrap();
1375        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1376        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1377
1378        // Open and edit that buffer as the host.
1379        let buffer_a = project_a
1380            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1381            .await
1382            .unwrap();
1383
1384        buffer_a
1385            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1386            .await;
1387        buffer_a.update(cx_a, |buf, cx| {
1388            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1389        });
1390
1391        // Wait for edits to propagate
1392        buffer_a
1393            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1394            .await;
1395        buffer_b
1396            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1397            .await;
1398        buffer_c
1399            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1400            .await;
1401
1402        // Edit the buffer as the host and concurrently save as guest B.
1403        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1404        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1405        save_b.await.unwrap();
1406        assert_eq!(
1407            fs.load("/a/file1".as_ref()).await.unwrap(),
1408            "hi-a, i-am-c, i-am-b, i-am-a"
1409        );
1410        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1411        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1412        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1413
1414        // Make changes on host's file system, see those changes on guest worktrees.
1415        fs.rename(
1416            "/a/file1".as_ref(),
1417            "/a/file1-renamed".as_ref(),
1418            Default::default(),
1419        )
1420        .await
1421        .unwrap();
1422
1423        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1424            .await
1425            .unwrap();
1426        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1427
1428        worktree_a
1429            .condition(&cx_a, |tree, _| {
1430                tree.paths()
1431                    .map(|p| p.to_string_lossy())
1432                    .collect::<Vec<_>>()
1433                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1434            })
1435            .await;
1436        worktree_b
1437            .condition(&cx_b, |tree, _| {
1438                tree.paths()
1439                    .map(|p| p.to_string_lossy())
1440                    .collect::<Vec<_>>()
1441                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1442            })
1443            .await;
1444        worktree_c
1445            .condition(&cx_c, |tree, _| {
1446                tree.paths()
1447                    .map(|p| p.to_string_lossy())
1448                    .collect::<Vec<_>>()
1449                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1450            })
1451            .await;
1452
1453        // Ensure buffer files are updated as well.
1454        buffer_a
1455            .condition(&cx_a, |buf, _| {
1456                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1457            })
1458            .await;
1459        buffer_b
1460            .condition(&cx_b, |buf, _| {
1461                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1462            })
1463            .await;
1464        buffer_c
1465            .condition(&cx_c, |buf, _| {
1466                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1467            })
1468            .await;
1469    }
1470
1471    #[gpui::test(iterations = 10)]
1472    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1473        cx_a.foreground().forbid_parking();
1474        let lang_registry = Arc::new(LanguageRegistry::new());
1475        let fs = FakeFs::new(cx_a.background());
1476
1477        // Connect to a server as 2 clients.
1478        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1479        let client_a = server.create_client(cx_a, "user_a").await;
1480        let client_b = server.create_client(cx_b, "user_b").await;
1481
1482        // Share a project as client A
1483        fs.insert_tree(
1484            "/dir",
1485            json!({
1486                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1487                "a.txt": "a-contents",
1488            }),
1489        )
1490        .await;
1491
1492        let project_a = cx_a.update(|cx| {
1493            Project::local(
1494                client_a.clone(),
1495                client_a.user_store.clone(),
1496                lang_registry.clone(),
1497                fs.clone(),
1498                cx,
1499            )
1500        });
1501        let (worktree_a, _) = project_a
1502            .update(cx_a, |p, cx| {
1503                p.find_or_create_local_worktree("/dir", true, cx)
1504            })
1505            .await
1506            .unwrap();
1507        worktree_a
1508            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1509            .await;
1510        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1511        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1512        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1513
1514        // Join that project as client B
1515        let project_b = Project::remote(
1516            project_id,
1517            client_b.clone(),
1518            client_b.user_store.clone(),
1519            lang_registry.clone(),
1520            fs.clone(),
1521            &mut cx_b.to_async(),
1522        )
1523        .await
1524        .unwrap();
1525
1526        // Open a buffer as client B
1527        let buffer_b = project_b
1528            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1529            .await
1530            .unwrap();
1531
1532        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1533        buffer_b.read_with(cx_b, |buf, _| {
1534            assert!(buf.is_dirty());
1535            assert!(!buf.has_conflict());
1536        });
1537
1538        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1539        buffer_b
1540            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1541            .await;
1542        buffer_b.read_with(cx_b, |buf, _| {
1543            assert!(!buf.has_conflict());
1544        });
1545
1546        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1547        buffer_b.read_with(cx_b, |buf, _| {
1548            assert!(buf.is_dirty());
1549            assert!(!buf.has_conflict());
1550        });
1551    }
1552
1553    #[gpui::test(iterations = 10)]
1554    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1555        cx_a.foreground().forbid_parking();
1556        let lang_registry = Arc::new(LanguageRegistry::new());
1557        let fs = FakeFs::new(cx_a.background());
1558
1559        // Connect to a server as 2 clients.
1560        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1561        let client_a = server.create_client(cx_a, "user_a").await;
1562        let client_b = server.create_client(cx_b, "user_b").await;
1563
1564        // Share a project as client A
1565        fs.insert_tree(
1566            "/dir",
1567            json!({
1568                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1569                "a.txt": "a-contents",
1570            }),
1571        )
1572        .await;
1573
1574        let project_a = cx_a.update(|cx| {
1575            Project::local(
1576                client_a.clone(),
1577                client_a.user_store.clone(),
1578                lang_registry.clone(),
1579                fs.clone(),
1580                cx,
1581            )
1582        });
1583        let (worktree_a, _) = project_a
1584            .update(cx_a, |p, cx| {
1585                p.find_or_create_local_worktree("/dir", true, cx)
1586            })
1587            .await
1588            .unwrap();
1589        worktree_a
1590            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1591            .await;
1592        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1593        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1594        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1595
1596        // Join that project as client B
1597        let project_b = Project::remote(
1598            project_id,
1599            client_b.clone(),
1600            client_b.user_store.clone(),
1601            lang_registry.clone(),
1602            fs.clone(),
1603            &mut cx_b.to_async(),
1604        )
1605        .await
1606        .unwrap();
1607        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1608
1609        // Open a buffer as client B
1610        let buffer_b = project_b
1611            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1612            .await
1613            .unwrap();
1614        buffer_b.read_with(cx_b, |buf, _| {
1615            assert!(!buf.is_dirty());
1616            assert!(!buf.has_conflict());
1617        });
1618
1619        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1620            .await
1621            .unwrap();
1622        buffer_b
1623            .condition(&cx_b, |buf, _| {
1624                buf.text() == "new contents" && !buf.is_dirty()
1625            })
1626            .await;
1627        buffer_b.read_with(cx_b, |buf, _| {
1628            assert!(!buf.has_conflict());
1629        });
1630    }
1631
1632    #[gpui::test(iterations = 10)]
1633    async fn test_editing_while_guest_opens_buffer(
1634        cx_a: &mut TestAppContext,
1635        cx_b: &mut TestAppContext,
1636    ) {
1637        cx_a.foreground().forbid_parking();
1638        let lang_registry = Arc::new(LanguageRegistry::new());
1639        let fs = FakeFs::new(cx_a.background());
1640
1641        // Connect to a server as 2 clients.
1642        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1643        let client_a = server.create_client(cx_a, "user_a").await;
1644        let client_b = server.create_client(cx_b, "user_b").await;
1645
1646        // Share a project as client A
1647        fs.insert_tree(
1648            "/dir",
1649            json!({
1650                ".zed.toml": r#"collaborators = ["user_b"]"#,
1651                "a.txt": "a-contents",
1652            }),
1653        )
1654        .await;
1655        let project_a = cx_a.update(|cx| {
1656            Project::local(
1657                client_a.clone(),
1658                client_a.user_store.clone(),
1659                lang_registry.clone(),
1660                fs.clone(),
1661                cx,
1662            )
1663        });
1664        let (worktree_a, _) = project_a
1665            .update(cx_a, |p, cx| {
1666                p.find_or_create_local_worktree("/dir", true, cx)
1667            })
1668            .await
1669            .unwrap();
1670        worktree_a
1671            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1672            .await;
1673        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1674        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1675        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1676
1677        // Join that project as client B
1678        let project_b = Project::remote(
1679            project_id,
1680            client_b.clone(),
1681            client_b.user_store.clone(),
1682            lang_registry.clone(),
1683            fs.clone(),
1684            &mut cx_b.to_async(),
1685        )
1686        .await
1687        .unwrap();
1688
1689        // Open a buffer as client A
1690        let buffer_a = project_a
1691            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1692            .await
1693            .unwrap();
1694
1695        // Start opening the same buffer as client B
1696        let buffer_b = cx_b
1697            .background()
1698            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1699
1700        // Edit the buffer as client A while client B is still opening it.
1701        cx_b.background().simulate_random_delay().await;
1702        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1703        cx_b.background().simulate_random_delay().await;
1704        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1705
1706        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1707        let buffer_b = buffer_b.await.unwrap();
1708        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1709    }
1710
1711    #[gpui::test(iterations = 10)]
1712    async fn test_leaving_worktree_while_opening_buffer(
1713        cx_a: &mut TestAppContext,
1714        cx_b: &mut TestAppContext,
1715    ) {
1716        cx_a.foreground().forbid_parking();
1717        let lang_registry = Arc::new(LanguageRegistry::new());
1718        let fs = FakeFs::new(cx_a.background());
1719
1720        // Connect to a server as 2 clients.
1721        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1722        let client_a = server.create_client(cx_a, "user_a").await;
1723        let client_b = server.create_client(cx_b, "user_b").await;
1724
1725        // Share a project as client A
1726        fs.insert_tree(
1727            "/dir",
1728            json!({
1729                ".zed.toml": r#"collaborators = ["user_b"]"#,
1730                "a.txt": "a-contents",
1731            }),
1732        )
1733        .await;
1734        let project_a = cx_a.update(|cx| {
1735            Project::local(
1736                client_a.clone(),
1737                client_a.user_store.clone(),
1738                lang_registry.clone(),
1739                fs.clone(),
1740                cx,
1741            )
1742        });
1743        let (worktree_a, _) = project_a
1744            .update(cx_a, |p, cx| {
1745                p.find_or_create_local_worktree("/dir", true, cx)
1746            })
1747            .await
1748            .unwrap();
1749        worktree_a
1750            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1751            .await;
1752        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1753        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1754        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767
1768        // See that a guest has joined as client A.
1769        project_a
1770            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1771            .await;
1772
1773        // Begin opening a buffer as client B, but leave the project before the open completes.
1774        let buffer_b = cx_b
1775            .background()
1776            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1777        cx_b.update(|_| drop(project_b));
1778        drop(buffer_b);
1779
1780        // See that the guest has left.
1781        project_a
1782            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1783            .await;
1784    }
1785
1786    #[gpui::test(iterations = 10)]
1787    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1788        cx_a.foreground().forbid_parking();
1789        let lang_registry = Arc::new(LanguageRegistry::new());
1790        let fs = FakeFs::new(cx_a.background());
1791
1792        // Connect to a server as 2 clients.
1793        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1794        let client_a = server.create_client(cx_a, "user_a").await;
1795        let client_b = server.create_client(cx_b, "user_b").await;
1796
1797        // Share a project as client A
1798        fs.insert_tree(
1799            "/a",
1800            json!({
1801                ".zed.toml": r#"collaborators = ["user_b"]"#,
1802                "a.txt": "a-contents",
1803                "b.txt": "b-contents",
1804            }),
1805        )
1806        .await;
1807        let project_a = cx_a.update(|cx| {
1808            Project::local(
1809                client_a.clone(),
1810                client_a.user_store.clone(),
1811                lang_registry.clone(),
1812                fs.clone(),
1813                cx,
1814            )
1815        });
1816        let (worktree_a, _) = project_a
1817            .update(cx_a, |p, cx| {
1818                p.find_or_create_local_worktree("/a", true, cx)
1819            })
1820            .await
1821            .unwrap();
1822        worktree_a
1823            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1824            .await;
1825        let project_id = project_a
1826            .update(cx_a, |project, _| project.next_remote_id())
1827            .await;
1828        project_a
1829            .update(cx_a, |project, cx| project.share(cx))
1830            .await
1831            .unwrap();
1832
1833        // Join that project as client B
1834        let _project_b = Project::remote(
1835            project_id,
1836            client_b.clone(),
1837            client_b.user_store.clone(),
1838            lang_registry.clone(),
1839            fs.clone(),
1840            &mut cx_b.to_async(),
1841        )
1842        .await
1843        .unwrap();
1844
1845        // Client A sees that a guest has joined.
1846        project_a
1847            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1848            .await;
1849
1850        // Drop client B's connection and ensure client A observes client B leaving the project.
1851        client_b.disconnect(&cx_b.to_async()).unwrap();
1852        project_a
1853            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1854            .await;
1855
1856        // Rejoin the project as client B
1857        let _project_b = Project::remote(
1858            project_id,
1859            client_b.clone(),
1860            client_b.user_store.clone(),
1861            lang_registry.clone(),
1862            fs.clone(),
1863            &mut cx_b.to_async(),
1864        )
1865        .await
1866        .unwrap();
1867
1868        // Client A sees that a guest has re-joined.
1869        project_a
1870            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1871            .await;
1872
1873        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1874        server.disconnect_client(client_b.current_user_id(cx_b));
1875        cx_a.foreground().advance_clock(Duration::from_secs(3));
1876        project_a
1877            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1878            .await;
1879    }
1880
1881    #[gpui::test(iterations = 10)]
1882    async fn test_collaborating_with_diagnostics(
1883        cx_a: &mut TestAppContext,
1884        cx_b: &mut TestAppContext,
1885    ) {
1886        cx_a.foreground().forbid_parking();
1887        let mut lang_registry = Arc::new(LanguageRegistry::new());
1888        let fs = FakeFs::new(cx_a.background());
1889
1890        // Set up a fake language server.
1891        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1892        Arc::get_mut(&mut lang_registry)
1893            .unwrap()
1894            .add(Arc::new(Language::new(
1895                LanguageConfig {
1896                    name: "Rust".into(),
1897                    path_suffixes: vec!["rs".to_string()],
1898                    language_server: Some(language_server_config),
1899                    ..Default::default()
1900                },
1901                Some(tree_sitter_rust::language()),
1902            )));
1903
1904        // Connect to a server as 2 clients.
1905        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1906        let client_a = server.create_client(cx_a, "user_a").await;
1907        let client_b = server.create_client(cx_b, "user_b").await;
1908
1909        // Share a project as client A
1910        fs.insert_tree(
1911            "/a",
1912            json!({
1913                ".zed.toml": r#"collaborators = ["user_b"]"#,
1914                "a.rs": "let one = two",
1915                "other.rs": "",
1916            }),
1917        )
1918        .await;
1919        let project_a = cx_a.update(|cx| {
1920            Project::local(
1921                client_a.clone(),
1922                client_a.user_store.clone(),
1923                lang_registry.clone(),
1924                fs.clone(),
1925                cx,
1926            )
1927        });
1928        let (worktree_a, _) = project_a
1929            .update(cx_a, |p, cx| {
1930                p.find_or_create_local_worktree("/a", true, cx)
1931            })
1932            .await
1933            .unwrap();
1934        worktree_a
1935            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1936            .await;
1937        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1938        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1939        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1940
1941        // Cause the language server to start.
1942        let _ = cx_a
1943            .background()
1944            .spawn(project_a.update(cx_a, |project, cx| {
1945                project.open_buffer(
1946                    ProjectPath {
1947                        worktree_id,
1948                        path: Path::new("other.rs").into(),
1949                    },
1950                    cx,
1951                )
1952            }))
1953            .await
1954            .unwrap();
1955
1956        // Simulate a language server reporting errors for a file.
1957        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1958        fake_language_server
1959            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
1960                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1961                version: None,
1962                diagnostics: vec![lsp::Diagnostic {
1963                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1964                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1965                    message: "message 1".to_string(),
1966                    ..Default::default()
1967                }],
1968            })
1969            .await;
1970
1971        // Wait for server to see the diagnostics update.
1972        server
1973            .condition(|store| {
1974                let worktree = store
1975                    .project(project_id)
1976                    .unwrap()
1977                    .share
1978                    .as_ref()
1979                    .unwrap()
1980                    .worktrees
1981                    .get(&worktree_id.to_proto())
1982                    .unwrap();
1983
1984                !worktree.diagnostic_summaries.is_empty()
1985            })
1986            .await;
1987
1988        // Join the worktree as client B.
1989        let project_b = Project::remote(
1990            project_id,
1991            client_b.clone(),
1992            client_b.user_store.clone(),
1993            lang_registry.clone(),
1994            fs.clone(),
1995            &mut cx_b.to_async(),
1996        )
1997        .await
1998        .unwrap();
1999
2000        project_b.read_with(cx_b, |project, cx| {
2001            assert_eq!(
2002                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2003                &[(
2004                    ProjectPath {
2005                        worktree_id,
2006                        path: Arc::from(Path::new("a.rs")),
2007                    },
2008                    DiagnosticSummary {
2009                        error_count: 1,
2010                        warning_count: 0,
2011                        ..Default::default()
2012                    },
2013                )]
2014            )
2015        });
2016
2017        // Simulate a language server reporting more errors for a file.
2018        fake_language_server
2019            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2020                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2021                version: None,
2022                diagnostics: vec![
2023                    lsp::Diagnostic {
2024                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2025                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2026                        message: "message 1".to_string(),
2027                        ..Default::default()
2028                    },
2029                    lsp::Diagnostic {
2030                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2031                        range: lsp::Range::new(
2032                            lsp::Position::new(0, 10),
2033                            lsp::Position::new(0, 13),
2034                        ),
2035                        message: "message 2".to_string(),
2036                        ..Default::default()
2037                    },
2038                ],
2039            })
2040            .await;
2041
2042        // Client b gets the updated summaries
2043        project_b
2044            .condition(&cx_b, |project, cx| {
2045                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2046                    == &[(
2047                        ProjectPath {
2048                            worktree_id,
2049                            path: Arc::from(Path::new("a.rs")),
2050                        },
2051                        DiagnosticSummary {
2052                            error_count: 1,
2053                            warning_count: 1,
2054                            ..Default::default()
2055                        },
2056                    )]
2057            })
2058            .await;
2059
2060        // Open the file with the errors on client B. They should be present.
2061        let buffer_b = cx_b
2062            .background()
2063            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2064            .await
2065            .unwrap();
2066
2067        buffer_b.read_with(cx_b, |buffer, _| {
2068            assert_eq!(
2069                buffer
2070                    .snapshot()
2071                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2072                    .map(|entry| entry)
2073                    .collect::<Vec<_>>(),
2074                &[
2075                    DiagnosticEntry {
2076                        range: Point::new(0, 4)..Point::new(0, 7),
2077                        diagnostic: Diagnostic {
2078                            group_id: 0,
2079                            message: "message 1".to_string(),
2080                            severity: lsp::DiagnosticSeverity::ERROR,
2081                            is_primary: true,
2082                            ..Default::default()
2083                        }
2084                    },
2085                    DiagnosticEntry {
2086                        range: Point::new(0, 10)..Point::new(0, 13),
2087                        diagnostic: Diagnostic {
2088                            group_id: 1,
2089                            severity: lsp::DiagnosticSeverity::WARNING,
2090                            message: "message 2".to_string(),
2091                            is_primary: true,
2092                            ..Default::default()
2093                        }
2094                    }
2095                ]
2096            );
2097        });
2098    }
2099
2100    #[gpui::test(iterations = 10)]
2101    async fn test_collaborating_with_completion(
2102        cx_a: &mut TestAppContext,
2103        cx_b: &mut TestAppContext,
2104    ) {
2105        cx_a.foreground().forbid_parking();
2106        let mut lang_registry = Arc::new(LanguageRegistry::new());
2107        let fs = FakeFs::new(cx_a.background());
2108
2109        // Set up a fake language server.
2110        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2111        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2112            completion_provider: Some(lsp::CompletionOptions {
2113                trigger_characters: Some(vec![".".to_string()]),
2114                ..Default::default()
2115            }),
2116            ..Default::default()
2117        });
2118        Arc::get_mut(&mut lang_registry)
2119            .unwrap()
2120            .add(Arc::new(Language::new(
2121                LanguageConfig {
2122                    name: "Rust".into(),
2123                    path_suffixes: vec!["rs".to_string()],
2124                    language_server: Some(language_server_config),
2125                    ..Default::default()
2126                },
2127                Some(tree_sitter_rust::language()),
2128            )));
2129
2130        // Connect to a server as 2 clients.
2131        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2132        let client_a = server.create_client(cx_a, "user_a").await;
2133        let client_b = server.create_client(cx_b, "user_b").await;
2134
2135        // Share a project as client A
2136        fs.insert_tree(
2137            "/a",
2138            json!({
2139                ".zed.toml": r#"collaborators = ["user_b"]"#,
2140                "main.rs": "fn main() { a }",
2141                "other.rs": "",
2142            }),
2143        )
2144        .await;
2145        let project_a = cx_a.update(|cx| {
2146            Project::local(
2147                client_a.clone(),
2148                client_a.user_store.clone(),
2149                lang_registry.clone(),
2150                fs.clone(),
2151                cx,
2152            )
2153        });
2154        let (worktree_a, _) = project_a
2155            .update(cx_a, |p, cx| {
2156                p.find_or_create_local_worktree("/a", true, cx)
2157            })
2158            .await
2159            .unwrap();
2160        worktree_a
2161            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2162            .await;
2163        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2164        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2165        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2166
2167        // Join the worktree as client B.
2168        let project_b = Project::remote(
2169            project_id,
2170            client_b.clone(),
2171            client_b.user_store.clone(),
2172            lang_registry.clone(),
2173            fs.clone(),
2174            &mut cx_b.to_async(),
2175        )
2176        .await
2177        .unwrap();
2178
2179        // Open a file in an editor as the guest.
2180        let buffer_b = project_b
2181            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2182            .await
2183            .unwrap();
2184        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2185        let editor_b = cx_b.add_view(window_b, |cx| {
2186            Editor::for_buffer(
2187                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2188                Some(project_b.clone()),
2189                watch::channel_with(Settings::test(cx)).1,
2190                cx,
2191            )
2192        });
2193
2194        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2195        buffer_b
2196            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2197            .await;
2198
2199        // Type a completion trigger character as the guest.
2200        editor_b.update(cx_b, |editor, cx| {
2201            editor.select_ranges([13..13], None, cx);
2202            editor.handle_input(&Input(".".into()), cx);
2203            cx.focus(&editor_b);
2204        });
2205
2206        // Receive a completion request as the host's language server.
2207        // Return some completions from the host's language server.
2208        cx_a.foreground().start_waiting();
2209        fake_language_server
2210            .handle_request::<lsp::request::Completion, _>(|params, _| {
2211                assert_eq!(
2212                    params.text_document_position.text_document.uri,
2213                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2214                );
2215                assert_eq!(
2216                    params.text_document_position.position,
2217                    lsp::Position::new(0, 14),
2218                );
2219
2220                Some(lsp::CompletionResponse::Array(vec![
2221                    lsp::CompletionItem {
2222                        label: "first_method(…)".into(),
2223                        detail: Some("fn(&mut self, B) -> C".into()),
2224                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2225                            new_text: "first_method($1)".to_string(),
2226                            range: lsp::Range::new(
2227                                lsp::Position::new(0, 14),
2228                                lsp::Position::new(0, 14),
2229                            ),
2230                        })),
2231                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2232                        ..Default::default()
2233                    },
2234                    lsp::CompletionItem {
2235                        label: "second_method(…)".into(),
2236                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2237                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2238                            new_text: "second_method()".to_string(),
2239                            range: lsp::Range::new(
2240                                lsp::Position::new(0, 14),
2241                                lsp::Position::new(0, 14),
2242                            ),
2243                        })),
2244                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2245                        ..Default::default()
2246                    },
2247                ]))
2248            })
2249            .next()
2250            .await
2251            .unwrap();
2252        cx_a.foreground().finish_waiting();
2253
2254        // Open the buffer on the host.
2255        let buffer_a = project_a
2256            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2257            .await
2258            .unwrap();
2259        buffer_a
2260            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2261            .await;
2262
2263        // Confirm a completion on the guest.
2264        editor_b
2265            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2266            .await;
2267        editor_b.update(cx_b, |editor, cx| {
2268            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2269            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2270        });
2271
2272        // Return a resolved completion from the host's language server.
2273        // The resolved completion has an additional text edit.
2274        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2275            |params, _| {
2276                assert_eq!(params.label, "first_method(…)");
2277                lsp::CompletionItem {
2278                    label: "first_method(…)".into(),
2279                    detail: Some("fn(&mut self, B) -> C".into()),
2280                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2281                        new_text: "first_method($1)".to_string(),
2282                        range: lsp::Range::new(
2283                            lsp::Position::new(0, 14),
2284                            lsp::Position::new(0, 14),
2285                        ),
2286                    })),
2287                    additional_text_edits: Some(vec![lsp::TextEdit {
2288                        new_text: "use d::SomeTrait;\n".to_string(),
2289                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2290                    }]),
2291                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2292                    ..Default::default()
2293                }
2294            },
2295        );
2296
2297        // The additional edit is applied.
2298        buffer_a
2299            .condition(&cx_a, |buffer, _| {
2300                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2301            })
2302            .await;
2303        buffer_b
2304            .condition(&cx_b, |buffer, _| {
2305                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2306            })
2307            .await;
2308    }
2309
2310    #[gpui::test(iterations = 10)]
2311    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2312        cx_a.foreground().forbid_parking();
2313        let mut lang_registry = Arc::new(LanguageRegistry::new());
2314        let fs = FakeFs::new(cx_a.background());
2315
2316        // Set up a fake language server.
2317        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2318        Arc::get_mut(&mut lang_registry)
2319            .unwrap()
2320            .add(Arc::new(Language::new(
2321                LanguageConfig {
2322                    name: "Rust".into(),
2323                    path_suffixes: vec!["rs".to_string()],
2324                    language_server: Some(language_server_config),
2325                    ..Default::default()
2326                },
2327                Some(tree_sitter_rust::language()),
2328            )));
2329
2330        // Connect to a server as 2 clients.
2331        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2332        let client_a = server.create_client(cx_a, "user_a").await;
2333        let client_b = server.create_client(cx_b, "user_b").await;
2334
2335        // Share a project as client A
2336        fs.insert_tree(
2337            "/a",
2338            json!({
2339                ".zed.toml": r#"collaborators = ["user_b"]"#,
2340                "a.rs": "let one = two",
2341            }),
2342        )
2343        .await;
2344        let project_a = cx_a.update(|cx| {
2345            Project::local(
2346                client_a.clone(),
2347                client_a.user_store.clone(),
2348                lang_registry.clone(),
2349                fs.clone(),
2350                cx,
2351            )
2352        });
2353        let (worktree_a, _) = project_a
2354            .update(cx_a, |p, cx| {
2355                p.find_or_create_local_worktree("/a", true, cx)
2356            })
2357            .await
2358            .unwrap();
2359        worktree_a
2360            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2361            .await;
2362        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2363        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2364        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2365
2366        // Join the worktree as client B.
2367        let project_b = Project::remote(
2368            project_id,
2369            client_b.clone(),
2370            client_b.user_store.clone(),
2371            lang_registry.clone(),
2372            fs.clone(),
2373            &mut cx_b.to_async(),
2374        )
2375        .await
2376        .unwrap();
2377
2378        let buffer_b = cx_b
2379            .background()
2380            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2381            .await
2382            .unwrap();
2383
2384        let format = project_b.update(cx_b, |project, cx| {
2385            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2386        });
2387
2388        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2389        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2390            Some(vec![
2391                lsp::TextEdit {
2392                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2393                    new_text: "h".to_string(),
2394                },
2395                lsp::TextEdit {
2396                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2397                    new_text: "y".to_string(),
2398                },
2399            ])
2400        });
2401
2402        format.await.unwrap();
2403        assert_eq!(
2404            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2405            "let honey = two"
2406        );
2407    }
2408
2409    #[gpui::test(iterations = 10)]
2410    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2411        cx_a.foreground().forbid_parking();
2412        let mut lang_registry = Arc::new(LanguageRegistry::new());
2413        let fs = FakeFs::new(cx_a.background());
2414        fs.insert_tree(
2415            "/root-1",
2416            json!({
2417                ".zed.toml": r#"collaborators = ["user_b"]"#,
2418                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2419            }),
2420        )
2421        .await;
2422        fs.insert_tree(
2423            "/root-2",
2424            json!({
2425                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2426            }),
2427        )
2428        .await;
2429
2430        // Set up a fake language server.
2431        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2432        Arc::get_mut(&mut lang_registry)
2433            .unwrap()
2434            .add(Arc::new(Language::new(
2435                LanguageConfig {
2436                    name: "Rust".into(),
2437                    path_suffixes: vec!["rs".to_string()],
2438                    language_server: Some(language_server_config),
2439                    ..Default::default()
2440                },
2441                Some(tree_sitter_rust::language()),
2442            )));
2443
2444        // Connect to a server as 2 clients.
2445        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2446        let client_a = server.create_client(cx_a, "user_a").await;
2447        let client_b = server.create_client(cx_b, "user_b").await;
2448
2449        // Share a project as client A
2450        let project_a = cx_a.update(|cx| {
2451            Project::local(
2452                client_a.clone(),
2453                client_a.user_store.clone(),
2454                lang_registry.clone(),
2455                fs.clone(),
2456                cx,
2457            )
2458        });
2459        let (worktree_a, _) = project_a
2460            .update(cx_a, |p, cx| {
2461                p.find_or_create_local_worktree("/root-1", true, cx)
2462            })
2463            .await
2464            .unwrap();
2465        worktree_a
2466            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2467            .await;
2468        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2469        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2470        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2471
2472        // Join the worktree as client B.
2473        let project_b = Project::remote(
2474            project_id,
2475            client_b.clone(),
2476            client_b.user_store.clone(),
2477            lang_registry.clone(),
2478            fs.clone(),
2479            &mut cx_b.to_async(),
2480        )
2481        .await
2482        .unwrap();
2483
2484        // Open the file on client B.
2485        let buffer_b = cx_b
2486            .background()
2487            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2488            .await
2489            .unwrap();
2490
2491        // Request the definition of a symbol as the guest.
2492        let definitions_1 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2493
2494        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2495        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2496            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2497                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2498                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2499            )))
2500        });
2501
2502        let definitions_1 = definitions_1.await.unwrap();
2503        cx_b.read(|cx| {
2504            assert_eq!(definitions_1.len(), 1);
2505            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2506            let target_buffer = definitions_1[0].buffer.read(cx);
2507            assert_eq!(
2508                target_buffer.text(),
2509                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2510            );
2511            assert_eq!(
2512                definitions_1[0].range.to_point(target_buffer),
2513                Point::new(0, 6)..Point::new(0, 9)
2514            );
2515        });
2516
2517        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2518        // the previous call to `definition`.
2519        let definitions_2 = project_b.update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2520        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2521            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2522                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2523                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2524            )))
2525        });
2526
2527        let definitions_2 = definitions_2.await.unwrap();
2528        cx_b.read(|cx| {
2529            assert_eq!(definitions_2.len(), 1);
2530            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2531            let target_buffer = definitions_2[0].buffer.read(cx);
2532            assert_eq!(
2533                target_buffer.text(),
2534                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2535            );
2536            assert_eq!(
2537                definitions_2[0].range.to_point(target_buffer),
2538                Point::new(1, 6)..Point::new(1, 11)
2539            );
2540        });
2541        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2542    }
2543
2544    #[gpui::test(iterations = 10)]
2545    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2546        cx_a.foreground().forbid_parking();
2547        let mut lang_registry = Arc::new(LanguageRegistry::new());
2548        let fs = FakeFs::new(cx_a.background());
2549        fs.insert_tree(
2550            "/root-1",
2551            json!({
2552                ".zed.toml": r#"collaborators = ["user_b"]"#,
2553                "one.rs": "const ONE: usize = 1;",
2554                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2555            }),
2556        )
2557        .await;
2558        fs.insert_tree(
2559            "/root-2",
2560            json!({
2561                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2562            }),
2563        )
2564        .await;
2565
2566        // Set up a fake language server.
2567        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2568        Arc::get_mut(&mut lang_registry)
2569            .unwrap()
2570            .add(Arc::new(Language::new(
2571                LanguageConfig {
2572                    name: "Rust".into(),
2573                    path_suffixes: vec!["rs".to_string()],
2574                    language_server: Some(language_server_config),
2575                    ..Default::default()
2576                },
2577                Some(tree_sitter_rust::language()),
2578            )));
2579
2580        // Connect to a server as 2 clients.
2581        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2582        let client_a = server.create_client(cx_a, "user_a").await;
2583        let client_b = server.create_client(cx_b, "user_b").await;
2584
2585        // Share a project as client A
2586        let project_a = cx_a.update(|cx| {
2587            Project::local(
2588                client_a.clone(),
2589                client_a.user_store.clone(),
2590                lang_registry.clone(),
2591                fs.clone(),
2592                cx,
2593            )
2594        });
2595        let (worktree_a, _) = project_a
2596            .update(cx_a, |p, cx| {
2597                p.find_or_create_local_worktree("/root-1", true, cx)
2598            })
2599            .await
2600            .unwrap();
2601        worktree_a
2602            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2603            .await;
2604        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2605        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2606        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2607
2608        // Join the worktree as client B.
2609        let project_b = Project::remote(
2610            project_id,
2611            client_b.clone(),
2612            client_b.user_store.clone(),
2613            lang_registry.clone(),
2614            fs.clone(),
2615            &mut cx_b.to_async(),
2616        )
2617        .await
2618        .unwrap();
2619
2620        // Open the file on client B.
2621        let buffer_b = cx_b
2622            .background()
2623            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2624            .await
2625            .unwrap();
2626
2627        // Request references to a symbol as the guest.
2628        let references = project_b.update(cx_b, |p, cx| p.references(&buffer_b, 7, cx));
2629
2630        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2631        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2632            assert_eq!(
2633                params.text_document_position.text_document.uri.as_str(),
2634                "file:///root-1/one.rs"
2635            );
2636            Some(vec![
2637                lsp::Location {
2638                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2639                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2640                },
2641                lsp::Location {
2642                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2643                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2644                },
2645                lsp::Location {
2646                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2647                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2648                },
2649            ])
2650        });
2651
2652        let references = references.await.unwrap();
2653        cx_b.read(|cx| {
2654            assert_eq!(references.len(), 3);
2655            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2656
2657            let two_buffer = references[0].buffer.read(cx);
2658            let three_buffer = references[2].buffer.read(cx);
2659            assert_eq!(
2660                two_buffer.file().unwrap().path().as_ref(),
2661                Path::new("two.rs")
2662            );
2663            assert_eq!(references[1].buffer, references[0].buffer);
2664            assert_eq!(
2665                three_buffer.file().unwrap().full_path(cx),
2666                Path::new("three.rs")
2667            );
2668
2669            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2670            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2671            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2672        });
2673    }
2674
2675    #[gpui::test(iterations = 10)]
2676    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2677        cx_a.foreground().forbid_parking();
2678        let lang_registry = Arc::new(LanguageRegistry::new());
2679        let fs = FakeFs::new(cx_a.background());
2680        fs.insert_tree(
2681            "/root-1",
2682            json!({
2683                ".zed.toml": r#"collaborators = ["user_b"]"#,
2684                "a": "hello world",
2685                "b": "goodnight moon",
2686                "c": "a world of goo",
2687                "d": "world champion of clown world",
2688            }),
2689        )
2690        .await;
2691        fs.insert_tree(
2692            "/root-2",
2693            json!({
2694                "e": "disney world is fun",
2695            }),
2696        )
2697        .await;
2698
2699        // Connect to a server as 2 clients.
2700        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2701        let client_a = server.create_client(cx_a, "user_a").await;
2702        let client_b = server.create_client(cx_b, "user_b").await;
2703
2704        // Share a project as client A
2705        let project_a = cx_a.update(|cx| {
2706            Project::local(
2707                client_a.clone(),
2708                client_a.user_store.clone(),
2709                lang_registry.clone(),
2710                fs.clone(),
2711                cx,
2712            )
2713        });
2714        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2715
2716        let (worktree_1, _) = project_a
2717            .update(cx_a, |p, cx| {
2718                p.find_or_create_local_worktree("/root-1", true, cx)
2719            })
2720            .await
2721            .unwrap();
2722        worktree_1
2723            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2724            .await;
2725        let (worktree_2, _) = project_a
2726            .update(cx_a, |p, cx| {
2727                p.find_or_create_local_worktree("/root-2", true, cx)
2728            })
2729            .await
2730            .unwrap();
2731        worktree_2
2732            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2733            .await;
2734
2735        eprintln!("sharing");
2736
2737        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2738
2739        // Join the worktree as client B.
2740        let project_b = Project::remote(
2741            project_id,
2742            client_b.clone(),
2743            client_b.user_store.clone(),
2744            lang_registry.clone(),
2745            fs.clone(),
2746            &mut cx_b.to_async(),
2747        )
2748        .await
2749        .unwrap();
2750
2751        let results = project_b
2752            .update(cx_b, |project, cx| {
2753                project.search(SearchQuery::text("world", false, false), cx)
2754            })
2755            .await
2756            .unwrap();
2757
2758        let mut ranges_by_path = results
2759            .into_iter()
2760            .map(|(buffer, ranges)| {
2761                buffer.read_with(cx_b, |buffer, cx| {
2762                    let path = buffer.file().unwrap().full_path(cx);
2763                    let offset_ranges = ranges
2764                        .into_iter()
2765                        .map(|range| range.to_offset(buffer))
2766                        .collect::<Vec<_>>();
2767                    (path, offset_ranges)
2768                })
2769            })
2770            .collect::<Vec<_>>();
2771        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2772
2773        assert_eq!(
2774            ranges_by_path,
2775            &[
2776                (PathBuf::from("root-1/a"), vec![6..11]),
2777                (PathBuf::from("root-1/c"), vec![2..7]),
2778                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2779                (PathBuf::from("root-2/e"), vec![7..12]),
2780            ]
2781        );
2782    }
2783
2784    #[gpui::test(iterations = 10)]
2785    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2786        cx_a.foreground().forbid_parking();
2787        let lang_registry = Arc::new(LanguageRegistry::new());
2788        let fs = FakeFs::new(cx_a.background());
2789        fs.insert_tree(
2790            "/root-1",
2791            json!({
2792                ".zed.toml": r#"collaborators = ["user_b"]"#,
2793                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2794            }),
2795        )
2796        .await;
2797
2798        // Set up a fake language server.
2799        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2800        lang_registry.add(Arc::new(Language::new(
2801            LanguageConfig {
2802                name: "Rust".into(),
2803                path_suffixes: vec!["rs".to_string()],
2804                language_server: Some(language_server_config),
2805                ..Default::default()
2806            },
2807            Some(tree_sitter_rust::language()),
2808        )));
2809
2810        // Connect to a server as 2 clients.
2811        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2812        let client_a = server.create_client(cx_a, "user_a").await;
2813        let client_b = server.create_client(cx_b, "user_b").await;
2814
2815        // Share a project as client A
2816        let project_a = cx_a.update(|cx| {
2817            Project::local(
2818                client_a.clone(),
2819                client_a.user_store.clone(),
2820                lang_registry.clone(),
2821                fs.clone(),
2822                cx,
2823            )
2824        });
2825        let (worktree_a, _) = project_a
2826            .update(cx_a, |p, cx| {
2827                p.find_or_create_local_worktree("/root-1", true, cx)
2828            })
2829            .await
2830            .unwrap();
2831        worktree_a
2832            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2833            .await;
2834        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2835        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2836        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2837
2838        // Join the worktree as client B.
2839        let project_b = Project::remote(
2840            project_id,
2841            client_b.clone(),
2842            client_b.user_store.clone(),
2843            lang_registry.clone(),
2844            fs.clone(),
2845            &mut cx_b.to_async(),
2846        )
2847        .await
2848        .unwrap();
2849
2850        // Open the file on client B.
2851        let buffer_b = cx_b
2852            .background()
2853            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2854            .await
2855            .unwrap();
2856
2857        // Request document highlights as the guest.
2858        let highlights = project_b.update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx));
2859
2860        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2861        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2862            |params, _| {
2863                assert_eq!(
2864                    params
2865                        .text_document_position_params
2866                        .text_document
2867                        .uri
2868                        .as_str(),
2869                    "file:///root-1/main.rs"
2870                );
2871                assert_eq!(
2872                    params.text_document_position_params.position,
2873                    lsp::Position::new(0, 34)
2874                );
2875                Some(vec![
2876                    lsp::DocumentHighlight {
2877                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2878                        range: lsp::Range::new(
2879                            lsp::Position::new(0, 10),
2880                            lsp::Position::new(0, 16),
2881                        ),
2882                    },
2883                    lsp::DocumentHighlight {
2884                        kind: Some(lsp::DocumentHighlightKind::READ),
2885                        range: lsp::Range::new(
2886                            lsp::Position::new(0, 32),
2887                            lsp::Position::new(0, 38),
2888                        ),
2889                    },
2890                    lsp::DocumentHighlight {
2891                        kind: Some(lsp::DocumentHighlightKind::READ),
2892                        range: lsp::Range::new(
2893                            lsp::Position::new(0, 41),
2894                            lsp::Position::new(0, 47),
2895                        ),
2896                    },
2897                ])
2898            },
2899        );
2900
2901        let highlights = highlights.await.unwrap();
2902        buffer_b.read_with(cx_b, |buffer, _| {
2903            let snapshot = buffer.snapshot();
2904
2905            let highlights = highlights
2906                .into_iter()
2907                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2908                .collect::<Vec<_>>();
2909            assert_eq!(
2910                highlights,
2911                &[
2912                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2913                    (lsp::DocumentHighlightKind::READ, 32..38),
2914                    (lsp::DocumentHighlightKind::READ, 41..47)
2915                ]
2916            )
2917        });
2918    }
2919
2920    #[gpui::test(iterations = 10)]
2921    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2922        cx_a.foreground().forbid_parking();
2923        let mut lang_registry = Arc::new(LanguageRegistry::new());
2924        let fs = FakeFs::new(cx_a.background());
2925        fs.insert_tree(
2926            "/code",
2927            json!({
2928                "crate-1": {
2929                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2930                    "one.rs": "const ONE: usize = 1;",
2931                },
2932                "crate-2": {
2933                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2934                },
2935                "private": {
2936                    "passwords.txt": "the-password",
2937                }
2938            }),
2939        )
2940        .await;
2941
2942        // Set up a fake language server.
2943        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2944        Arc::get_mut(&mut lang_registry)
2945            .unwrap()
2946            .add(Arc::new(Language::new(
2947                LanguageConfig {
2948                    name: "Rust".into(),
2949                    path_suffixes: vec!["rs".to_string()],
2950                    language_server: Some(language_server_config),
2951                    ..Default::default()
2952                },
2953                Some(tree_sitter_rust::language()),
2954            )));
2955
2956        // Connect to a server as 2 clients.
2957        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2958        let client_a = server.create_client(cx_a, "user_a").await;
2959        let client_b = server.create_client(cx_b, "user_b").await;
2960
2961        // Share a project as client A
2962        let project_a = cx_a.update(|cx| {
2963            Project::local(
2964                client_a.clone(),
2965                client_a.user_store.clone(),
2966                lang_registry.clone(),
2967                fs.clone(),
2968                cx,
2969            )
2970        });
2971        let (worktree_a, _) = project_a
2972            .update(cx_a, |p, cx| {
2973                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2974            })
2975            .await
2976            .unwrap();
2977        worktree_a
2978            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2979            .await;
2980        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2981        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2982        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2983
2984        // Join the worktree as client B.
2985        let project_b = Project::remote(
2986            project_id,
2987            client_b.clone(),
2988            client_b.user_store.clone(),
2989            lang_registry.clone(),
2990            fs.clone(),
2991            &mut cx_b.to_async(),
2992        )
2993        .await
2994        .unwrap();
2995
2996        // Cause the language server to start.
2997        let _buffer = cx_b
2998            .background()
2999            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3000            .await
3001            .unwrap();
3002
3003        // Request the definition of a symbol as the guest.
3004        let symbols = project_b.update(cx_b, |p, cx| p.symbols("two", cx));
3005        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3006        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3007            #[allow(deprecated)]
3008            Some(vec![lsp::SymbolInformation {
3009                name: "TWO".into(),
3010                location: lsp::Location {
3011                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3012                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3013                },
3014                kind: lsp::SymbolKind::CONSTANT,
3015                tags: None,
3016                container_name: None,
3017                deprecated: None,
3018            }])
3019        });
3020
3021        let symbols = symbols.await.unwrap();
3022        assert_eq!(symbols.len(), 1);
3023        assert_eq!(symbols[0].name, "TWO");
3024
3025        // Open one of the returned symbols.
3026        let buffer_b_2 = project_b
3027            .update(cx_b, |project, cx| {
3028                project.open_buffer_for_symbol(&symbols[0], cx)
3029            })
3030            .await
3031            .unwrap();
3032        buffer_b_2.read_with(cx_b, |buffer, _| {
3033            assert_eq!(
3034                buffer.file().unwrap().path().as_ref(),
3035                Path::new("../crate-2/two.rs")
3036            );
3037        });
3038
3039        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3040        let mut fake_symbol = symbols[0].clone();
3041        fake_symbol.path = Path::new("/code/secrets").into();
3042        let error = project_b
3043            .update(cx_b, |project, cx| {
3044                project.open_buffer_for_symbol(&fake_symbol, cx)
3045            })
3046            .await
3047            .unwrap_err();
3048        assert!(error.to_string().contains("invalid symbol signature"));
3049    }
3050
3051    #[gpui::test(iterations = 10)]
3052    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3053        cx_a: &mut TestAppContext,
3054        cx_b: &mut TestAppContext,
3055        mut rng: StdRng,
3056    ) {
3057        cx_a.foreground().forbid_parking();
3058        let mut lang_registry = Arc::new(LanguageRegistry::new());
3059        let fs = FakeFs::new(cx_a.background());
3060        fs.insert_tree(
3061            "/root",
3062            json!({
3063                ".zed.toml": r#"collaborators = ["user_b"]"#,
3064                "a.rs": "const ONE: usize = b::TWO;",
3065                "b.rs": "const TWO: usize = 2",
3066            }),
3067        )
3068        .await;
3069
3070        // Set up a fake language server.
3071        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3072
3073        Arc::get_mut(&mut lang_registry)
3074            .unwrap()
3075            .add(Arc::new(Language::new(
3076                LanguageConfig {
3077                    name: "Rust".into(),
3078                    path_suffixes: vec!["rs".to_string()],
3079                    language_server: Some(language_server_config),
3080                    ..Default::default()
3081                },
3082                Some(tree_sitter_rust::language()),
3083            )));
3084
3085        // Connect to a server as 2 clients.
3086        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3087        let client_a = server.create_client(cx_a, "user_a").await;
3088        let client_b = server.create_client(cx_b, "user_b").await;
3089
3090        // Share a project as client A
3091        let project_a = cx_a.update(|cx| {
3092            Project::local(
3093                client_a.clone(),
3094                client_a.user_store.clone(),
3095                lang_registry.clone(),
3096                fs.clone(),
3097                cx,
3098            )
3099        });
3100
3101        let (worktree_a, _) = project_a
3102            .update(cx_a, |p, cx| {
3103                p.find_or_create_local_worktree("/root", true, cx)
3104            })
3105            .await
3106            .unwrap();
3107        worktree_a
3108            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3109            .await;
3110        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3111        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3112        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3113
3114        // Join the worktree as client B.
3115        let project_b = Project::remote(
3116            project_id,
3117            client_b.clone(),
3118            client_b.user_store.clone(),
3119            lang_registry.clone(),
3120            fs.clone(),
3121            &mut cx_b.to_async(),
3122        )
3123        .await
3124        .unwrap();
3125
3126        let buffer_b1 = cx_b
3127            .background()
3128            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3129            .await
3130            .unwrap();
3131
3132        let definitions;
3133        let buffer_b2;
3134        if rng.gen() {
3135            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3136            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3137        } else {
3138            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3139            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3140        }
3141
3142        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3143        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3144            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3145                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3146                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3147            )))
3148        });
3149
3150        let buffer_b2 = buffer_b2.await.unwrap();
3151        let definitions = definitions.await.unwrap();
3152        assert_eq!(definitions.len(), 1);
3153        assert_eq!(definitions[0].buffer, buffer_b2);
3154    }
3155
3156    #[gpui::test(iterations = 10)]
3157    async fn test_collaborating_with_code_actions(
3158        cx_a: &mut TestAppContext,
3159        cx_b: &mut TestAppContext,
3160    ) {
3161        cx_a.foreground().forbid_parking();
3162        let mut lang_registry = Arc::new(LanguageRegistry::new());
3163        let fs = FakeFs::new(cx_a.background());
3164        let mut path_openers_b = Vec::new();
3165        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3166
3167        // Set up a fake language server.
3168        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3169        Arc::get_mut(&mut lang_registry)
3170            .unwrap()
3171            .add(Arc::new(Language::new(
3172                LanguageConfig {
3173                    name: "Rust".into(),
3174                    path_suffixes: vec!["rs".to_string()],
3175                    language_server: Some(language_server_config),
3176                    ..Default::default()
3177                },
3178                Some(tree_sitter_rust::language()),
3179            )));
3180
3181        // Connect to a server as 2 clients.
3182        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3183        let client_a = server.create_client(cx_a, "user_a").await;
3184        let client_b = server.create_client(cx_b, "user_b").await;
3185
3186        // Share a project as client A
3187        fs.insert_tree(
3188            "/a",
3189            json!({
3190                ".zed.toml": r#"collaborators = ["user_b"]"#,
3191                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3192                "other.rs": "pub fn foo() -> usize { 4 }",
3193            }),
3194        )
3195        .await;
3196        let project_a = cx_a.update(|cx| {
3197            Project::local(
3198                client_a.clone(),
3199                client_a.user_store.clone(),
3200                lang_registry.clone(),
3201                fs.clone(),
3202                cx,
3203            )
3204        });
3205        let (worktree_a, _) = project_a
3206            .update(cx_a, |p, cx| {
3207                p.find_or_create_local_worktree("/a", true, cx)
3208            })
3209            .await
3210            .unwrap();
3211        worktree_a
3212            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3213            .await;
3214        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3215        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3216        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3217
3218        // Join the worktree as client B.
3219        let project_b = Project::remote(
3220            project_id,
3221            client_b.clone(),
3222            client_b.user_store.clone(),
3223            lang_registry.clone(),
3224            fs.clone(),
3225            &mut cx_b.to_async(),
3226        )
3227        .await
3228        .unwrap();
3229        let mut params = cx_b.update(WorkspaceParams::test);
3230        params.languages = lang_registry.clone();
3231        params.client = client_b.client.clone();
3232        params.user_store = client_b.user_store.clone();
3233        params.project = project_b;
3234        params.path_openers = path_openers_b.into();
3235
3236        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3237        let editor_b = workspace_b
3238            .update(cx_b, |workspace, cx| {
3239                workspace.open_path((worktree_id, "main.rs").into(), cx)
3240            })
3241            .await
3242            .unwrap()
3243            .downcast::<Editor>()
3244            .unwrap();
3245
3246        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3247        fake_language_server
3248            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3249                assert_eq!(
3250                    params.text_document.uri,
3251                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3252                );
3253                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3254                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3255                None
3256            })
3257            .next()
3258            .await;
3259
3260        // Move cursor to a location that contains code actions.
3261        editor_b.update(cx_b, |editor, cx| {
3262            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3263            cx.focus(&editor_b);
3264        });
3265
3266        fake_language_server
3267            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3268                assert_eq!(
3269                    params.text_document.uri,
3270                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3271                );
3272                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3273                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3274
3275                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3276                    lsp::CodeAction {
3277                        title: "Inline into all callers".to_string(),
3278                        edit: Some(lsp::WorkspaceEdit {
3279                            changes: Some(
3280                                [
3281                                    (
3282                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3283                                        vec![lsp::TextEdit::new(
3284                                            lsp::Range::new(
3285                                                lsp::Position::new(1, 22),
3286                                                lsp::Position::new(1, 34),
3287                                            ),
3288                                            "4".to_string(),
3289                                        )],
3290                                    ),
3291                                    (
3292                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3293                                        vec![lsp::TextEdit::new(
3294                                            lsp::Range::new(
3295                                                lsp::Position::new(0, 0),
3296                                                lsp::Position::new(0, 27),
3297                                            ),
3298                                            "".to_string(),
3299                                        )],
3300                                    ),
3301                                ]
3302                                .into_iter()
3303                                .collect(),
3304                            ),
3305                            ..Default::default()
3306                        }),
3307                        data: Some(json!({
3308                            "codeActionParams": {
3309                                "range": {
3310                                    "start": {"line": 1, "column": 31},
3311                                    "end": {"line": 1, "column": 31},
3312                                }
3313                            }
3314                        })),
3315                        ..Default::default()
3316                    },
3317                )])
3318            })
3319            .next()
3320            .await;
3321
3322        // Toggle code actions and wait for them to display.
3323        editor_b.update(cx_b, |editor, cx| {
3324            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3325        });
3326        editor_b
3327            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3328            .await;
3329
3330        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3331
3332        // Confirming the code action will trigger a resolve request.
3333        let confirm_action = workspace_b
3334            .update(cx_b, |workspace, cx| {
3335                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3336            })
3337            .unwrap();
3338        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3339            lsp::CodeAction {
3340                title: "Inline into all callers".to_string(),
3341                edit: Some(lsp::WorkspaceEdit {
3342                    changes: Some(
3343                        [
3344                            (
3345                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3346                                vec![lsp::TextEdit::new(
3347                                    lsp::Range::new(
3348                                        lsp::Position::new(1, 22),
3349                                        lsp::Position::new(1, 34),
3350                                    ),
3351                                    "4".to_string(),
3352                                )],
3353                            ),
3354                            (
3355                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3356                                vec![lsp::TextEdit::new(
3357                                    lsp::Range::new(
3358                                        lsp::Position::new(0, 0),
3359                                        lsp::Position::new(0, 27),
3360                                    ),
3361                                    "".to_string(),
3362                                )],
3363                            ),
3364                        ]
3365                        .into_iter()
3366                        .collect(),
3367                    ),
3368                    ..Default::default()
3369                }),
3370                ..Default::default()
3371            }
3372        });
3373
3374        // After the action is confirmed, an editor containing both modified files is opened.
3375        confirm_action.await.unwrap();
3376        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3377            workspace
3378                .active_item(cx)
3379                .unwrap()
3380                .downcast::<Editor>()
3381                .unwrap()
3382        });
3383        code_action_editor.update(cx_b, |editor, cx| {
3384            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3385            editor.undo(&Undo, cx);
3386            assert_eq!(
3387                editor.text(cx),
3388                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3389            );
3390            editor.redo(&Redo, cx);
3391            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3392        });
3393    }
3394
3395    #[gpui::test(iterations = 10)]
3396    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3397        cx_a.foreground().forbid_parking();
3398        let mut lang_registry = Arc::new(LanguageRegistry::new());
3399        let fs = FakeFs::new(cx_a.background());
3400        let mut path_openers_b = Vec::new();
3401        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3402
3403        // Set up a fake language server.
3404        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3405        Arc::get_mut(&mut lang_registry)
3406            .unwrap()
3407            .add(Arc::new(Language::new(
3408                LanguageConfig {
3409                    name: "Rust".into(),
3410                    path_suffixes: vec!["rs".to_string()],
3411                    language_server: Some(language_server_config),
3412                    ..Default::default()
3413                },
3414                Some(tree_sitter_rust::language()),
3415            )));
3416
3417        // Connect to a server as 2 clients.
3418        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3419        let client_a = server.create_client(cx_a, "user_a").await;
3420        let client_b = server.create_client(cx_b, "user_b").await;
3421
3422        // Share a project as client A
3423        fs.insert_tree(
3424            "/dir",
3425            json!({
3426                ".zed.toml": r#"collaborators = ["user_b"]"#,
3427                "one.rs": "const ONE: usize = 1;",
3428                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3429            }),
3430        )
3431        .await;
3432        let project_a = cx_a.update(|cx| {
3433            Project::local(
3434                client_a.clone(),
3435                client_a.user_store.clone(),
3436                lang_registry.clone(),
3437                fs.clone(),
3438                cx,
3439            )
3440        });
3441        let (worktree_a, _) = project_a
3442            .update(cx_a, |p, cx| {
3443                p.find_or_create_local_worktree("/dir", true, cx)
3444            })
3445            .await
3446            .unwrap();
3447        worktree_a
3448            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3449            .await;
3450        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3451        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3452        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3453
3454        // Join the worktree as client B.
3455        let project_b = Project::remote(
3456            project_id,
3457            client_b.clone(),
3458            client_b.user_store.clone(),
3459            lang_registry.clone(),
3460            fs.clone(),
3461            &mut cx_b.to_async(),
3462        )
3463        .await
3464        .unwrap();
3465        let mut params = cx_b.update(WorkspaceParams::test);
3466        params.languages = lang_registry.clone();
3467        params.client = client_b.client.clone();
3468        params.user_store = client_b.user_store.clone();
3469        params.project = project_b;
3470        params.path_openers = path_openers_b.into();
3471
3472        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3473        let editor_b = workspace_b
3474            .update(cx_b, |workspace, cx| {
3475                workspace.open_path((worktree_id, "one.rs").into(), cx)
3476            })
3477            .await
3478            .unwrap()
3479            .downcast::<Editor>()
3480            .unwrap();
3481        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3482
3483        // Move cursor to a location that can be renamed.
3484        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3485            editor.select_ranges([7..7], None, cx);
3486            editor.rename(&Rename, cx).unwrap()
3487        });
3488
3489        fake_language_server
3490            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3491                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3492                assert_eq!(params.position, lsp::Position::new(0, 7));
3493                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3494                    lsp::Position::new(0, 6),
3495                    lsp::Position::new(0, 9),
3496                )))
3497            })
3498            .next()
3499            .await
3500            .unwrap();
3501        prepare_rename.await.unwrap();
3502        editor_b.update(cx_b, |editor, cx| {
3503            let rename = editor.pending_rename().unwrap();
3504            let buffer = editor.buffer().read(cx).snapshot(cx);
3505            assert_eq!(
3506                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3507                6..9
3508            );
3509            rename.editor.update(cx, |rename_editor, cx| {
3510                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3511                    rename_buffer.edit([0..3], "THREE", cx);
3512                });
3513            });
3514        });
3515
3516        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3517            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3518        });
3519        fake_language_server
3520            .handle_request::<lsp::request::Rename, _>(|params, _| {
3521                assert_eq!(
3522                    params.text_document_position.text_document.uri.as_str(),
3523                    "file:///dir/one.rs"
3524                );
3525                assert_eq!(
3526                    params.text_document_position.position,
3527                    lsp::Position::new(0, 6)
3528                );
3529                assert_eq!(params.new_name, "THREE");
3530                Some(lsp::WorkspaceEdit {
3531                    changes: Some(
3532                        [
3533                            (
3534                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3535                                vec![lsp::TextEdit::new(
3536                                    lsp::Range::new(
3537                                        lsp::Position::new(0, 6),
3538                                        lsp::Position::new(0, 9),
3539                                    ),
3540                                    "THREE".to_string(),
3541                                )],
3542                            ),
3543                            (
3544                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3545                                vec![
3546                                    lsp::TextEdit::new(
3547                                        lsp::Range::new(
3548                                            lsp::Position::new(0, 24),
3549                                            lsp::Position::new(0, 27),
3550                                        ),
3551                                        "THREE".to_string(),
3552                                    ),
3553                                    lsp::TextEdit::new(
3554                                        lsp::Range::new(
3555                                            lsp::Position::new(0, 35),
3556                                            lsp::Position::new(0, 38),
3557                                        ),
3558                                        "THREE".to_string(),
3559                                    ),
3560                                ],
3561                            ),
3562                        ]
3563                        .into_iter()
3564                        .collect(),
3565                    ),
3566                    ..Default::default()
3567                })
3568            })
3569            .next()
3570            .await
3571            .unwrap();
3572        confirm_rename.await.unwrap();
3573
3574        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3575            workspace
3576                .active_item(cx)
3577                .unwrap()
3578                .downcast::<Editor>()
3579                .unwrap()
3580        });
3581        rename_editor.update(cx_b, |editor, cx| {
3582            assert_eq!(
3583                editor.text(cx),
3584                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3585            );
3586            editor.undo(&Undo, cx);
3587            assert_eq!(
3588                editor.text(cx),
3589                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3590            );
3591            editor.redo(&Redo, cx);
3592            assert_eq!(
3593                editor.text(cx),
3594                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3595            );
3596        });
3597
3598        // Ensure temporary rename edits cannot be undone/redone.
3599        editor_b.update(cx_b, |editor, cx| {
3600            editor.undo(&Undo, cx);
3601            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3602            editor.undo(&Undo, cx);
3603            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3604            editor.redo(&Redo, cx);
3605            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3606        })
3607    }
3608
3609    #[gpui::test(iterations = 10)]
3610    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3611        cx_a.foreground().forbid_parking();
3612
3613        // Connect to a server as 2 clients.
3614        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3615        let client_a = server.create_client(cx_a, "user_a").await;
3616        let client_b = server.create_client(cx_b, "user_b").await;
3617
3618        // Create an org that includes these 2 users.
3619        let db = &server.app_state.db;
3620        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3621        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3622            .await
3623            .unwrap();
3624        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3625            .await
3626            .unwrap();
3627
3628        // Create a channel that includes all the users.
3629        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3630        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3631            .await
3632            .unwrap();
3633        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3634            .await
3635            .unwrap();
3636        db.create_channel_message(
3637            channel_id,
3638            client_b.current_user_id(&cx_b),
3639            "hello A, it's B.",
3640            OffsetDateTime::now_utc(),
3641            1,
3642        )
3643        .await
3644        .unwrap();
3645
3646        let channels_a = cx_a
3647            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3648        channels_a
3649            .condition(cx_a, |list, _| list.available_channels().is_some())
3650            .await;
3651        channels_a.read_with(cx_a, |list, _| {
3652            assert_eq!(
3653                list.available_channels().unwrap(),
3654                &[ChannelDetails {
3655                    id: channel_id.to_proto(),
3656                    name: "test-channel".to_string()
3657                }]
3658            )
3659        });
3660        let channel_a = channels_a.update(cx_a, |this, cx| {
3661            this.get_channel(channel_id.to_proto(), cx).unwrap()
3662        });
3663        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3664        channel_a
3665            .condition(&cx_a, |channel, _| {
3666                channel_messages(channel)
3667                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3668            })
3669            .await;
3670
3671        let channels_b = cx_b
3672            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3673        channels_b
3674            .condition(cx_b, |list, _| list.available_channels().is_some())
3675            .await;
3676        channels_b.read_with(cx_b, |list, _| {
3677            assert_eq!(
3678                list.available_channels().unwrap(),
3679                &[ChannelDetails {
3680                    id: channel_id.to_proto(),
3681                    name: "test-channel".to_string()
3682                }]
3683            )
3684        });
3685
3686        let channel_b = channels_b.update(cx_b, |this, cx| {
3687            this.get_channel(channel_id.to_proto(), cx).unwrap()
3688        });
3689        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3690        channel_b
3691            .condition(&cx_b, |channel, _| {
3692                channel_messages(channel)
3693                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3694            })
3695            .await;
3696
3697        channel_a
3698            .update(cx_a, |channel, cx| {
3699                channel
3700                    .send_message("oh, hi B.".to_string(), cx)
3701                    .unwrap()
3702                    .detach();
3703                let task = channel.send_message("sup".to_string(), cx).unwrap();
3704                assert_eq!(
3705                    channel_messages(channel),
3706                    &[
3707                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3708                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3709                        ("user_a".to_string(), "sup".to_string(), true)
3710                    ]
3711                );
3712                task
3713            })
3714            .await
3715            .unwrap();
3716
3717        channel_b
3718            .condition(&cx_b, |channel, _| {
3719                channel_messages(channel)
3720                    == [
3721                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3722                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3723                        ("user_a".to_string(), "sup".to_string(), false),
3724                    ]
3725            })
3726            .await;
3727
3728        assert_eq!(
3729            server
3730                .state()
3731                .await
3732                .channel(channel_id)
3733                .unwrap()
3734                .connection_ids
3735                .len(),
3736            2
3737        );
3738        cx_b.update(|_| drop(channel_b));
3739        server
3740            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3741            .await;
3742
3743        cx_a.update(|_| drop(channel_a));
3744        server
3745            .condition(|state| state.channel(channel_id).is_none())
3746            .await;
3747    }
3748
3749    #[gpui::test(iterations = 10)]
3750    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3751        cx_a.foreground().forbid_parking();
3752
3753        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3754        let client_a = server.create_client(cx_a, "user_a").await;
3755
3756        let db = &server.app_state.db;
3757        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3758        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3759        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3760            .await
3761            .unwrap();
3762        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3763            .await
3764            .unwrap();
3765
3766        let channels_a = cx_a
3767            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3768        channels_a
3769            .condition(cx_a, |list, _| list.available_channels().is_some())
3770            .await;
3771        let channel_a = channels_a.update(cx_a, |this, cx| {
3772            this.get_channel(channel_id.to_proto(), cx).unwrap()
3773        });
3774
3775        // Messages aren't allowed to be too long.
3776        channel_a
3777            .update(cx_a, |channel, cx| {
3778                let long_body = "this is long.\n".repeat(1024);
3779                channel.send_message(long_body, cx).unwrap()
3780            })
3781            .await
3782            .unwrap_err();
3783
3784        // Messages aren't allowed to be blank.
3785        channel_a.update(cx_a, |channel, cx| {
3786            channel.send_message(String::new(), cx).unwrap_err()
3787        });
3788
3789        // Leading and trailing whitespace are trimmed.
3790        channel_a
3791            .update(cx_a, |channel, cx| {
3792                channel
3793                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3794                    .unwrap()
3795            })
3796            .await
3797            .unwrap();
3798        assert_eq!(
3799            db.get_channel_messages(channel_id, 10, None)
3800                .await
3801                .unwrap()
3802                .iter()
3803                .map(|m| &m.body)
3804                .collect::<Vec<_>>(),
3805            &["surrounded by whitespace"]
3806        );
3807    }
3808
3809    #[gpui::test(iterations = 10)]
3810    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3811        cx_a.foreground().forbid_parking();
3812
3813        // Connect to a server as 2 clients.
3814        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3815        let client_a = server.create_client(cx_a, "user_a").await;
3816        let client_b = server.create_client(cx_b, "user_b").await;
3817        let mut status_b = client_b.status();
3818
3819        // Create an org that includes these 2 users.
3820        let db = &server.app_state.db;
3821        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3822        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3823            .await
3824            .unwrap();
3825        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3826            .await
3827            .unwrap();
3828
3829        // Create a channel that includes all the users.
3830        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3831        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3832            .await
3833            .unwrap();
3834        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3835            .await
3836            .unwrap();
3837        db.create_channel_message(
3838            channel_id,
3839            client_b.current_user_id(&cx_b),
3840            "hello A, it's B.",
3841            OffsetDateTime::now_utc(),
3842            2,
3843        )
3844        .await
3845        .unwrap();
3846
3847        let channels_a = cx_a
3848            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3849        channels_a
3850            .condition(cx_a, |list, _| list.available_channels().is_some())
3851            .await;
3852
3853        channels_a.read_with(cx_a, |list, _| {
3854            assert_eq!(
3855                list.available_channels().unwrap(),
3856                &[ChannelDetails {
3857                    id: channel_id.to_proto(),
3858                    name: "test-channel".to_string()
3859                }]
3860            )
3861        });
3862        let channel_a = channels_a.update(cx_a, |this, cx| {
3863            this.get_channel(channel_id.to_proto(), cx).unwrap()
3864        });
3865        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3866        channel_a
3867            .condition(&cx_a, |channel, _| {
3868                channel_messages(channel)
3869                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3870            })
3871            .await;
3872
3873        let channels_b = cx_b
3874            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3875        channels_b
3876            .condition(cx_b, |list, _| list.available_channels().is_some())
3877            .await;
3878        channels_b.read_with(cx_b, |list, _| {
3879            assert_eq!(
3880                list.available_channels().unwrap(),
3881                &[ChannelDetails {
3882                    id: channel_id.to_proto(),
3883                    name: "test-channel".to_string()
3884                }]
3885            )
3886        });
3887
3888        let channel_b = channels_b.update(cx_b, |this, cx| {
3889            this.get_channel(channel_id.to_proto(), cx).unwrap()
3890        });
3891        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3892        channel_b
3893            .condition(&cx_b, |channel, _| {
3894                channel_messages(channel)
3895                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3896            })
3897            .await;
3898
3899        // Disconnect client B, ensuring we can still access its cached channel data.
3900        server.forbid_connections();
3901        server.disconnect_client(client_b.current_user_id(&cx_b));
3902        cx_b.foreground().advance_clock(Duration::from_secs(3));
3903        while !matches!(
3904            status_b.next().await,
3905            Some(client::Status::ReconnectionError { .. })
3906        ) {}
3907
3908        channels_b.read_with(cx_b, |channels, _| {
3909            assert_eq!(
3910                channels.available_channels().unwrap(),
3911                [ChannelDetails {
3912                    id: channel_id.to_proto(),
3913                    name: "test-channel".to_string()
3914                }]
3915            )
3916        });
3917        channel_b.read_with(cx_b, |channel, _| {
3918            assert_eq!(
3919                channel_messages(channel),
3920                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3921            )
3922        });
3923
3924        // Send a message from client B while it is disconnected.
3925        channel_b
3926            .update(cx_b, |channel, cx| {
3927                let task = channel
3928                    .send_message("can you see this?".to_string(), cx)
3929                    .unwrap();
3930                assert_eq!(
3931                    channel_messages(channel),
3932                    &[
3933                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3934                        ("user_b".to_string(), "can you see this?".to_string(), true)
3935                    ]
3936                );
3937                task
3938            })
3939            .await
3940            .unwrap_err();
3941
3942        // Send a message from client A while B is disconnected.
3943        channel_a
3944            .update(cx_a, |channel, cx| {
3945                channel
3946                    .send_message("oh, hi B.".to_string(), cx)
3947                    .unwrap()
3948                    .detach();
3949                let task = channel.send_message("sup".to_string(), cx).unwrap();
3950                assert_eq!(
3951                    channel_messages(channel),
3952                    &[
3953                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3954                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3955                        ("user_a".to_string(), "sup".to_string(), true)
3956                    ]
3957                );
3958                task
3959            })
3960            .await
3961            .unwrap();
3962
3963        // Give client B a chance to reconnect.
3964        server.allow_connections();
3965        cx_b.foreground().advance_clock(Duration::from_secs(10));
3966
3967        // Verify that B sees the new messages upon reconnection, as well as the message client B
3968        // sent while offline.
3969        channel_b
3970            .condition(&cx_b, |channel, _| {
3971                channel_messages(channel)
3972                    == [
3973                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3974                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3975                        ("user_a".to_string(), "sup".to_string(), false),
3976                        ("user_b".to_string(), "can you see this?".to_string(), false),
3977                    ]
3978            })
3979            .await;
3980
3981        // Ensure client A and B can communicate normally after reconnection.
3982        channel_a
3983            .update(cx_a, |channel, cx| {
3984                channel.send_message("you online?".to_string(), cx).unwrap()
3985            })
3986            .await
3987            .unwrap();
3988        channel_b
3989            .condition(&cx_b, |channel, _| {
3990                channel_messages(channel)
3991                    == [
3992                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3993                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3994                        ("user_a".to_string(), "sup".to_string(), false),
3995                        ("user_b".to_string(), "can you see this?".to_string(), false),
3996                        ("user_a".to_string(), "you online?".to_string(), false),
3997                    ]
3998            })
3999            .await;
4000
4001        channel_b
4002            .update(cx_b, |channel, cx| {
4003                channel.send_message("yep".to_string(), cx).unwrap()
4004            })
4005            .await
4006            .unwrap();
4007        channel_a
4008            .condition(&cx_a, |channel, _| {
4009                channel_messages(channel)
4010                    == [
4011                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4012                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4013                        ("user_a".to_string(), "sup".to_string(), false),
4014                        ("user_b".to_string(), "can you see this?".to_string(), false),
4015                        ("user_a".to_string(), "you online?".to_string(), false),
4016                        ("user_b".to_string(), "yep".to_string(), false),
4017                    ]
4018            })
4019            .await;
4020    }
4021
4022    #[gpui::test(iterations = 10)]
4023    async fn test_contacts(
4024        cx_a: &mut TestAppContext,
4025        cx_b: &mut TestAppContext,
4026        cx_c: &mut TestAppContext,
4027    ) {
4028        cx_a.foreground().forbid_parking();
4029        let lang_registry = Arc::new(LanguageRegistry::new());
4030        let fs = FakeFs::new(cx_a.background());
4031
4032        // Connect to a server as 3 clients.
4033        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4034        let client_a = server.create_client(cx_a, "user_a").await;
4035        let client_b = server.create_client(cx_b, "user_b").await;
4036        let client_c = server.create_client(cx_c, "user_c").await;
4037
4038        // Share a worktree as client A.
4039        fs.insert_tree(
4040            "/a",
4041            json!({
4042                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4043            }),
4044        )
4045        .await;
4046
4047        let project_a = cx_a.update(|cx| {
4048            Project::local(
4049                client_a.clone(),
4050                client_a.user_store.clone(),
4051                lang_registry.clone(),
4052                fs.clone(),
4053                cx,
4054            )
4055        });
4056        let (worktree_a, _) = project_a
4057            .update(cx_a, |p, cx| {
4058                p.find_or_create_local_worktree("/a", true, cx)
4059            })
4060            .await
4061            .unwrap();
4062        worktree_a
4063            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4064            .await;
4065
4066        client_a
4067            .user_store
4068            .condition(&cx_a, |user_store, _| {
4069                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4070            })
4071            .await;
4072        client_b
4073            .user_store
4074            .condition(&cx_b, |user_store, _| {
4075                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4076            })
4077            .await;
4078        client_c
4079            .user_store
4080            .condition(&cx_c, |user_store, _| {
4081                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4082            })
4083            .await;
4084
4085        let project_id = project_a
4086            .update(cx_a, |project, _| project.next_remote_id())
4087            .await;
4088        project_a
4089            .update(cx_a, |project, cx| project.share(cx))
4090            .await
4091            .unwrap();
4092
4093        let _project_b = Project::remote(
4094            project_id,
4095            client_b.clone(),
4096            client_b.user_store.clone(),
4097            lang_registry.clone(),
4098            fs.clone(),
4099            &mut cx_b.to_async(),
4100        )
4101        .await
4102        .unwrap();
4103
4104        client_a
4105            .user_store
4106            .condition(&cx_a, |user_store, _| {
4107                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4108            })
4109            .await;
4110        client_b
4111            .user_store
4112            .condition(&cx_b, |user_store, _| {
4113                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4114            })
4115            .await;
4116        client_c
4117            .user_store
4118            .condition(&cx_c, |user_store, _| {
4119                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4120            })
4121            .await;
4122
4123        project_a
4124            .condition(&cx_a, |project, _| {
4125                project.collaborators().contains_key(&client_b.peer_id)
4126            })
4127            .await;
4128
4129        cx_a.update(move |_| drop(project_a));
4130        client_a
4131            .user_store
4132            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4133            .await;
4134        client_b
4135            .user_store
4136            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4137            .await;
4138        client_c
4139            .user_store
4140            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4141            .await;
4142
4143        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4144            user_store
4145                .contacts()
4146                .iter()
4147                .map(|contact| {
4148                    let worktrees = contact
4149                        .projects
4150                        .iter()
4151                        .map(|p| {
4152                            (
4153                                p.worktree_root_names[0].as_str(),
4154                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4155                            )
4156                        })
4157                        .collect();
4158                    (contact.user.github_login.as_str(), worktrees)
4159                })
4160                .collect()
4161        }
4162    }
4163
4164    #[gpui::test(iterations = 100)]
4165    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4166        cx.foreground().forbid_parking();
4167        let max_peers = env::var("MAX_PEERS")
4168            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4169            .unwrap_or(5);
4170        let max_operations = env::var("OPERATIONS")
4171            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4172            .unwrap_or(10);
4173
4174        let rng = Arc::new(Mutex::new(rng));
4175
4176        let guest_lang_registry = Arc::new(LanguageRegistry::new());
4177        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4178
4179        let fs = FakeFs::new(cx.background());
4180        fs.insert_tree(
4181            "/_collab",
4182            json!({
4183                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4184            }),
4185        )
4186        .await;
4187
4188        let operations = Rc::new(Cell::new(0));
4189        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4190        let mut clients = Vec::new();
4191
4192        let mut next_entity_id = 100000;
4193        let mut host_cx = TestAppContext::new(
4194            cx.foreground_platform(),
4195            cx.platform(),
4196            cx.foreground(),
4197            cx.background(),
4198            cx.font_cache(),
4199            cx.leak_detector(),
4200            next_entity_id,
4201        );
4202        let host = server.create_client(&mut host_cx, "host").await;
4203        let host_project = host_cx.update(|cx| {
4204            Project::local(
4205                host.client.clone(),
4206                host.user_store.clone(),
4207                Arc::new(LanguageRegistry::new()),
4208                fs.clone(),
4209                cx,
4210            )
4211        });
4212        let host_project_id = host_project
4213            .update(&mut host_cx, |p, _| p.next_remote_id())
4214            .await;
4215
4216        let (collab_worktree, _) = host_project
4217            .update(&mut host_cx, |project, cx| {
4218                project.find_or_create_local_worktree("/_collab", true, cx)
4219            })
4220            .await
4221            .unwrap();
4222        collab_worktree
4223            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4224            .await;
4225        host_project
4226            .update(&mut host_cx, |project, cx| project.share(cx))
4227            .await
4228            .unwrap();
4229
4230        clients.push(cx.foreground().spawn(host.simulate_host(
4231            host_project,
4232            language_server_config,
4233            operations.clone(),
4234            max_operations,
4235            rng.clone(),
4236            host_cx,
4237        )));
4238
4239        while operations.get() < max_operations {
4240            cx.background().simulate_random_delay().await;
4241            if clients.len() >= max_peers {
4242                break;
4243            } else if rng.lock().gen_bool(0.05) {
4244                operations.set(operations.get() + 1);
4245
4246                let guest_id = clients.len();
4247                log::info!("Adding guest {}", guest_id);
4248                next_entity_id += 100000;
4249                let mut guest_cx = TestAppContext::new(
4250                    cx.foreground_platform(),
4251                    cx.platform(),
4252                    cx.foreground(),
4253                    cx.background(),
4254                    cx.font_cache(),
4255                    cx.leak_detector(),
4256                    next_entity_id,
4257                );
4258                let guest = server
4259                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4260                    .await;
4261                let guest_project = Project::remote(
4262                    host_project_id,
4263                    guest.client.clone(),
4264                    guest.user_store.clone(),
4265                    guest_lang_registry.clone(),
4266                    FakeFs::new(cx.background()),
4267                    &mut guest_cx.to_async(),
4268                )
4269                .await
4270                .unwrap();
4271                clients.push(cx.foreground().spawn(guest.simulate_guest(
4272                    guest_id,
4273                    guest_project,
4274                    operations.clone(),
4275                    max_operations,
4276                    rng.clone(),
4277                    guest_cx,
4278                )));
4279
4280                log::info!("Guest {} added", guest_id);
4281            }
4282        }
4283
4284        let mut clients = futures::future::join_all(clients).await;
4285        cx.foreground().run_until_parked();
4286
4287        let (host_client, mut host_cx) = clients.remove(0);
4288        let host_project = host_client.project.as_ref().unwrap();
4289        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4290            project
4291                .worktrees(cx)
4292                .map(|worktree| {
4293                    let snapshot = worktree.read(cx).snapshot();
4294                    (snapshot.id(), snapshot)
4295                })
4296                .collect::<BTreeMap<_, _>>()
4297        });
4298
4299        host_client
4300            .project
4301            .as_ref()
4302            .unwrap()
4303            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4304
4305        for (guest_client, mut guest_cx) in clients.into_iter() {
4306            let guest_id = guest_client.client.id();
4307            let worktree_snapshots =
4308                guest_client
4309                    .project
4310                    .as_ref()
4311                    .unwrap()
4312                    .read_with(&guest_cx, |project, cx| {
4313                        project
4314                            .worktrees(cx)
4315                            .map(|worktree| {
4316                                let worktree = worktree.read(cx);
4317                                (worktree.id(), worktree.snapshot())
4318                            })
4319                            .collect::<BTreeMap<_, _>>()
4320                    });
4321
4322            assert_eq!(
4323                worktree_snapshots.keys().collect::<Vec<_>>(),
4324                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4325                "guest {} has different worktrees than the host",
4326                guest_id
4327            );
4328            for (id, host_snapshot) in &host_worktree_snapshots {
4329                let guest_snapshot = &worktree_snapshots[id];
4330                assert_eq!(
4331                    guest_snapshot.root_name(),
4332                    host_snapshot.root_name(),
4333                    "guest {} has different root name than the host for worktree {}",
4334                    guest_id,
4335                    id
4336                );
4337                assert_eq!(
4338                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4339                    host_snapshot.entries(false).collect::<Vec<_>>(),
4340                    "guest {} has different snapshot than the host for worktree {}",
4341                    guest_id,
4342                    id
4343                );
4344            }
4345
4346            guest_client
4347                .project
4348                .as_ref()
4349                .unwrap()
4350                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4351
4352            for guest_buffer in &guest_client.buffers {
4353                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4354                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4355                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4356                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4357                        guest_id, guest_client.peer_id, buffer_id
4358                    ))
4359                });
4360                let path = host_buffer
4361                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4362
4363                assert_eq!(
4364                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4365                    0,
4366                    "guest {}, buffer {}, path {:?} has deferred operations",
4367                    guest_id,
4368                    buffer_id,
4369                    path,
4370                );
4371                assert_eq!(
4372                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4373                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4374                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4375                    guest_id,
4376                    buffer_id,
4377                    path
4378                );
4379            }
4380
4381            guest_cx.update(|_| drop(guest_client));
4382        }
4383
4384        host_cx.update(|_| drop(host_client));
4385    }
4386
4387    struct TestServer {
4388        peer: Arc<Peer>,
4389        app_state: Arc<AppState>,
4390        server: Arc<Server>,
4391        foreground: Rc<executor::Foreground>,
4392        notifications: mpsc::UnboundedReceiver<()>,
4393        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4394        forbid_connections: Arc<AtomicBool>,
4395        _test_db: TestDb,
4396    }
4397
4398    impl TestServer {
4399        async fn start(
4400            foreground: Rc<executor::Foreground>,
4401            background: Arc<executor::Background>,
4402        ) -> Self {
4403            let test_db = TestDb::fake(background);
4404            let app_state = Self::build_app_state(&test_db).await;
4405            let peer = Peer::new();
4406            let notifications = mpsc::unbounded();
4407            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4408            Self {
4409                peer,
4410                app_state,
4411                server,
4412                foreground,
4413                notifications: notifications.1,
4414                connection_killers: Default::default(),
4415                forbid_connections: Default::default(),
4416                _test_db: test_db,
4417            }
4418        }
4419
4420        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4421            let http = FakeHttpClient::with_404_response();
4422            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4423            let client_name = name.to_string();
4424            let mut client = Client::new(http.clone());
4425            let server = self.server.clone();
4426            let connection_killers = self.connection_killers.clone();
4427            let forbid_connections = self.forbid_connections.clone();
4428            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4429
4430            Arc::get_mut(&mut client)
4431                .unwrap()
4432                .override_authenticate(move |cx| {
4433                    cx.spawn(|_| async move {
4434                        let access_token = "the-token".to_string();
4435                        Ok(Credentials {
4436                            user_id: user_id.0 as u64,
4437                            access_token,
4438                        })
4439                    })
4440                })
4441                .override_establish_connection(move |credentials, cx| {
4442                    assert_eq!(credentials.user_id, user_id.0 as u64);
4443                    assert_eq!(credentials.access_token, "the-token");
4444
4445                    let server = server.clone();
4446                    let connection_killers = connection_killers.clone();
4447                    let forbid_connections = forbid_connections.clone();
4448                    let client_name = client_name.clone();
4449                    let connection_id_tx = connection_id_tx.clone();
4450                    cx.spawn(move |cx| async move {
4451                        if forbid_connections.load(SeqCst) {
4452                            Err(EstablishConnectionError::other(anyhow!(
4453                                "server is forbidding connections"
4454                            )))
4455                        } else {
4456                            let (client_conn, server_conn, kill_conn) =
4457                                Connection::in_memory(cx.background());
4458                            connection_killers.lock().insert(user_id, kill_conn);
4459                            cx.background()
4460                                .spawn(server.handle_connection(
4461                                    server_conn,
4462                                    client_name,
4463                                    user_id,
4464                                    Some(connection_id_tx),
4465                                    cx.background(),
4466                                ))
4467                                .detach();
4468                            Ok(client_conn)
4469                        }
4470                    })
4471                });
4472
4473            client
4474                .authenticate_and_connect(&cx.to_async())
4475                .await
4476                .unwrap();
4477
4478            Channel::init(&client);
4479            Project::init(&client);
4480
4481            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4482            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4483            let mut authed_user =
4484                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
4485            while authed_user.next().await.unwrap().is_none() {}
4486
4487            TestClient {
4488                client,
4489                peer_id,
4490                user_store,
4491                project: Default::default(),
4492                buffers: Default::default(),
4493            }
4494        }
4495
4496        fn disconnect_client(&self, user_id: UserId) {
4497            self.connection_killers.lock().remove(&user_id);
4498        }
4499
4500        fn forbid_connections(&self) {
4501            self.forbid_connections.store(true, SeqCst);
4502        }
4503
4504        fn allow_connections(&self) {
4505            self.forbid_connections.store(false, SeqCst);
4506        }
4507
4508        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4509            let mut config = Config::default();
4510            config.session_secret = "a".repeat(32);
4511            config.database_url = test_db.url.clone();
4512            let github_client = github::AppClient::test();
4513            Arc::new(AppState {
4514                db: test_db.db().clone(),
4515                handlebars: Default::default(),
4516                auth_client: auth::build_client("", ""),
4517                repo_client: github::RepoClient::test(&github_client),
4518                github_client,
4519                config,
4520            })
4521        }
4522
4523        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4524            self.server.store.read()
4525        }
4526
4527        async fn condition<F>(&mut self, mut predicate: F)
4528        where
4529            F: FnMut(&Store) -> bool,
4530        {
4531            async_std::future::timeout(Duration::from_millis(500), async {
4532                while !(predicate)(&*self.server.store.read()) {
4533                    self.foreground.start_waiting();
4534                    self.notifications.next().await;
4535                    self.foreground.finish_waiting();
4536                }
4537            })
4538            .await
4539            .expect("condition timed out");
4540        }
4541    }
4542
4543    impl Drop for TestServer {
4544        fn drop(&mut self) {
4545            self.peer.reset();
4546        }
4547    }
4548
4549    struct TestClient {
4550        client: Arc<Client>,
4551        pub peer_id: PeerId,
4552        pub user_store: ModelHandle<UserStore>,
4553        project: Option<ModelHandle<Project>>,
4554        buffers: HashSet<ModelHandle<language::Buffer>>,
4555    }
4556
4557    impl Deref for TestClient {
4558        type Target = Arc<Client>;
4559
4560        fn deref(&self) -> &Self::Target {
4561            &self.client
4562        }
4563    }
4564
4565    impl TestClient {
4566        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4567            UserId::from_proto(
4568                self.user_store
4569                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4570            )
4571        }
4572
4573        fn simulate_host(
4574            mut self,
4575            project: ModelHandle<Project>,
4576            mut language_server_config: LanguageServerConfig,
4577            operations: Rc<Cell<usize>>,
4578            max_operations: usize,
4579            rng: Arc<Mutex<StdRng>>,
4580            mut cx: TestAppContext,
4581        ) -> impl Future<Output = (Self, TestAppContext)> {
4582            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4583
4584            // Set up a fake language server.
4585            language_server_config.set_fake_initializer({
4586                let rng = rng.clone();
4587                let files = files.clone();
4588                let project = project.downgrade();
4589                move |fake_server| {
4590                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4591                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4592                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4593                                range: lsp::Range::new(
4594                                    lsp::Position::new(0, 0),
4595                                    lsp::Position::new(0, 0),
4596                                ),
4597                                new_text: "the-new-text".to_string(),
4598                            })),
4599                            ..Default::default()
4600                        }]))
4601                    });
4602
4603                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4604                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4605                            lsp::CodeAction {
4606                                title: "the-code-action".to_string(),
4607                                ..Default::default()
4608                            },
4609                        )])
4610                    });
4611
4612                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4613                        |params, _| {
4614                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4615                                params.position,
4616                                params.position,
4617                            )))
4618                        },
4619                    );
4620
4621                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4622                        let files = files.clone();
4623                        let rng = rng.clone();
4624                        move |_, _| {
4625                            let files = files.lock();
4626                            let mut rng = rng.lock();
4627                            let count = rng.gen_range::<usize, _>(1..3);
4628                            let files = (0..count)
4629                                .map(|_| files.choose(&mut *rng).unwrap())
4630                                .collect::<Vec<_>>();
4631                            log::info!("LSP: Returning definitions in files {:?}", &files);
4632                            Some(lsp::GotoDefinitionResponse::Array(
4633                                files
4634                                    .into_iter()
4635                                    .map(|file| lsp::Location {
4636                                        uri: lsp::Url::from_file_path(file).unwrap(),
4637                                        range: Default::default(),
4638                                    })
4639                                    .collect(),
4640                            ))
4641                        }
4642                    });
4643
4644                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4645                        let rng = rng.clone();
4646                        let project = project.clone();
4647                        move |params, mut cx| {
4648                            if let Some(project) = project.upgrade(&cx) {
4649                                project.update(&mut cx, |project, cx| {
4650                                    let path = params
4651                                        .text_document_position_params
4652                                        .text_document
4653                                        .uri
4654                                        .to_file_path()
4655                                        .unwrap();
4656                                    let (worktree, relative_path) =
4657                                        project.find_local_worktree(&path, cx)?;
4658                                    let project_path =
4659                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4660                                    let buffer =
4661                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4662
4663                                    let mut highlights = Vec::new();
4664                                    let highlight_count = rng.lock().gen_range(1..=5);
4665                                    let mut prev_end = 0;
4666                                    for _ in 0..highlight_count {
4667                                        let range =
4668                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4669                                        let start = buffer
4670                                            .offset_to_point_utf16(range.start)
4671                                            .to_lsp_position();
4672                                        let end = buffer
4673                                            .offset_to_point_utf16(range.end)
4674                                            .to_lsp_position();
4675                                        highlights.push(lsp::DocumentHighlight {
4676                                            range: lsp::Range::new(start, end),
4677                                            kind: Some(lsp::DocumentHighlightKind::READ),
4678                                        });
4679                                        prev_end = range.end;
4680                                    }
4681                                    Some(highlights)
4682                                })
4683                            } else {
4684                                None
4685                            }
4686                        }
4687                    });
4688                }
4689            });
4690
4691            project.update(&mut cx, |project, _| {
4692                project.languages().add(Arc::new(Language::new(
4693                    LanguageConfig {
4694                        name: "Rust".into(),
4695                        path_suffixes: vec!["rs".to_string()],
4696                        language_server: Some(language_server_config),
4697                        ..Default::default()
4698                    },
4699                    None,
4700                )));
4701            });
4702
4703            async move {
4704                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4705                while operations.get() < max_operations {
4706                    operations.set(operations.get() + 1);
4707
4708                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4709                    match distribution {
4710                        0..=20 if !files.lock().is_empty() => {
4711                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4712                            let mut path = path.as_path();
4713                            while let Some(parent_path) = path.parent() {
4714                                path = parent_path;
4715                                if rng.lock().gen() {
4716                                    break;
4717                                }
4718                            }
4719
4720                            log::info!("Host: find/create local worktree {:?}", path);
4721                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4722                                project.find_or_create_local_worktree(path, true, cx)
4723                            });
4724                            let find_or_create_worktree = async move {
4725                                find_or_create_worktree.await.unwrap();
4726                            };
4727                            if rng.lock().gen() {
4728                                cx.background().spawn(find_or_create_worktree).detach();
4729                            } else {
4730                                find_or_create_worktree.await;
4731                            }
4732                        }
4733                        10..=80 if !files.lock().is_empty() => {
4734                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4735                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4736                                let (worktree, path) = project
4737                                    .update(&mut cx, |project, cx| {
4738                                        project.find_or_create_local_worktree(
4739                                            file.clone(),
4740                                            true,
4741                                            cx,
4742                                        )
4743                                    })
4744                                    .await
4745                                    .unwrap();
4746                                let project_path =
4747                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4748                                log::info!(
4749                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4750                                    file,
4751                                    project_path.0,
4752                                    project_path.1
4753                                );
4754                                let buffer = project
4755                                    .update(&mut cx, |project, cx| {
4756                                        project.open_buffer(project_path, cx)
4757                                    })
4758                                    .await
4759                                    .unwrap();
4760                                self.buffers.insert(buffer.clone());
4761                                buffer
4762                            } else {
4763                                self.buffers
4764                                    .iter()
4765                                    .choose(&mut *rng.lock())
4766                                    .unwrap()
4767                                    .clone()
4768                            };
4769
4770                            if rng.lock().gen_bool(0.1) {
4771                                cx.update(|cx| {
4772                                    log::info!(
4773                                        "Host: dropping buffer {:?}",
4774                                        buffer.read(cx).file().unwrap().full_path(cx)
4775                                    );
4776                                    self.buffers.remove(&buffer);
4777                                    drop(buffer);
4778                                });
4779                            } else {
4780                                buffer.update(&mut cx, |buffer, cx| {
4781                                    log::info!(
4782                                        "Host: updating buffer {:?} ({})",
4783                                        buffer.file().unwrap().full_path(cx),
4784                                        buffer.remote_id()
4785                                    );
4786                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4787                                });
4788                            }
4789                        }
4790                        _ => loop {
4791                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4792                            let mut path = PathBuf::new();
4793                            path.push("/");
4794                            for _ in 0..path_component_count {
4795                                let letter = rng.lock().gen_range(b'a'..=b'z');
4796                                path.push(std::str::from_utf8(&[letter]).unwrap());
4797                            }
4798                            path.set_extension("rs");
4799                            let parent_path = path.parent().unwrap();
4800
4801                            log::info!("Host: creating file {:?}", path,);
4802
4803                            if fs.create_dir(&parent_path).await.is_ok()
4804                                && fs.create_file(&path, Default::default()).await.is_ok()
4805                            {
4806                                files.lock().push(path);
4807                                break;
4808                            } else {
4809                                log::info!("Host: cannot create file");
4810                            }
4811                        },
4812                    }
4813
4814                    cx.background().simulate_random_delay().await;
4815                }
4816
4817                log::info!("Host done");
4818
4819                self.project = Some(project);
4820                (self, cx)
4821            }
4822        }
4823
4824        pub async fn simulate_guest(
4825            mut self,
4826            guest_id: usize,
4827            project: ModelHandle<Project>,
4828            operations: Rc<Cell<usize>>,
4829            max_operations: usize,
4830            rng: Arc<Mutex<StdRng>>,
4831            mut cx: TestAppContext,
4832        ) -> (Self, TestAppContext) {
4833            while operations.get() < max_operations {
4834                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4835                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4836                        project
4837                            .worktrees(&cx)
4838                            .filter(|worktree| {
4839                                let worktree = worktree.read(cx);
4840                                worktree.is_visible()
4841                                    && worktree.entries(false).any(|e| e.is_file())
4842                            })
4843                            .choose(&mut *rng.lock())
4844                    }) {
4845                        worktree
4846                    } else {
4847                        cx.background().simulate_random_delay().await;
4848                        continue;
4849                    };
4850
4851                    operations.set(operations.get() + 1);
4852                    let (worktree_root_name, project_path) =
4853                        worktree.read_with(&cx, |worktree, _| {
4854                            let entry = worktree
4855                                .entries(false)
4856                                .filter(|e| e.is_file())
4857                                .choose(&mut *rng.lock())
4858                                .unwrap();
4859                            (
4860                                worktree.root_name().to_string(),
4861                                (worktree.id(), entry.path.clone()),
4862                            )
4863                        });
4864                    log::info!(
4865                        "Guest {}: opening path {:?} in worktree {} ({})",
4866                        guest_id,
4867                        project_path.1,
4868                        project_path.0,
4869                        worktree_root_name,
4870                    );
4871                    let buffer = project
4872                        .update(&mut cx, |project, cx| {
4873                            project.open_buffer(project_path.clone(), cx)
4874                        })
4875                        .await
4876                        .unwrap();
4877                    log::info!(
4878                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4879                        guest_id,
4880                        project_path.1,
4881                        project_path.0,
4882                        worktree_root_name,
4883                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4884                    );
4885                    self.buffers.insert(buffer.clone());
4886                    buffer
4887                } else {
4888                    operations.set(operations.get() + 1);
4889
4890                    self.buffers
4891                        .iter()
4892                        .choose(&mut *rng.lock())
4893                        .unwrap()
4894                        .clone()
4895                };
4896
4897                let choice = rng.lock().gen_range(0..100);
4898                match choice {
4899                    0..=9 => {
4900                        cx.update(|cx| {
4901                            log::info!(
4902                                "Guest {}: dropping buffer {:?}",
4903                                guest_id,
4904                                buffer.read(cx).file().unwrap().full_path(cx)
4905                            );
4906                            self.buffers.remove(&buffer);
4907                            drop(buffer);
4908                        });
4909                    }
4910                    10..=19 => {
4911                        let completions = project.update(&mut cx, |project, cx| {
4912                            log::info!(
4913                                "Guest {}: requesting completions for buffer {} ({:?})",
4914                                guest_id,
4915                                buffer.read(cx).remote_id(),
4916                                buffer.read(cx).file().unwrap().full_path(cx)
4917                            );
4918                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4919                            project.completions(&buffer, offset, cx)
4920                        });
4921                        let completions = cx.background().spawn(async move {
4922                            completions.await.expect("completions request failed");
4923                        });
4924                        if rng.lock().gen_bool(0.3) {
4925                            log::info!("Guest {}: detaching completions request", guest_id);
4926                            completions.detach();
4927                        } else {
4928                            completions.await;
4929                        }
4930                    }
4931                    20..=29 => {
4932                        let code_actions = project.update(&mut cx, |project, cx| {
4933                            log::info!(
4934                                "Guest {}: requesting code actions for buffer {} ({:?})",
4935                                guest_id,
4936                                buffer.read(cx).remote_id(),
4937                                buffer.read(cx).file().unwrap().full_path(cx)
4938                            );
4939                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4940                            project.code_actions(&buffer, range, cx)
4941                        });
4942                        let code_actions = cx.background().spawn(async move {
4943                            code_actions.await.expect("code actions request failed");
4944                        });
4945                        if rng.lock().gen_bool(0.3) {
4946                            log::info!("Guest {}: detaching code actions request", guest_id);
4947                            code_actions.detach();
4948                        } else {
4949                            code_actions.await;
4950                        }
4951                    }
4952                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4953                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4954                            log::info!(
4955                                "Guest {}: saving buffer {} ({:?})",
4956                                guest_id,
4957                                buffer.remote_id(),
4958                                buffer.file().unwrap().full_path(cx)
4959                            );
4960                            (buffer.version(), buffer.save(cx))
4961                        });
4962                        let save = cx.background().spawn(async move {
4963                            let (saved_version, _) = save.await.expect("save request failed");
4964                            assert!(saved_version.observed_all(&requested_version));
4965                        });
4966                        if rng.lock().gen_bool(0.3) {
4967                            log::info!("Guest {}: detaching save request", guest_id);
4968                            save.detach();
4969                        } else {
4970                            save.await;
4971                        }
4972                    }
4973                    40..=44 => {
4974                        let prepare_rename = project.update(&mut cx, |project, cx| {
4975                            log::info!(
4976                                "Guest {}: preparing rename for buffer {} ({:?})",
4977                                guest_id,
4978                                buffer.read(cx).remote_id(),
4979                                buffer.read(cx).file().unwrap().full_path(cx)
4980                            );
4981                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4982                            project.prepare_rename(buffer, offset, cx)
4983                        });
4984                        let prepare_rename = cx.background().spawn(async move {
4985                            prepare_rename.await.expect("prepare rename request failed");
4986                        });
4987                        if rng.lock().gen_bool(0.3) {
4988                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4989                            prepare_rename.detach();
4990                        } else {
4991                            prepare_rename.await;
4992                        }
4993                    }
4994                    45..=49 => {
4995                        let definitions = project.update(&mut cx, |project, cx| {
4996                            log::info!(
4997                                "Guest {}: requesting definitions for buffer {} ({:?})",
4998                                guest_id,
4999                                buffer.read(cx).remote_id(),
5000                                buffer.read(cx).file().unwrap().full_path(cx)
5001                            );
5002                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5003                            project.definition(&buffer, offset, cx)
5004                        });
5005                        let definitions = cx.background().spawn(async move {
5006                            definitions.await.expect("definitions request failed")
5007                        });
5008                        if rng.lock().gen_bool(0.3) {
5009                            log::info!("Guest {}: detaching definitions request", guest_id);
5010                            definitions.detach();
5011                        } else {
5012                            self.buffers
5013                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5014                        }
5015                    }
5016                    50..=54 => {
5017                        let highlights = project.update(&mut cx, |project, cx| {
5018                            log::info!(
5019                                "Guest {}: requesting highlights for buffer {} ({:?})",
5020                                guest_id,
5021                                buffer.read(cx).remote_id(),
5022                                buffer.read(cx).file().unwrap().full_path(cx)
5023                            );
5024                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5025                            project.document_highlights(&buffer, offset, cx)
5026                        });
5027                        let highlights = cx.background().spawn(async move {
5028                            highlights.await.expect("highlights request failed");
5029                        });
5030                        if rng.lock().gen_bool(0.3) {
5031                            log::info!("Guest {}: detaching highlights request", guest_id);
5032                            highlights.detach();
5033                        } else {
5034                            highlights.await;
5035                        }
5036                    }
5037                    55..=59 => {
5038                        let search = project.update(&mut cx, |project, cx| {
5039                            let query = rng.lock().gen_range('a'..='z');
5040                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5041                            project.search(SearchQuery::text(query, false, false), cx)
5042                        });
5043                        let search = cx
5044                            .background()
5045                            .spawn(async move { search.await.expect("search request failed") });
5046                        if rng.lock().gen_bool(0.3) {
5047                            log::info!("Guest {}: detaching search request", guest_id);
5048                            search.detach();
5049                        } else {
5050                            self.buffers.extend(search.await.into_keys());
5051                        }
5052                    }
5053                    _ => {
5054                        buffer.update(&mut cx, |buffer, cx| {
5055                            log::info!(
5056                                "Guest {}: updating buffer {} ({:?})",
5057                                guest_id,
5058                                buffer.remote_id(),
5059                                buffer.file().unwrap().full_path(cx)
5060                            );
5061                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5062                        });
5063                    }
5064                }
5065                cx.background().simulate_random_delay().await;
5066            }
5067
5068            log::info!("Guest {} done", guest_id);
5069
5070            self.project = Some(project);
5071            (self, cx)
5072        }
5073    }
5074
5075    impl Drop for TestClient {
5076        fn drop(&mut self) {
5077            self.client.tear_down();
5078        }
5079    }
5080
5081    impl Executor for Arc<gpui::executor::Background> {
5082        type Timer = gpui::executor::Timer;
5083
5084        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5085            self.spawn(future).detach();
5086        }
5087
5088        fn timer(&self, duration: Duration) -> Self::Timer {
5089            self.as_ref().timer(duration)
5090        }
5091    }
5092
5093    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5094        channel
5095            .messages()
5096            .cursor::<()>()
5097            .map(|m| {
5098                (
5099                    m.sender.github_login.clone(),
5100                    m.body.clone(),
5101                    m.is_pending(),
5102                )
5103            })
5104            .collect()
5105    }
5106
5107    struct EmptyView;
5108
5109    impl gpui::Entity for EmptyView {
5110        type Event = ();
5111    }
5112
5113    impl gpui::View for EmptyView {
5114        fn ui_name() -> &'static str {
5115            "empty view"
5116        }
5117
5118        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5119            gpui::Element::boxed(gpui::elements::Empty)
5120        }
5121    }
5122}