rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
/// RPC server state shared (behind an `Arc`) by every connection handler.
pub struct Server {
    /// Peer used to send, forward and respond to messages and to disconnect connections.
    peer: Arc<Peer>,
    /// Lock-protected `Store` holding connection, project and channel state.
    store: RwLock<Store>,
    /// Application state; the handlers in this file use its `db` handle.
    app_state: Arc<AppState>,
    /// Registered handlers, keyed by the `TypeId` of the message payload they accept.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives a `()` after each incoming message has been handled.
    notifications: Option<mpsc::UnboundedSender<()>>,
}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
/// Unit-struct executor handle.
// NOTE(review): presumably the production `Executor` implementation; its impl is
// not visible in this part of the file — confirm.
#[derive(Clone)]
pub struct RealExecutor;
  57
/// Number of channel messages requested per page; a page shorter than this marks
/// the history as `done` in `join_channel`.
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// Upper bound on a trimmed chat message body, compared against `String::len` (bytes).
const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::update_diagnostic_summary)
  87            .add_message_handler(Server::lsp_event)
  88            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  89            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  90            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  91            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  93            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBuffer>)
  95            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  96            .add_request_handler(
  97                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  98            )
  99            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 100            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 101            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 102            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 104            .add_request_handler(Server::update_buffer)
 105            .add_message_handler(Server::update_buffer_file)
 106            .add_message_handler(Server::buffer_reloaded)
 107            .add_message_handler(Server::buffer_saved)
 108            .add_request_handler(Server::save_buffer)
 109            .add_request_handler(Server::get_channels)
 110            .add_request_handler(Server::get_users)
 111            .add_request_handler(Server::join_channel)
 112            .add_message_handler(Server::leave_channel)
 113            .add_request_handler(Server::send_channel_message)
 114            .add_request_handler(Server::get_channel_messages);
 115
 116        Arc::new(server)
 117    }
 118
 119    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 120    where
 121        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 122        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 123        M: EnvelopedMessage,
 124    {
 125        let prev_handler = self.handlers.insert(
 126            TypeId::of::<M>(),
 127            Box::new(move |server, envelope| {
 128                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 129                (handler)(server, *envelope).boxed()
 130            }),
 131        );
 132        if prev_handler.is_some() {
 133            panic!("registered a handler for the same message twice");
 134        }
 135        self
 136    }
 137
 138    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 139    where
 140        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 141        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 142        M: RequestMessage,
 143    {
 144        self.add_message_handler(move |server, envelope| {
 145            let receipt = envelope.receipt();
 146            let response = (handler)(server.clone(), envelope);
 147            async move {
 148                match response.await {
 149                    Ok(response) => {
 150                        server.peer.respond(receipt, response)?;
 151                        Ok(())
 152                    }
 153                    Err(error) => {
 154                        server.peer.respond_with_error(
 155                            receipt,
 156                            proto::Error {
 157                                message: error.to_string(),
 158                            },
 159                        )?;
 160                        Err(error)
 161                    }
 162                }
 163            }
 164        })
 165    }
 166
 167    pub fn handle_connection<E: Executor>(
 168        self: &Arc<Self>,
 169        connection: Connection,
 170        addr: String,
 171        user_id: UserId,
 172        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 173        executor: E,
 174    ) -> impl Future<Output = ()> {
 175        let mut this = self.clone();
 176        async move {
 177            let (connection_id, handle_io, mut incoming_rx) = this
 178                .peer
 179                .add_connection(connection, {
 180                    let executor = executor.clone();
 181                    move |duration| {
 182                        let timer = executor.timer(duration);
 183                        async move {
 184                            timer.await;
 185                        }
 186                    }
 187                })
 188                .await;
 189
 190            if let Some(send_connection_id) = send_connection_id.as_mut() {
 191                let _ = send_connection_id.send(connection_id).await;
 192            }
 193
 194            this.state_mut().add_connection(connection_id, user_id);
 195            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 196                log::error!("error updating contacts for {:?}: {}", user_id, err);
 197            }
 198
 199            let handle_io = handle_io.fuse();
 200            futures::pin_mut!(handle_io);
 201            loop {
 202                let next_message = incoming_rx.next().fuse();
 203                futures::pin_mut!(next_message);
 204                futures::select_biased! {
 205                    result = handle_io => {
 206                        if let Err(err) = result {
 207                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 208                        }
 209                        break;
 210                    }
 211                    message = next_message => {
 212                        if let Some(message) = message {
 213                            let start_time = Instant::now();
 214                            let type_name = message.payload_type_name();
 215                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 216                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 217                                let notifications = this.notifications.clone();
 218                                let is_background = message.is_background();
 219                                let handle_message = (handler)(this.clone(), message);
 220                                let handle_message = async move {
 221                                    if let Err(err) = handle_message.await {
 222                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 223                                    } else {
 224                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 225                                    }
 226                                    if let Some(mut notifications) = notifications {
 227                                        let _ = notifications.send(()).await;
 228                                    }
 229                                };
 230                                if is_background {
 231                                    executor.spawn_detached(handle_message);
 232                                } else {
 233                                    handle_message.await;
 234                                }
 235                            } else {
 236                                log::warn!("unhandled message: {}", type_name);
 237                            }
 238                        } else {
 239                            log::info!("rpc connection closed {:?}", addr);
 240                            break;
 241                        }
 242                    }
 243                }
 244            }
 245
 246            if let Err(err) = this.sign_out(connection_id).await {
 247                log::error!("error signing out connection {:?} - {:?}", addr, err);
 248            }
 249        }
 250    }
 251
 252    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 253        self.peer.disconnect(connection_id);
 254        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 255
 256        for (project_id, project) in removed_connection.hosted_projects {
 257            if let Some(share) = project.share {
 258                broadcast(
 259                    connection_id,
 260                    share.guests.keys().copied().collect(),
 261                    |conn_id| {
 262                        self.peer
 263                            .send(conn_id, proto::UnshareProject { project_id })
 264                    },
 265                )?;
 266            }
 267        }
 268
 269        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 270            broadcast(connection_id, peer_ids, |conn_id| {
 271                self.peer.send(
 272                    conn_id,
 273                    proto::RemoveProjectCollaborator {
 274                        project_id,
 275                        peer_id: connection_id.0,
 276                    },
 277                )
 278            })?;
 279        }
 280
 281        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 282        Ok(())
 283    }
 284
 285    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 286        Ok(proto::Ack {})
 287    }
 288
 289    async fn register_project(
 290        mut self: Arc<Server>,
 291        request: TypedEnvelope<proto::RegisterProject>,
 292    ) -> tide::Result<proto::RegisterProjectResponse> {
 293        let project_id = {
 294            let mut state = self.state_mut();
 295            let user_id = state.user_id_for_connection(request.sender_id)?;
 296            state.register_project(request.sender_id, user_id)
 297        };
 298        Ok(proto::RegisterProjectResponse { project_id })
 299    }
 300
 301    async fn unregister_project(
 302        mut self: Arc<Server>,
 303        request: TypedEnvelope<proto::UnregisterProject>,
 304    ) -> tide::Result<()> {
 305        let project = self
 306            .state_mut()
 307            .unregister_project(request.payload.project_id, request.sender_id)?;
 308        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 309        Ok(())
 310    }
 311
 312    async fn share_project(
 313        mut self: Arc<Server>,
 314        request: TypedEnvelope<proto::ShareProject>,
 315    ) -> tide::Result<proto::Ack> {
 316        self.state_mut()
 317            .share_project(request.payload.project_id, request.sender_id);
 318        Ok(proto::Ack {})
 319    }
 320
 321    async fn unshare_project(
 322        mut self: Arc<Server>,
 323        request: TypedEnvelope<proto::UnshareProject>,
 324    ) -> tide::Result<()> {
 325        let project_id = request.payload.project_id;
 326        let project = self
 327            .state_mut()
 328            .unshare_project(project_id, request.sender_id)?;
 329
 330        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 331            self.peer
 332                .send(conn_id, proto::UnshareProject { project_id })
 333        })?;
 334        self.update_contacts_for_users(&project.authorized_user_ids)?;
 335        Ok(())
 336    }
 337
 338    async fn join_project(
 339        mut self: Arc<Server>,
 340        request: TypedEnvelope<proto::JoinProject>,
 341    ) -> tide::Result<proto::JoinProjectResponse> {
 342        let project_id = request.payload.project_id;
 343
 344        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 345        let (response, connection_ids, contact_user_ids) = self
 346            .state_mut()
 347            .join_project(request.sender_id, user_id, project_id)
 348            .and_then(|joined| {
 349                let share = joined.project.share()?;
 350                let peer_count = share.guests.len();
 351                let mut collaborators = Vec::with_capacity(peer_count);
 352                collaborators.push(proto::Collaborator {
 353                    peer_id: joined.project.host_connection_id.0,
 354                    replica_id: 0,
 355                    user_id: joined.project.host_user_id.to_proto(),
 356                });
 357                let worktrees = share
 358                    .worktrees
 359                    .iter()
 360                    .filter_map(|(id, shared_worktree)| {
 361                        let worktree = joined.project.worktrees.get(&id)?;
 362                        Some(proto::Worktree {
 363                            id: *id,
 364                            root_name: worktree.root_name.clone(),
 365                            entries: shared_worktree.entries.values().cloned().collect(),
 366                            diagnostic_summaries: shared_worktree
 367                                .diagnostic_summaries
 368                                .values()
 369                                .cloned()
 370                                .collect(),
 371                            visible: worktree.visible,
 372                        })
 373                    })
 374                    .collect();
 375                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 376                    if *peer_conn_id != request.sender_id {
 377                        collaborators.push(proto::Collaborator {
 378                            peer_id: peer_conn_id.0,
 379                            replica_id: *peer_replica_id as u32,
 380                            user_id: peer_user_id.to_proto(),
 381                        });
 382                    }
 383                }
 384                let response = proto::JoinProjectResponse {
 385                    worktrees,
 386                    replica_id: joined.replica_id as u32,
 387                    collaborators,
 388                };
 389                let connection_ids = joined.project.connection_ids();
 390                let contact_user_ids = joined.project.authorized_user_ids();
 391                Ok((response, connection_ids, contact_user_ids))
 392            })?;
 393
 394        broadcast(request.sender_id, connection_ids, |conn_id| {
 395            self.peer.send(
 396                conn_id,
 397                proto::AddProjectCollaborator {
 398                    project_id,
 399                    collaborator: Some(proto::Collaborator {
 400                        peer_id: request.sender_id.0,
 401                        replica_id: response.replica_id,
 402                        user_id: user_id.to_proto(),
 403                    }),
 404                },
 405            )
 406        })?;
 407        self.update_contacts_for_users(&contact_user_ids)?;
 408        Ok(response)
 409    }
 410
 411    async fn leave_project(
 412        mut self: Arc<Server>,
 413        request: TypedEnvelope<proto::LeaveProject>,
 414    ) -> tide::Result<()> {
 415        let sender_id = request.sender_id;
 416        let project_id = request.payload.project_id;
 417        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 418
 419        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 420            self.peer.send(
 421                conn_id,
 422                proto::RemoveProjectCollaborator {
 423                    project_id,
 424                    peer_id: sender_id.0,
 425                },
 426            )
 427        })?;
 428        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 429
 430        Ok(())
 431    }
 432
 433    async fn register_worktree(
 434        mut self: Arc<Server>,
 435        request: TypedEnvelope<proto::RegisterWorktree>,
 436    ) -> tide::Result<proto::Ack> {
 437        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 438
 439        let mut contact_user_ids = HashSet::default();
 440        contact_user_ids.insert(host_user_id);
 441        for github_login in &request.payload.authorized_logins {
 442            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 443            contact_user_ids.insert(contact_user_id);
 444        }
 445
 446        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 447        let guest_connection_ids;
 448        {
 449            let mut state = self.state_mut();
 450            guest_connection_ids = state
 451                .read_project(request.payload.project_id, request.sender_id)?
 452                .guest_connection_ids();
 453            state.register_worktree(
 454                request.payload.project_id,
 455                request.payload.worktree_id,
 456                request.sender_id,
 457                Worktree {
 458                    authorized_user_ids: contact_user_ids.clone(),
 459                    root_name: request.payload.root_name.clone(),
 460                    visible: request.payload.visible,
 461                },
 462            )?;
 463        }
 464        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 465            self.peer
 466                .forward_send(request.sender_id, connection_id, request.payload.clone())
 467        })?;
 468        self.update_contacts_for_users(&contact_user_ids)?;
 469        Ok(proto::Ack {})
 470    }
 471
 472    async fn unregister_worktree(
 473        mut self: Arc<Server>,
 474        request: TypedEnvelope<proto::UnregisterWorktree>,
 475    ) -> tide::Result<()> {
 476        let project_id = request.payload.project_id;
 477        let worktree_id = request.payload.worktree_id;
 478        let (worktree, guest_connection_ids) =
 479            self.state_mut()
 480                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 481        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 482            self.peer.send(
 483                conn_id,
 484                proto::UnregisterWorktree {
 485                    project_id,
 486                    worktree_id,
 487                },
 488            )
 489        })?;
 490        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 491        Ok(())
 492    }
 493
 494    async fn update_worktree(
 495        mut self: Arc<Server>,
 496        request: TypedEnvelope<proto::UpdateWorktree>,
 497    ) -> tide::Result<proto::Ack> {
 498        let connection_ids = self.state_mut().update_worktree(
 499            request.sender_id,
 500            request.payload.project_id,
 501            request.payload.worktree_id,
 502            &request.payload.removed_entries,
 503            &request.payload.updated_entries,
 504        )?;
 505
 506        broadcast(request.sender_id, connection_ids, |connection_id| {
 507            self.peer
 508                .forward_send(request.sender_id, connection_id, request.payload.clone())
 509        })?;
 510
 511        Ok(proto::Ack {})
 512    }
 513
 514    async fn update_diagnostic_summary(
 515        mut self: Arc<Server>,
 516        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 517    ) -> tide::Result<()> {
 518        let summary = request
 519            .payload
 520            .summary
 521            .clone()
 522            .ok_or_else(|| anyhow!("invalid summary"))?;
 523        let receiver_ids = self.state_mut().update_diagnostic_summary(
 524            request.payload.project_id,
 525            request.payload.worktree_id,
 526            request.sender_id,
 527            summary,
 528        )?;
 529
 530        broadcast(request.sender_id, receiver_ids, |connection_id| {
 531            self.peer
 532                .forward_send(request.sender_id, connection_id, request.payload.clone())
 533        })?;
 534        Ok(())
 535    }
 536
 537    async fn lsp_event(
 538        self: Arc<Server>,
 539        request: TypedEnvelope<proto::LspEvent>,
 540    ) -> tide::Result<()> {
 541        let receiver_ids = self
 542            .state()
 543            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 544        broadcast(request.sender_id, receiver_ids, |connection_id| {
 545            self.peer
 546                .forward_send(request.sender_id, connection_id, request.payload.clone())
 547        })?;
 548        Ok(())
 549    }
 550
 551    async fn forward_project_request<T>(
 552        self: Arc<Server>,
 553        request: TypedEnvelope<T>,
 554    ) -> tide::Result<T::Response>
 555    where
 556        T: EntityMessage + RequestMessage,
 557    {
 558        let host_connection_id = self
 559            .state()
 560            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 561            .host_connection_id;
 562        Ok(self
 563            .peer
 564            .forward_request(request.sender_id, host_connection_id, request.payload)
 565            .await?)
 566    }
 567
 568    async fn save_buffer(
 569        self: Arc<Server>,
 570        request: TypedEnvelope<proto::SaveBuffer>,
 571    ) -> tide::Result<proto::BufferSaved> {
 572        let host;
 573        let mut guests;
 574        {
 575            let state = self.state();
 576            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 577            host = project.host_connection_id;
 578            guests = project.guest_connection_ids()
 579        }
 580
 581        let response = self
 582            .peer
 583            .forward_request(request.sender_id, host, request.payload.clone())
 584            .await?;
 585
 586        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 587        broadcast(host, guests, |conn_id| {
 588            self.peer.forward_send(host, conn_id, response.clone())
 589        })?;
 590
 591        Ok(response)
 592    }
 593
 594    async fn update_buffer(
 595        self: Arc<Server>,
 596        request: TypedEnvelope<proto::UpdateBuffer>,
 597    ) -> tide::Result<proto::Ack> {
 598        let receiver_ids = self
 599            .state()
 600            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 601        broadcast(request.sender_id, receiver_ids, |connection_id| {
 602            self.peer
 603                .forward_send(request.sender_id, connection_id, request.payload.clone())
 604        })?;
 605        Ok(proto::Ack {})
 606    }
 607
 608    async fn update_buffer_file(
 609        self: Arc<Server>,
 610        request: TypedEnvelope<proto::UpdateBufferFile>,
 611    ) -> tide::Result<()> {
 612        let receiver_ids = self
 613            .state()
 614            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 615        broadcast(request.sender_id, receiver_ids, |connection_id| {
 616            self.peer
 617                .forward_send(request.sender_id, connection_id, request.payload.clone())
 618        })?;
 619        Ok(())
 620    }
 621
 622    async fn buffer_reloaded(
 623        self: Arc<Server>,
 624        request: TypedEnvelope<proto::BufferReloaded>,
 625    ) -> tide::Result<()> {
 626        let receiver_ids = self
 627            .state()
 628            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 629        broadcast(request.sender_id, receiver_ids, |connection_id| {
 630            self.peer
 631                .forward_send(request.sender_id, connection_id, request.payload.clone())
 632        })?;
 633        Ok(())
 634    }
 635
 636    async fn buffer_saved(
 637        self: Arc<Server>,
 638        request: TypedEnvelope<proto::BufferSaved>,
 639    ) -> tide::Result<()> {
 640        let receiver_ids = self
 641            .state()
 642            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 643        broadcast(request.sender_id, receiver_ids, |connection_id| {
 644            self.peer
 645                .forward_send(request.sender_id, connection_id, request.payload.clone())
 646        })?;
 647        Ok(())
 648    }
 649
 650    async fn get_channels(
 651        self: Arc<Server>,
 652        request: TypedEnvelope<proto::GetChannels>,
 653    ) -> tide::Result<proto::GetChannelsResponse> {
 654        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 655        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 656        Ok(proto::GetChannelsResponse {
 657            channels: channels
 658                .into_iter()
 659                .map(|chan| proto::Channel {
 660                    id: chan.id.to_proto(),
 661                    name: chan.name,
 662                })
 663                .collect(),
 664        })
 665    }
 666
 667    async fn get_users(
 668        self: Arc<Server>,
 669        request: TypedEnvelope<proto::GetUsers>,
 670    ) -> tide::Result<proto::GetUsersResponse> {
 671        let user_ids = request
 672            .payload
 673            .user_ids
 674            .into_iter()
 675            .map(UserId::from_proto)
 676            .collect();
 677        let users = self
 678            .app_state
 679            .db
 680            .get_users_by_ids(user_ids)
 681            .await?
 682            .into_iter()
 683            .map(|user| proto::User {
 684                id: user.id.to_proto(),
 685                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 686                github_login: user.github_login,
 687            })
 688            .collect();
 689        Ok(proto::GetUsersResponse { users })
 690    }
 691
 692    fn update_contacts_for_users<'a>(
 693        self: &Arc<Server>,
 694        user_ids: impl IntoIterator<Item = &'a UserId>,
 695    ) -> anyhow::Result<()> {
 696        let mut result = Ok(());
 697        let state = self.state();
 698        for user_id in user_ids {
 699            let contacts = state.contacts_for_user(*user_id);
 700            for connection_id in state.connection_ids_for_user(*user_id) {
 701                if let Err(error) = self.peer.send(
 702                    connection_id,
 703                    proto::UpdateContacts {
 704                        contacts: contacts.clone(),
 705                    },
 706                ) {
 707                    result = Err(error);
 708                }
 709            }
 710        }
 711        result
 712    }
 713
 714    async fn join_channel(
 715        mut self: Arc<Self>,
 716        request: TypedEnvelope<proto::JoinChannel>,
 717    ) -> tide::Result<proto::JoinChannelResponse> {
 718        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 719        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 720        if !self
 721            .app_state
 722            .db
 723            .can_user_access_channel(user_id, channel_id)
 724            .await?
 725        {
 726            Err(anyhow!("access denied"))?;
 727        }
 728
 729        self.state_mut().join_channel(request.sender_id, channel_id);
 730        let messages = self
 731            .app_state
 732            .db
 733            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 734            .await?
 735            .into_iter()
 736            .map(|msg| proto::ChannelMessage {
 737                id: msg.id.to_proto(),
 738                body: msg.body,
 739                timestamp: msg.sent_at.unix_timestamp() as u64,
 740                sender_id: msg.sender_id.to_proto(),
 741                nonce: Some(msg.nonce.as_u128().into()),
 742            })
 743            .collect::<Vec<_>>();
 744        Ok(proto::JoinChannelResponse {
 745            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 746            messages,
 747        })
 748    }
 749
 750    async fn leave_channel(
 751        mut self: Arc<Self>,
 752        request: TypedEnvelope<proto::LeaveChannel>,
 753    ) -> tide::Result<()> {
 754        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 755        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 756        if !self
 757            .app_state
 758            .db
 759            .can_user_access_channel(user_id, channel_id)
 760            .await?
 761        {
 762            Err(anyhow!("access denied"))?;
 763        }
 764
 765        self.state_mut()
 766            .leave_channel(request.sender_id, channel_id);
 767
 768        Ok(())
 769    }
 770
 771    async fn send_channel_message(
 772        self: Arc<Self>,
 773        request: TypedEnvelope<proto::SendChannelMessage>,
 774    ) -> tide::Result<proto::SendChannelMessageResponse> {
 775        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 776        let user_id;
 777        let connection_ids;
 778        {
 779            let state = self.state();
 780            user_id = state.user_id_for_connection(request.sender_id)?;
 781            connection_ids = state.channel_connection_ids(channel_id)?;
 782        }
 783
 784        // Validate the message body.
 785        let body = request.payload.body.trim().to_string();
 786        if body.len() > MAX_MESSAGE_LEN {
 787            return Err(anyhow!("message is too long"))?;
 788        }
 789        if body.is_empty() {
 790            return Err(anyhow!("message can't be blank"))?;
 791        }
 792
 793        let timestamp = OffsetDateTime::now_utc();
 794        let nonce = request
 795            .payload
 796            .nonce
 797            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 798
 799        let message_id = self
 800            .app_state
 801            .db
 802            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 803            .await?
 804            .to_proto();
 805        let message = proto::ChannelMessage {
 806            sender_id: user_id.to_proto(),
 807            id: message_id,
 808            body,
 809            timestamp: timestamp.unix_timestamp() as u64,
 810            nonce: Some(nonce),
 811        };
 812        broadcast(request.sender_id, connection_ids, |conn_id| {
 813            self.peer.send(
 814                conn_id,
 815                proto::ChannelMessageSent {
 816                    channel_id: channel_id.to_proto(),
 817                    message: Some(message.clone()),
 818                },
 819            )
 820        })?;
 821        Ok(proto::SendChannelMessageResponse {
 822            message: Some(message),
 823        })
 824    }
 825
 826    async fn get_channel_messages(
 827        self: Arc<Self>,
 828        request: TypedEnvelope<proto::GetChannelMessages>,
 829    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 830        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 831        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 832        if !self
 833            .app_state
 834            .db
 835            .can_user_access_channel(user_id, channel_id)
 836            .await?
 837        {
 838            Err(anyhow!("access denied"))?;
 839        }
 840
 841        let messages = self
 842            .app_state
 843            .db
 844            .get_channel_messages(
 845                channel_id,
 846                MESSAGE_COUNT_PER_PAGE,
 847                Some(MessageId::from_proto(request.payload.before_message_id)),
 848            )
 849            .await?
 850            .into_iter()
 851            .map(|msg| proto::ChannelMessage {
 852                id: msg.id.to_proto(),
 853                body: msg.body,
 854                timestamp: msg.sent_at.unix_timestamp() as u64,
 855                sender_id: msg.sender_id.to_proto(),
 856                nonce: Some(msg.nonce.as_u128().into()),
 857            })
 858            .collect::<Vec<_>>();
 859
 860        Ok(proto::GetChannelMessagesResponse {
 861            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 862            messages,
 863        })
 864    }
 865
 866    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 867        self.store.read()
 868    }
 869
 870    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 871        self.store.write()
 872    }
 873}
 874
 875impl Executor for RealExecutor {
 876    type Timer = Timer;
 877
 878    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 879        task::spawn(future);
 880    }
 881
 882    fn timer(&self, duration: Duration) -> Self::Timer {
 883        Timer::after(duration)
 884    }
 885}
 886
 887fn broadcast<F>(
 888    sender_id: ConnectionId,
 889    receiver_ids: Vec<ConnectionId>,
 890    mut f: F,
 891) -> anyhow::Result<()>
 892where
 893    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 894{
 895    let mut result = Ok(());
 896    for receiver_id in receiver_ids {
 897        if receiver_id != sender_id {
 898            if let Err(error) = f(receiver_id) {
 899                if result.is_ok() {
 900                    result = Err(error);
 901                }
 902            }
 903        }
 904    }
 905    result
 906}
 907
 908pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 909    let server = Server::new(app.state().clone(), rpc.clone(), None);
 910    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 911        let server = server.clone();
 912        async move {
 913            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 914
 915            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 916            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 917            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 918            let client_protocol_version: Option<u32> = request
 919                .header("X-Zed-Protocol-Version")
 920                .and_then(|v| v.as_str().parse().ok());
 921
 922            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 923                return Ok(Response::new(StatusCode::UpgradeRequired));
 924            }
 925
 926            let header = match request.header("Sec-Websocket-Key") {
 927                Some(h) => h.as_str(),
 928                None => return Err(anyhow!("expected sec-websocket-key"))?,
 929            };
 930
 931            let user_id = process_auth_header(&request).await?;
 932
 933            let mut response = Response::new(StatusCode::SwitchingProtocols);
 934            response.insert_header(UPGRADE, "websocket");
 935            response.insert_header(CONNECTION, "Upgrade");
 936            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 937            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 938            response.insert_header("Sec-Websocket-Version", "13");
 939
 940            let http_res: &mut tide::http::Response = response.as_mut();
 941            let upgrade_receiver = http_res.recv_upgrade().await;
 942            let addr = request.remote().unwrap_or("unknown").to_string();
 943            task::spawn(async move {
 944                if let Some(stream) = upgrade_receiver.await {
 945                    server
 946                        .handle_connection(
 947                            Connection::new(
 948                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 949                            ),
 950                            addr,
 951                            user_id,
 952                            None,
 953                            RealExecutor,
 954                        )
 955                        .await;
 956                }
 957            });
 958
 959            Ok(response)
 960        }
 961    });
 962}
 963
 964fn header_contains_ignore_case<T>(
 965    request: &tide::Request<T>,
 966    header_name: HeaderName,
 967    value: &str,
 968) -> bool {
 969    request
 970        .header(header_name)
 971        .map(|h| {
 972            h.as_str()
 973                .split(',')
 974                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
 975        })
 976        .unwrap_or(false)
 977}
 978
 979#[cfg(test)]
 980mod tests {
 981    use super::*;
 982    use crate::{
 983        auth,
 984        db::{tests::TestDb, UserId},
 985        github, AppState, Config,
 986    };
 987    use ::rpc::Peer;
 988    use client::{
 989        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
 990        EstablishConnectionError, UserStore,
 991    };
 992    use collections::BTreeMap;
 993    use editor::{
 994        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, MultiBuffer,
 995        Redo, Rename, ToOffset, ToggleCodeActions, Undo,
 996    };
 997    use gpui::{executor, ModelHandle, TestAppContext};
 998    use language::{
 999        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1000        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1001    };
1002    use lsp;
1003    use parking_lot::Mutex;
1004    use postage::{barrier, watch};
1005    use project::{
1006        fs::{FakeFs, Fs as _},
1007        search::SearchQuery,
1008        worktree::WorktreeHandle,
1009        DiagnosticSummary, Project, ProjectPath,
1010    };
1011    use rand::prelude::*;
1012    use rpc::PeerId;
1013    use serde_json::json;
1014    use sqlx::types::time::OffsetDateTime;
1015    use std::{
1016        cell::Cell,
1017        env,
1018        ops::Deref,
1019        path::{Path, PathBuf},
1020        rc::Rc,
1021        sync::{
1022            atomic::{AtomicBool, Ordering::SeqCst},
1023            Arc,
1024        },
1025        time::Duration,
1026    };
1027    use workspace::{Settings, Workspace, WorkspaceParams};
1028
1029    #[cfg(test)]
1030    #[ctor::ctor]
1031    fn init_logger() {
1032        if std::env::var("RUST_LOG").is_ok() {
1033            env_logger::init();
1034        }
1035    }
1036
    /// End-to-end check of sharing a project between two clients: client A shares a local
    /// project, client B joins it remotely, text typed by B into `b.txt` shows up in A's
    /// buffer, and dropping B's project empties A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        // Wait for the local worktree scan to complete before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // The guest must already list the host as a collaborator; remember the guest's
        // replica id so the host's view of it can be checked next.
        let replica_id_b = project_b.read_with(cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until the host lists the guest with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // This assertion runs before the host opens the buffer itself, so the host is
        // expected to have it open as a result of the guest's request.
        project_a.read_with(cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(
                buffer_b,
                None,
                watch::channel_with(Settings::test(cx)).1,
                cx,
            )
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Dropping client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1169
    /// Checks the unshare/re-share cycle: after the host unshares, the guest's project
    /// becomes read-only and the host worktree is no longer shared; after the host shares
    /// again, a guest can join with the same project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to complete before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project is expected to turn read-only once the host unshares.
        project_b
            .condition(cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        cx_b.update(|_| {
            drop(project_b);
        });

        // Share the project again and ensure guests can still join.
        project_a
            .update(cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Rejoin with the same `project_id` that was used for the first share.
        let project_b2 = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b2
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1265
    /// Three-client test (host A, guests B and C) covering:
    /// - concurrent edits by both guests and the host converging to the same text,
    /// - a guest save racing with a host edit, with the saved file containing both,
    /// - renames and a new file on the host's file system showing up in every worktree
    ///   and in the file path of the already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        cx_a: &mut TestAppContext,
        cx_b: &mut TestAppContext,
        cx_c: &mut TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;
        let client_c = server.create_client(cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to complete before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until the host has received both guest edits before appending its own.
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host edit made while the save was
        // pending.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;

        worktree_a.flush_fs_events(cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();

        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into()).await;

        // All three worktrees must end up listing the same set of paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1444
    /// Checks a guest buffer's dirty/conflict flags across an edit, save, edit sequence:
    /// editing makes the buffer dirty, saving clears the dirty flag, and at no point does
    /// the buffer report a conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to complete before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // First edit: the buffer becomes dirty but must not report a conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save as the guest and wait for the dirty flag to clear.
        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
        buffer_b
            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must still not produce a conflict.
        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1526
    /// Checks that a guest's open buffer picks up a change written to the file through the
    /// host's file system: the buffer text becomes the new contents while the buffer stays
    /// clean and reports no conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::test());
        let fs = FakeFs::new(cx_a.background());

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
        let client_a = server.create_client(cx_a, "user_a").await;
        let client_b = server.create_client(cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", true, cx)
            })
            .await
            .unwrap();
        // Wait for the local worktree scan to complete before sharing.
        worktree_a
            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // The worktree handle is bound but not used again in this test.
        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Overwrite the file through the fake file system, bypassing both buffers.
        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
            .await
            .unwrap();
        // The guest buffer should pick up the new text and remain clean.
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.text() == "new contents" && !buf.is_dirty()
            })
            .await;
        buffer_b.read_with(cx_b, |buf, _| {
            assert!(!buf.has_conflict());
        });
    }
1605
1606    #[gpui::test(iterations = 10)]
1607    async fn test_editing_while_guest_opens_buffer(
1608        cx_a: &mut TestAppContext,
1609        cx_b: &mut TestAppContext,
1610    ) {
1611        cx_a.foreground().forbid_parking();
1612        let lang_registry = Arc::new(LanguageRegistry::test());
1613        let fs = FakeFs::new(cx_a.background());
1614
1615        // Connect to a server as 2 clients.
1616        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1617        let client_a = server.create_client(cx_a, "user_a").await;
1618        let client_b = server.create_client(cx_b, "user_b").await;
1619
1620        // Share a project as client A
1621        fs.insert_tree(
1622            "/dir",
1623            json!({
1624                ".zed.toml": r#"collaborators = ["user_b"]"#,
1625                "a.txt": "a-contents",
1626            }),
1627        )
1628        .await;
1629        let project_a = cx_a.update(|cx| {
1630            Project::local(
1631                client_a.clone(),
1632                client_a.user_store.clone(),
1633                lang_registry.clone(),
1634                fs.clone(),
1635                cx,
1636            )
1637        });
1638        let (worktree_a, _) = project_a
1639            .update(cx_a, |p, cx| {
1640                p.find_or_create_local_worktree("/dir", true, cx)
1641            })
1642            .await
1643            .unwrap();
1644        worktree_a
1645            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1646            .await;
1647        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1648        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1649        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1650
1651        // Join that project as client B
1652        let project_b = Project::remote(
1653            project_id,
1654            client_b.clone(),
1655            client_b.user_store.clone(),
1656            lang_registry.clone(),
1657            fs.clone(),
1658            &mut cx_b.to_async(),
1659        )
1660        .await
1661        .unwrap();
1662
1663        // Open a buffer as client A
1664        let buffer_a = project_a
1665            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1666            .await
1667            .unwrap();
1668
1669        // Start opening the same buffer as client B
1670        let buffer_b = cx_b
1671            .background()
1672            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1673
1674        // Edit the buffer as client A while client B is still opening it.
1675        cx_b.background().simulate_random_delay().await;
1676        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1677        cx_b.background().simulate_random_delay().await;
1678        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1679
1680        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1681        let buffer_b = buffer_b.await.unwrap();
1682        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1683    }
1684
1685    #[gpui::test(iterations = 10)]
1686    async fn test_leaving_worktree_while_opening_buffer(
1687        cx_a: &mut TestAppContext,
1688        cx_b: &mut TestAppContext,
1689    ) {
1690        cx_a.foreground().forbid_parking();
1691        let lang_registry = Arc::new(LanguageRegistry::test());
1692        let fs = FakeFs::new(cx_a.background());
1693
1694        // Connect to a server as 2 clients.
1695        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1696        let client_a = server.create_client(cx_a, "user_a").await;
1697        let client_b = server.create_client(cx_b, "user_b").await;
1698
1699        // Share a project as client A
1700        fs.insert_tree(
1701            "/dir",
1702            json!({
1703                ".zed.toml": r#"collaborators = ["user_b"]"#,
1704                "a.txt": "a-contents",
1705            }),
1706        )
1707        .await;
1708        let project_a = cx_a.update(|cx| {
1709            Project::local(
1710                client_a.clone(),
1711                client_a.user_store.clone(),
1712                lang_registry.clone(),
1713                fs.clone(),
1714                cx,
1715            )
1716        });
1717        let (worktree_a, _) = project_a
1718            .update(cx_a, |p, cx| {
1719                p.find_or_create_local_worktree("/dir", true, cx)
1720            })
1721            .await
1722            .unwrap();
1723        worktree_a
1724            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1725            .await;
1726        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1727        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1728        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1729
1730        // Join that project as client B
1731        let project_b = Project::remote(
1732            project_id,
1733            client_b.clone(),
1734            client_b.user_store.clone(),
1735            lang_registry.clone(),
1736            fs.clone(),
1737            &mut cx_b.to_async(),
1738        )
1739        .await
1740        .unwrap();
1741
1742        // See that a guest has joined as client A.
1743        project_a
1744            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1745            .await;
1746
1747        // Begin opening a buffer as client B, but leave the project before the open completes.
1748        let buffer_b = cx_b
1749            .background()
1750            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1751        cx_b.update(|_| drop(project_b));
1752        drop(buffer_b);
1753
1754        // See that the guest has left.
1755        project_a
1756            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1757            .await;
1758    }
1759
1760    #[gpui::test(iterations = 10)]
1761    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1762        cx_a.foreground().forbid_parking();
1763        let lang_registry = Arc::new(LanguageRegistry::test());
1764        let fs = FakeFs::new(cx_a.background());
1765
1766        // Connect to a server as 2 clients.
1767        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1768        let client_a = server.create_client(cx_a, "user_a").await;
1769        let client_b = server.create_client(cx_b, "user_b").await;
1770
1771        // Share a project as client A
1772        fs.insert_tree(
1773            "/a",
1774            json!({
1775                ".zed.toml": r#"collaborators = ["user_b"]"#,
1776                "a.txt": "a-contents",
1777                "b.txt": "b-contents",
1778            }),
1779        )
1780        .await;
1781        let project_a = cx_a.update(|cx| {
1782            Project::local(
1783                client_a.clone(),
1784                client_a.user_store.clone(),
1785                lang_registry.clone(),
1786                fs.clone(),
1787                cx,
1788            )
1789        });
1790        let (worktree_a, _) = project_a
1791            .update(cx_a, |p, cx| {
1792                p.find_or_create_local_worktree("/a", true, cx)
1793            })
1794            .await
1795            .unwrap();
1796        worktree_a
1797            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1798            .await;
1799        let project_id = project_a
1800            .update(cx_a, |project, _| project.next_remote_id())
1801            .await;
1802        project_a
1803            .update(cx_a, |project, cx| project.share(cx))
1804            .await
1805            .unwrap();
1806
1807        // Join that project as client B
1808        let _project_b = Project::remote(
1809            project_id,
1810            client_b.clone(),
1811            client_b.user_store.clone(),
1812            lang_registry.clone(),
1813            fs.clone(),
1814            &mut cx_b.to_async(),
1815        )
1816        .await
1817        .unwrap();
1818
1819        // Client A sees that a guest has joined.
1820        project_a
1821            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1822            .await;
1823
1824        // Drop client B's connection and ensure client A observes client B leaving the project.
1825        client_b.disconnect(&cx_b.to_async()).unwrap();
1826        project_a
1827            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1828            .await;
1829
1830        // Rejoin the project as client B
1831        let _project_b = Project::remote(
1832            project_id,
1833            client_b.clone(),
1834            client_b.user_store.clone(),
1835            lang_registry.clone(),
1836            fs.clone(),
1837            &mut cx_b.to_async(),
1838        )
1839        .await
1840        .unwrap();
1841
1842        // Client A sees that a guest has re-joined.
1843        project_a
1844            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1845            .await;
1846
1847        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1848        client_b.wait_for_current_user(cx_b).await;
1849        server.disconnect_client(client_b.current_user_id(cx_b));
1850        cx_a.foreground().advance_clock(Duration::from_secs(3));
1851        project_a
1852            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1853            .await;
1854    }
1855
1856    #[gpui::test(iterations = 10)]
1857    async fn test_collaborating_with_diagnostics(
1858        cx_a: &mut TestAppContext,
1859        cx_b: &mut TestAppContext,
1860    ) {
1861        cx_a.foreground().forbid_parking();
1862        let mut lang_registry = Arc::new(LanguageRegistry::test());
1863        let fs = FakeFs::new(cx_a.background());
1864
1865        // Set up a fake language server.
1866        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1867        Arc::get_mut(&mut lang_registry)
1868            .unwrap()
1869            .add(Arc::new(Language::new(
1870                LanguageConfig {
1871                    name: "Rust".into(),
1872                    path_suffixes: vec!["rs".to_string()],
1873                    language_server: Some(language_server_config),
1874                    ..Default::default()
1875                },
1876                Some(tree_sitter_rust::language()),
1877            )));
1878
1879        // Connect to a server as 2 clients.
1880        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1881        let client_a = server.create_client(cx_a, "user_a").await;
1882        let client_b = server.create_client(cx_b, "user_b").await;
1883
1884        // Share a project as client A
1885        fs.insert_tree(
1886            "/a",
1887            json!({
1888                ".zed.toml": r#"collaborators = ["user_b"]"#,
1889                "a.rs": "let one = two",
1890                "other.rs": "",
1891            }),
1892        )
1893        .await;
1894        let project_a = cx_a.update(|cx| {
1895            Project::local(
1896                client_a.clone(),
1897                client_a.user_store.clone(),
1898                lang_registry.clone(),
1899                fs.clone(),
1900                cx,
1901            )
1902        });
1903        let (worktree_a, _) = project_a
1904            .update(cx_a, |p, cx| {
1905                p.find_or_create_local_worktree("/a", true, cx)
1906            })
1907            .await
1908            .unwrap();
1909        worktree_a
1910            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1911            .await;
1912        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1913        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1914        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1915
1916        // Cause the language server to start.
1917        let _ = cx_a
1918            .background()
1919            .spawn(project_a.update(cx_a, |project, cx| {
1920                project.open_buffer(
1921                    ProjectPath {
1922                        worktree_id,
1923                        path: Path::new("other.rs").into(),
1924                    },
1925                    cx,
1926                )
1927            }))
1928            .await
1929            .unwrap();
1930
1931        // Simulate a language server reporting errors for a file.
1932        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1933        fake_language_server
1934            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1935            .await;
1936        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1937            lsp::PublishDiagnosticsParams {
1938                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1939                version: None,
1940                diagnostics: vec![lsp::Diagnostic {
1941                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1942                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1943                    message: "message 1".to_string(),
1944                    ..Default::default()
1945                }],
1946            },
1947        );
1948
1949        // Wait for server to see the diagnostics update.
1950        server
1951            .condition(|store| {
1952                let worktree = store
1953                    .project(project_id)
1954                    .unwrap()
1955                    .share
1956                    .as_ref()
1957                    .unwrap()
1958                    .worktrees
1959                    .get(&worktree_id.to_proto())
1960                    .unwrap();
1961
1962                !worktree.diagnostic_summaries.is_empty()
1963            })
1964            .await;
1965
1966        // Join the worktree as client B.
1967        let project_b = Project::remote(
1968            project_id,
1969            client_b.clone(),
1970            client_b.user_store.clone(),
1971            lang_registry.clone(),
1972            fs.clone(),
1973            &mut cx_b.to_async(),
1974        )
1975        .await
1976        .unwrap();
1977
1978        project_b.read_with(cx_b, |project, cx| {
1979            assert_eq!(
1980                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
1981                &[(
1982                    ProjectPath {
1983                        worktree_id,
1984                        path: Arc::from(Path::new("a.rs")),
1985                    },
1986                    DiagnosticSummary {
1987                        error_count: 1,
1988                        warning_count: 0,
1989                        ..Default::default()
1990                    },
1991                )]
1992            )
1993        });
1994
1995        // Simulate a language server reporting more errors for a file.
1996        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1997            lsp::PublishDiagnosticsParams {
1998                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1999                version: None,
2000                diagnostics: vec![
2001                    lsp::Diagnostic {
2002                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2003                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2004                        message: "message 1".to_string(),
2005                        ..Default::default()
2006                    },
2007                    lsp::Diagnostic {
2008                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2009                        range: lsp::Range::new(
2010                            lsp::Position::new(0, 10),
2011                            lsp::Position::new(0, 13),
2012                        ),
2013                        message: "message 2".to_string(),
2014                        ..Default::default()
2015                    },
2016                ],
2017            },
2018        );
2019
2020        // Client b gets the updated summaries
2021        project_b
2022            .condition(&cx_b, |project, cx| {
2023                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2024                    == &[(
2025                        ProjectPath {
2026                            worktree_id,
2027                            path: Arc::from(Path::new("a.rs")),
2028                        },
2029                        DiagnosticSummary {
2030                            error_count: 1,
2031                            warning_count: 1,
2032                            ..Default::default()
2033                        },
2034                    )]
2035            })
2036            .await;
2037
2038        // Open the file with the errors on client B. They should be present.
2039        let buffer_b = cx_b
2040            .background()
2041            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2042            .await
2043            .unwrap();
2044
2045        buffer_b.read_with(cx_b, |buffer, _| {
2046            assert_eq!(
2047                buffer
2048                    .snapshot()
2049                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2050                    .map(|entry| entry)
2051                    .collect::<Vec<_>>(),
2052                &[
2053                    DiagnosticEntry {
2054                        range: Point::new(0, 4)..Point::new(0, 7),
2055                        diagnostic: Diagnostic {
2056                            group_id: 0,
2057                            message: "message 1".to_string(),
2058                            severity: lsp::DiagnosticSeverity::ERROR,
2059                            is_primary: true,
2060                            ..Default::default()
2061                        }
2062                    },
2063                    DiagnosticEntry {
2064                        range: Point::new(0, 10)..Point::new(0, 13),
2065                        diagnostic: Diagnostic {
2066                            group_id: 1,
2067                            severity: lsp::DiagnosticSeverity::WARNING,
2068                            message: "message 2".to_string(),
2069                            is_primary: true,
2070                            ..Default::default()
2071                        }
2072                    }
2073                ]
2074            );
2075        });
2076    }
2077
2078    #[gpui::test(iterations = 10)]
2079    async fn test_collaborating_with_completion(
2080        cx_a: &mut TestAppContext,
2081        cx_b: &mut TestAppContext,
2082    ) {
2083        cx_a.foreground().forbid_parking();
2084        let mut lang_registry = Arc::new(LanguageRegistry::test());
2085        let fs = FakeFs::new(cx_a.background());
2086
2087        // Set up a fake language server.
2088        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2089        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2090            completion_provider: Some(lsp::CompletionOptions {
2091                trigger_characters: Some(vec![".".to_string()]),
2092                ..Default::default()
2093            }),
2094            ..Default::default()
2095        });
2096        Arc::get_mut(&mut lang_registry)
2097            .unwrap()
2098            .add(Arc::new(Language::new(
2099                LanguageConfig {
2100                    name: "Rust".into(),
2101                    path_suffixes: vec!["rs".to_string()],
2102                    language_server: Some(language_server_config),
2103                    ..Default::default()
2104                },
2105                Some(tree_sitter_rust::language()),
2106            )));
2107
2108        // Connect to a server as 2 clients.
2109        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2110        let client_a = server.create_client(cx_a, "user_a").await;
2111        let client_b = server.create_client(cx_b, "user_b").await;
2112
2113        // Share a project as client A
2114        fs.insert_tree(
2115            "/a",
2116            json!({
2117                ".zed.toml": r#"collaborators = ["user_b"]"#,
2118                "main.rs": "fn main() { a }",
2119                "other.rs": "",
2120            }),
2121        )
2122        .await;
2123        let project_a = cx_a.update(|cx| {
2124            Project::local(
2125                client_a.clone(),
2126                client_a.user_store.clone(),
2127                lang_registry.clone(),
2128                fs.clone(),
2129                cx,
2130            )
2131        });
2132        let (worktree_a, _) = project_a
2133            .update(cx_a, |p, cx| {
2134                p.find_or_create_local_worktree("/a", true, cx)
2135            })
2136            .await
2137            .unwrap();
2138        worktree_a
2139            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2140            .await;
2141        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2142        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2143        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2144
2145        // Join the worktree as client B.
2146        let project_b = Project::remote(
2147            project_id,
2148            client_b.clone(),
2149            client_b.user_store.clone(),
2150            lang_registry.clone(),
2151            fs.clone(),
2152            &mut cx_b.to_async(),
2153        )
2154        .await
2155        .unwrap();
2156
2157        // Open a file in an editor as the guest.
2158        let buffer_b = project_b
2159            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2160            .await
2161            .unwrap();
2162        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2163        let editor_b = cx_b.add_view(window_b, |cx| {
2164            Editor::for_buffer(
2165                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2166                Some(project_b.clone()),
2167                watch::channel_with(Settings::test(cx)).1,
2168                cx,
2169            )
2170        });
2171
2172        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2173        buffer_b
2174            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2175            .await;
2176
2177        // Type a completion trigger character as the guest.
2178        editor_b.update(cx_b, |editor, cx| {
2179            editor.select_ranges([13..13], None, cx);
2180            editor.handle_input(&Input(".".into()), cx);
2181            cx.focus(&editor_b);
2182        });
2183
2184        // Receive a completion request as the host's language server.
2185        // Return some completions from the host's language server.
2186        cx_a.foreground().start_waiting();
2187        fake_language_server
2188            .handle_request::<lsp::request::Completion, _>(|params, _| {
2189                assert_eq!(
2190                    params.text_document_position.text_document.uri,
2191                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2192                );
2193                assert_eq!(
2194                    params.text_document_position.position,
2195                    lsp::Position::new(0, 14),
2196                );
2197
2198                Some(lsp::CompletionResponse::Array(vec![
2199                    lsp::CompletionItem {
2200                        label: "first_method(…)".into(),
2201                        detail: Some("fn(&mut self, B) -> C".into()),
2202                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2203                            new_text: "first_method($1)".to_string(),
2204                            range: lsp::Range::new(
2205                                lsp::Position::new(0, 14),
2206                                lsp::Position::new(0, 14),
2207                            ),
2208                        })),
2209                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2210                        ..Default::default()
2211                    },
2212                    lsp::CompletionItem {
2213                        label: "second_method(…)".into(),
2214                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2215                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2216                            new_text: "second_method()".to_string(),
2217                            range: lsp::Range::new(
2218                                lsp::Position::new(0, 14),
2219                                lsp::Position::new(0, 14),
2220                            ),
2221                        })),
2222                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2223                        ..Default::default()
2224                    },
2225                ]))
2226            })
2227            .next()
2228            .await
2229            .unwrap();
2230        cx_a.foreground().finish_waiting();
2231
2232        // Open the buffer on the host.
2233        let buffer_a = project_a
2234            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2235            .await
2236            .unwrap();
2237        buffer_a
2238            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2239            .await;
2240
2241        // Confirm a completion on the guest.
2242        editor_b
2243            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2244            .await;
2245        editor_b.update(cx_b, |editor, cx| {
2246            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2247            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2248        });
2249
2250        // Return a resolved completion from the host's language server.
2251        // The resolved completion has an additional text edit.
2252        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2253            |params, _| {
2254                assert_eq!(params.label, "first_method(…)");
2255                lsp::CompletionItem {
2256                    label: "first_method(…)".into(),
2257                    detail: Some("fn(&mut self, B) -> C".into()),
2258                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2259                        new_text: "first_method($1)".to_string(),
2260                        range: lsp::Range::new(
2261                            lsp::Position::new(0, 14),
2262                            lsp::Position::new(0, 14),
2263                        ),
2264                    })),
2265                    additional_text_edits: Some(vec![lsp::TextEdit {
2266                        new_text: "use d::SomeTrait;\n".to_string(),
2267                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2268                    }]),
2269                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2270                    ..Default::default()
2271                }
2272            },
2273        );
2274
2275        // The additional edit is applied.
2276        buffer_a
2277            .condition(&cx_a, |buffer, _| {
2278                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2279            })
2280            .await;
2281        buffer_b
2282            .condition(&cx_b, |buffer, _| {
2283                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2284            })
2285            .await;
2286    }
2287
2288    #[gpui::test(iterations = 10)]
2289    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2290        cx_a.foreground().forbid_parking();
2291        let mut lang_registry = Arc::new(LanguageRegistry::test());
2292        let fs = FakeFs::new(cx_a.background());
2293
2294        // Set up a fake language server.
2295        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2296        Arc::get_mut(&mut lang_registry)
2297            .unwrap()
2298            .add(Arc::new(Language::new(
2299                LanguageConfig {
2300                    name: "Rust".into(),
2301                    path_suffixes: vec!["rs".to_string()],
2302                    language_server: Some(language_server_config),
2303                    ..Default::default()
2304                },
2305                Some(tree_sitter_rust::language()),
2306            )));
2307
2308        // Connect to a server as 2 clients.
2309        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2310        let client_a = server.create_client(cx_a, "user_a").await;
2311        let client_b = server.create_client(cx_b, "user_b").await;
2312
2313        // Share a project as client A
2314        fs.insert_tree(
2315            "/a",
2316            json!({
2317                ".zed.toml": r#"collaborators = ["user_b"]"#,
2318                "a.rs": "let one = two",
2319            }),
2320        )
2321        .await;
2322        let project_a = cx_a.update(|cx| {
2323            Project::local(
2324                client_a.clone(),
2325                client_a.user_store.clone(),
2326                lang_registry.clone(),
2327                fs.clone(),
2328                cx,
2329            )
2330        });
2331        let (worktree_a, _) = project_a
2332            .update(cx_a, |p, cx| {
2333                p.find_or_create_local_worktree("/a", true, cx)
2334            })
2335            .await
2336            .unwrap();
2337        worktree_a
2338            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2339            .await;
2340        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2341        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2342        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2343
2344        // Join the worktree as client B.
2345        let project_b = Project::remote(
2346            project_id,
2347            client_b.clone(),
2348            client_b.user_store.clone(),
2349            lang_registry.clone(),
2350            fs.clone(),
2351            &mut cx_b.to_async(),
2352        )
2353        .await
2354        .unwrap();
2355
2356        let buffer_b = cx_b
2357            .background()
2358            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2359            .await
2360            .unwrap();
2361
2362        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2363        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2364            Some(vec![
2365                lsp::TextEdit {
2366                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2367                    new_text: "h".to_string(),
2368                },
2369                lsp::TextEdit {
2370                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2371                    new_text: "y".to_string(),
2372                },
2373            ])
2374        });
2375
2376        project_b
2377            .update(cx_b, |project, cx| {
2378                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2379            })
2380            .await
2381            .unwrap();
2382        assert_eq!(
2383            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2384            "let honey = two"
2385        );
2386    }
2387
2388    #[gpui::test(iterations = 10)]
        // Go-to-definition requested by a guest on a project shared by a host.
        //
        // Client A shares `/root-1`; client B opens `a.rs` and asks for definitions twice. The
        // fake language server answers both requests with locations in `/root-2/b.rs`, a file
        // that is not under the shared root. The test asserts that the guest's project then
        // reports two worktrees and that both answers resolve to the same target buffer.
2389    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2390        cx_a.foreground().forbid_parking();
2391        let mut lang_registry = Arc::new(LanguageRegistry::test());
2392        let fs = FakeFs::new(cx_a.background());
            // `.zed.toml` names `user_b` as a collaborator (presumably what permits client B to
            // join below — TODO confirm against the server's authorization logic).
2393        fs.insert_tree(
2394            "/root-1",
2395            json!({
2396                ".zed.toml": r#"collaborators = ["user_b"]"#,
2397                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2398            }),
2399        )
2400        .await;
2401        fs.insert_tree(
2402            "/root-2",
2403            json!({
2404                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2405            }),
2406        )
2407        .await;
2408
2409        // Set up a fake language server.
            // `Arc::get_mut` only succeeds while this `Arc` has no other owners, so the language
            // is registered here, before the registry is cloned into the projects below.
2410        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2411        Arc::get_mut(&mut lang_registry)
2412            .unwrap()
2413            .add(Arc::new(Language::new(
2414                LanguageConfig {
2415                    name: "Rust".into(),
2416                    path_suffixes: vec!["rs".to_string()],
2417                    language_server: Some(language_server_config),
2418                    ..Default::default()
2419                },
2420                Some(tree_sitter_rust::language()),
2421            )));
2422
2423        // Connect to a server as 2 clients.
2424        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2425        let client_a = server.create_client(cx_a, "user_a").await;
2426        let client_b = server.create_client(cx_b, "user_b").await;
2427
2428        // Share a project as client A
2429        let project_a = cx_a.update(|cx| {
2430            Project::local(
2431                client_a.clone(),
2432                client_a.user_store.clone(),
2433                lang_registry.clone(),
2434                fs.clone(),
2435                cx,
2436            )
2437        });
2438        let (worktree_a, _) = project_a
2439            .update(cx_a, |p, cx| {
2440                p.find_or_create_local_worktree("/root-1", true, cx)
2441            })
2442            .await
2443            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing it.
2444        worktree_a
2445            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2446            .await;
2447        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2448        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2449        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2450
2451        // Join the shared project as client B.
2452        let project_b = Project::remote(
2453            project_id,
2454            client_b.clone(),
2455            client_b.user_store.clone(),
2456            lang_registry.clone(),
2457            fs.clone(),
2458            &mut cx_b.to_async(),
2459        )
2460        .await
2461        .unwrap();
2462
2463        // Open the file on client B.
2464        let buffer_b = cx_b
2465            .background()
2466            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2467            .await
2468            .unwrap();
2469
2470        // Request the definition of a symbol as the guest.
            // The fake server replies with line 0, columns 6..9 of `b.rs`, which is the
            // identifier `TWO` in `const TWO: usize = 2;`.
2471        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2472        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2473            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2474                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2475                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2476            )))
2477        });
2478
            // 23 lands inside `TWO` of `b::TWO` in `a.rs` (presumably a buffer offset —
            // TODO confirm against `Project::definition`).
2479        let definitions_1 = project_b
2480            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2481            .await
2482            .unwrap();
2483        cx_b.read(|cx| {
2484            assert_eq!(definitions_1.len(), 1);
                // The guest's project is expected to list a second worktree once the definition
                // in `/root-2/b.rs` has been resolved.
2485            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2486            let target_buffer = definitions_1[0].buffer.read(cx);
2487            assert_eq!(
2488                target_buffer.text(),
2489                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2490            );
2491            assert_eq!(
2492                definitions_1[0].range.to_point(target_buffer),
2493                Point::new(0, 6)..Point::new(0, 9)
2494            );
2495        });
2496
2497        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2498        // the previous call to `definition`.
            // This handler replaces the previous one and points at line 1, columns 6..11 of
            // `b.rs`, i.e. the identifier `THREE`.
2499        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2500            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2501                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2502                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2503            )))
2504        });
2505
            // 33 lands inside `THREE` of `b::THREE` in `a.rs`.
2506        let definitions_2 = project_b
2507            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2508            .await
2509            .unwrap();
2510        cx_b.read(|cx| {
2511            assert_eq!(definitions_2.len(), 1);
                // Still two worktrees: the second request must not add another one.
2512            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2513            let target_buffer = definitions_2[0].buffer.read(cx);
2514            assert_eq!(
2515                target_buffer.text(),
2516                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2517            );
2518            assert_eq!(
2519                definitions_2[0].range.to_point(target_buffer),
2520                Point::new(1, 6)..Point::new(1, 11)
2521            );
2522        });
            // Both requests must hand back the very same buffer for `b.rs`.
2523        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2524    }
2525
2526    #[gpui::test(iterations = 10)]
        // Find-references requested by a guest on a project shared by a host.
        //
        // Client A shares `/root-1`; client B opens `one.rs` and asks for references. The fake
        // language server returns two locations in `/root-1/two.rs` and one in
        // `/root-2/three.rs`, a file outside the shared root. The test checks the number of
        // results, the worktree count on the guest, buffer reuse, paths and offset ranges.
2527    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2528        cx_a.foreground().forbid_parking();
2529        let mut lang_registry = Arc::new(LanguageRegistry::test());
2530        let fs = FakeFs::new(cx_a.background());
            // `.zed.toml` names `user_b` as a collaborator (presumably what permits client B to
            // join below — TODO confirm against the server's authorization logic).
2531        fs.insert_tree(
2532            "/root-1",
2533            json!({
2534                ".zed.toml": r#"collaborators = ["user_b"]"#,
2535                "one.rs": "const ONE: usize = 1;",
2536                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2537            }),
2538        )
2539        .await;
2540        fs.insert_tree(
2541            "/root-2",
2542            json!({
2543                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2544            }),
2545        )
2546        .await;
2547
2548        // Set up a fake language server.
            // `Arc::get_mut` only succeeds while this `Arc` has no other owners, so the language
            // is registered here, before the registry is cloned into the projects below.
2549        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2550        Arc::get_mut(&mut lang_registry)
2551            .unwrap()
2552            .add(Arc::new(Language::new(
2553                LanguageConfig {
2554                    name: "Rust".into(),
2555                    path_suffixes: vec!["rs".to_string()],
2556                    language_server: Some(language_server_config),
2557                    ..Default::default()
2558                },
2559                Some(tree_sitter_rust::language()),
2560            )));
2561
2562        // Connect to a server as 2 clients.
2563        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2564        let client_a = server.create_client(cx_a, "user_a").await;
2565        let client_b = server.create_client(cx_b, "user_b").await;
2566
2567        // Share a project as client A
2568        let project_a = cx_a.update(|cx| {
2569            Project::local(
2570                client_a.clone(),
2571                client_a.user_store.clone(),
2572                lang_registry.clone(),
2573                fs.clone(),
2574                cx,
2575            )
2576        });
2577        let (worktree_a, _) = project_a
2578            .update(cx_a, |p, cx| {
2579                p.find_or_create_local_worktree("/root-1", true, cx)
2580            })
2581            .await
2582            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing it.
2583        worktree_a
2584            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2585            .await;
2586        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2587        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2588        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2589
2590        // Join the shared project as client B.
2591        let project_b = Project::remote(
2592            project_id,
2593            client_b.clone(),
2594            client_b.user_store.clone(),
2595            lang_registry.clone(),
2596            fs.clone(),
2597            &mut cx_b.to_async(),
2598        )
2599        .await
2600        .unwrap();
2601
2602        // Open the file on client B.
2603        let buffer_b = cx_b
2604            .background()
2605            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2606            .await
2607            .unwrap();
2608
2609        // Request references to a symbol as the guest.
            // The handler first checks that the request names the host-side path of `one.rs`,
            // then returns the three `ONE` occurrences: columns 24..27 and 35..38 of `two.rs`
            // and columns 37..40 of `three.rs`.
2610        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2611        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2612            assert_eq!(
2613                params.text_document_position.text_document.uri.as_str(),
2614                "file:///root-1/one.rs"
2615            );
2616            Some(vec![
2617                lsp::Location {
2618                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2619                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2620                },
2621                lsp::Location {
2622                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2623                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2624                },
2625                lsp::Location {
2626                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2627                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2628                },
2629            ])
2630        });
2631
            // 7 lands inside `ONE` in `const ONE: usize = 1;` (presumably a buffer offset —
            // TODO confirm against `Project::references`).
2632        let references = project_b
2633            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2634            .await
2635            .unwrap();
2636        cx_b.read(|cx| {
2637            assert_eq!(references.len(), 3);
                // The guest's project is expected to list a second worktree for the reference
                // that lives in `/root-2/three.rs`.
2638            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2639
2640            let two_buffer = references[0].buffer.read(cx);
2641            let three_buffer = references[2].buffer.read(cx);
2642            assert_eq!(
2643                two_buffer.file().unwrap().path().as_ref(),
2644                Path::new("two.rs")
2645            );
                // Both references into `two.rs` must share a single buffer.
2646            assert_eq!(references[1].buffer, references[0].buffer);
                // For the file outside `/root-1`, the expected full path is just the file name.
2647            assert_eq!(
2648                three_buffer.file().unwrap().full_path(cx),
2649                Path::new("three.rs")
2650            );
2651
                // Since every location is on line 0, the offsets equal the LSP columns returned
                // by the handler above.
2652            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2653            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2654            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2655        });
2656    }
2657
2658    #[gpui::test(iterations = 10)]
        // Project-wide text search requested by a guest on a project shared by a host.
        //
        // Client A adds two worktrees (`/root-1` and `/root-2`) to one project and shares it.
        // Client B searches for "world" and the test compares the matches, grouped by file
        // path, with the expected offset ranges. No language server is involved.
2659    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2660        cx_a.foreground().forbid_parking();
2661        let lang_registry = Arc::new(LanguageRegistry::test());
2662        let fs = FakeFs::new(cx_a.background());
            // File `b` contains no "world", so it must be absent from the results below.
2663        fs.insert_tree(
2664            "/root-1",
2665            json!({
2666                ".zed.toml": r#"collaborators = ["user_b"]"#,
2667                "a": "hello world",
2668                "b": "goodnight moon",
2669                "c": "a world of goo",
2670                "d": "world champion of clown world",
2671            }),
2672        )
2673        .await;
2674        fs.insert_tree(
2675            "/root-2",
2676            json!({
2677                "e": "disney world is fun",
2678            }),
2679        )
2680        .await;
2681
2682        // Connect to a server as 2 clients.
2683        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2684        let client_a = server.create_client(cx_a, "user_a").await;
2685        let client_b = server.create_client(cx_b, "user_b").await;
2686
2687        // Share a project as client A
2688        let project_a = cx_a.update(|cx| {
2689            Project::local(
2690                client_a.clone(),
2691                client_a.user_store.clone(),
2692                lang_registry.clone(),
2693                fs.clone(),
2694                cx,
2695            )
2696        });
2697        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2698
            // Add both roots to the same project, waiting for each initial scan to complete.
2699        let (worktree_1, _) = project_a
2700            .update(cx_a, |p, cx| {
2701                p.find_or_create_local_worktree("/root-1", true, cx)
2702            })
2703            .await
2704            .unwrap();
2705        worktree_1
2706            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2707            .await;
2708        let (worktree_2, _) = project_a
2709            .update(cx_a, |p, cx| {
2710                p.find_or_create_local_worktree("/root-2", true, cx)
2711            })
2712            .await
2713            .unwrap();
2714        worktree_2
2715            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2716            .await;
2717
2718        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2719
2720        // Join the shared project as client B.
2721        let project_b = Project::remote(
2722            project_id,
2723            client_b.clone(),
2724            client_b.user_store.clone(),
2725            lang_registry.clone(),
2726            fs.clone(),
2727            &mut cx_b.to_async(),
2728        )
2729        .await
2730        .unwrap();
2731
            // Run the search as the guest.
            // NOTE(review): the meaning of the two `false` flags is not visible here
            // (presumably whole-word and case-sensitive) — confirm against `SearchQuery::text`.
2732        let results = project_b
2733            .update(cx_b, |project, cx| {
2734                project.search(SearchQuery::text("world", false, false), cx)
2735            })
2736            .await
2737            .unwrap();
2738
            // Flatten each (buffer, ranges) result into (full path, offset ranges) so it can be
            // compared against plain values.
2739        let mut ranges_by_path = results
2740            .into_iter()
2741            .map(|(buffer, ranges)| {
2742                buffer.read_with(cx_b, |buffer, cx| {
2743                    let path = buffer.file().unwrap().full_path(cx);
2744                    let offset_ranges = ranges
2745                        .into_iter()
2746                        .map(|range| range.to_offset(buffer))
2747                        .collect::<Vec<_>>();
2748                    (path, offset_ranges)
2749                })
2750            })
2751            .collect::<Vec<_>>();
            // Sort by path so the comparison does not depend on the order results arrive in.
2752        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2753
            // Each range is the position of "world" within the corresponding file's text;
            // `d` contains it twice.
2754        assert_eq!(
2755            ranges_by_path,
2756            &[
2757                (PathBuf::from("root-1/a"), vec![6..11]),
2758                (PathBuf::from("root-1/c"), vec![2..7]),
2759                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2760                (PathBuf::from("root-2/e"), vec![7..12]),
2761            ]
2762        );
2763    }
2764
2765    #[gpui::test(iterations = 10)]
        // Document highlights requested by a guest on a project shared by a host.
        //
        // Client A shares `/root-1`; client B opens `main.rs` and asks for highlights at
        // offset 34. The fake language server verifies the request it receives and returns one
        // WRITE and two READ highlights; the test checks that the guest gets the same kinds
        // and ranges back.
2766    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2767        cx_a.foreground().forbid_parking();
2768        let lang_registry = Arc::new(LanguageRegistry::test());
2769        let fs = FakeFs::new(cx_a.background());
2770        fs.insert_tree(
2771            "/root-1",
2772            json!({
2773                ".zed.toml": r#"collaborators = ["user_b"]"#,
2774                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2775            }),
2776        )
2777        .await;
2778
2779        // Set up a fake language server.
            // NOTE(review): `add` is called through the shared `Arc` here, whereas the other
            // tests in this chunk go through `Arc::get_mut` — both forms are kept as found.
2780        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2781        lang_registry.add(Arc::new(Language::new(
2782            LanguageConfig {
2783                name: "Rust".into(),
2784                path_suffixes: vec!["rs".to_string()],
2785                language_server: Some(language_server_config),
2786                ..Default::default()
2787            },
2788            Some(tree_sitter_rust::language()),
2789        )));
2790
2791        // Connect to a server as 2 clients.
2792        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2793        let client_a = server.create_client(cx_a, "user_a").await;
2794        let client_b = server.create_client(cx_b, "user_b").await;
2795
2796        // Share a project as client A
2797        let project_a = cx_a.update(|cx| {
2798            Project::local(
2799                client_a.clone(),
2800                client_a.user_store.clone(),
2801                lang_registry.clone(),
2802                fs.clone(),
2803                cx,
2804            )
2805        });
2806        let (worktree_a, _) = project_a
2807            .update(cx_a, |p, cx| {
2808                p.find_or_create_local_worktree("/root-1", true, cx)
2809            })
2810            .await
2811            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing it.
2812        worktree_a
2813            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2814            .await;
2815        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2816        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2817        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2818
2819        // Join the shared project as client B.
2820        let project_b = Project::remote(
2821            project_id,
2822            client_b.clone(),
2823            client_b.user_store.clone(),
2824            lang_registry.clone(),
2825            fs.clone(),
2826            &mut cx_b.to_async(),
2827        )
2828        .await
2829        .unwrap();
2830
2831        // Open the file on client B.
2832        let buffer_b = cx_b
2833            .background()
2834            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2835            .await
2836            .unwrap();
2837
2838        // Request document highlights as the guest.
            // The handler asserts that the request carries the host-side path of `main.rs` and
            // position (0, 34), matching the offset passed by the guest below. It then returns
            // the three occurrences of `number`: the parameter (columns 10..16) as WRITE and
            // the two uses in the body (32..38 and 41..47) as READ.
2839        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2840        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2841            |params, _| {
2842                assert_eq!(
2843                    params
2844                        .text_document_position_params
2845                        .text_document
2846                        .uri
2847                        .as_str(),
2848                    "file:///root-1/main.rs"
2849                );
2850                assert_eq!(
2851                    params.text_document_position_params.position,
2852                    lsp::Position::new(0, 34)
2853                );
2854                Some(vec![
2855                    lsp::DocumentHighlight {
2856                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2857                        range: lsp::Range::new(
2858                            lsp::Position::new(0, 10),
2859                            lsp::Position::new(0, 16),
2860                        ),
2861                    },
2862                    lsp::DocumentHighlight {
2863                        kind: Some(lsp::DocumentHighlightKind::READ),
2864                        range: lsp::Range::new(
2865                            lsp::Position::new(0, 32),
2866                            lsp::Position::new(0, 38),
2867                        ),
2868                    },
2869                    lsp::DocumentHighlight {
2870                        kind: Some(lsp::DocumentHighlightKind::READ),
2871                        range: lsp::Range::new(
2872                            lsp::Position::new(0, 41),
2873                            lsp::Position::new(0, 47),
2874                        ),
2875                    },
2876                ])
2877            },
2878        );
2879
            // 34 lands inside the first `number` of the function body.
2880        let highlights = project_b
2881            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2882            .await
2883            .unwrap();
2884        buffer_b.read_with(cx_b, |buffer, _| {
2885            let snapshot = buffer.snapshot();
2886
                // Reduce each highlight to (kind, offset range) so it can be compared against
                // plain values.
2887            let highlights = highlights
2888                .into_iter()
2889                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2890                .collect::<Vec<_>>();
2891            assert_eq!(
2892                highlights,
2893                &[
2894                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2895                    (lsp::DocumentHighlightKind::READ, 32..38),
2896                    (lsp::DocumentHighlightKind::READ, 41..47)
2897                ]
2898            )
2899        });
2900    }
2901
2902    #[gpui::test(iterations = 10)]
        // Project symbol search requested by a guest, plus a check that a guest cannot use a
        // forged symbol to open an arbitrary file.
        //
        // Client A shares only `/code/crate-1`. The fake language server reports a symbol `TWO`
        // located in `/code/crate-2/two.rs`. Client B queries symbols, opens the buffer for the
        // returned symbol, and then retries with a tampered copy of that symbol, which must
        // fail with an "invalid symbol signature" error.
2903    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2904        cx_a.foreground().forbid_parking();
2905        let mut lang_registry = Arc::new(LanguageRegistry::test());
2906        let fs = FakeFs::new(cx_a.background());
2907        fs.insert_tree(
2908            "/code",
2909            json!({
2910                "crate-1": {
2911                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2912                    "one.rs": "const ONE: usize = 1;",
2913                },
2914                "crate-2": {
2915                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2916                },
2917                "private": {
2918                    "passwords.txt": "the-password",
2919                }
2920            }),
2921        )
2922        .await;
2923
2924        // Set up a fake language server.
            // `Arc::get_mut` only succeeds while this `Arc` has no other owners, so the language
            // is registered here, before the registry is cloned into the projects below.
2925        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2926        Arc::get_mut(&mut lang_registry)
2927            .unwrap()
2928            .add(Arc::new(Language::new(
2929                LanguageConfig {
2930                    name: "Rust".into(),
2931                    path_suffixes: vec!["rs".to_string()],
2932                    language_server: Some(language_server_config),
2933                    ..Default::default()
2934                },
2935                Some(tree_sitter_rust::language()),
2936            )));
2937
2938        // Connect to a server as 2 clients.
2939        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2940        let client_a = server.create_client(cx_a, "user_a").await;
2941        let client_b = server.create_client(cx_b, "user_b").await;
2942
2943        // Share a project as client A
2944        let project_a = cx_a.update(|cx| {
2945            Project::local(
2946                client_a.clone(),
2947                client_a.user_store.clone(),
2948                lang_registry.clone(),
2949                fs.clone(),
2950                cx,
2951            )
2952        });
2953        let (worktree_a, _) = project_a
2954            .update(cx_a, |p, cx| {
2955                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2956            })
2957            .await
2958            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing it.
2959        worktree_a
2960            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2961            .await;
2962        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2963        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2964        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2965
2966        // Join the shared project as client B.
2967        let project_b = Project::remote(
2968            project_id,
2969            client_b.clone(),
2970            client_b.user_store.clone(),
2971            lang_registry.clone(),
2972            fs.clone(),
2973            &mut cx_b.to_async(),
2974        )
2975        .await
2976        .unwrap();
2977
2978        // Cause the language server to start.
            // The buffer itself is not used afterwards; it is only bound to `_buffer`.
2979        let _buffer = cx_b
2980            .background()
2981            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2982            .await
2983            .unwrap();
2984
            // Answer workspace symbol requests with a single constant `TWO` located in
            // `crate-2`, which is outside the shared `crate-1` worktree.
2985        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2986        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
                // presumably needed because the `deprecated` field set below is itself marked
                // deprecated in the `lsp` types — TODO confirm
2987            #[allow(deprecated)]
2988            Some(vec![lsp::SymbolInformation {
2989                name: "TWO".into(),
2990                location: lsp::Location {
2991                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
2992                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2993                },
2994                kind: lsp::SymbolKind::CONSTANT,
2995                tags: None,
2996                container_name: None,
2997                deprecated: None,
2998            }])
2999        });
3000
3001        // Request project symbols matching the query "two" as the guest.
3002        let symbols = project_b
3003            .update(cx_b, |p, cx| p.symbols("two", cx))
3004            .await
3005            .unwrap();
3006        assert_eq!(symbols.len(), 1);
3007        assert_eq!(symbols[0].name, "TWO");
3008
3009        // Open one of the returned symbols.
3010        let buffer_b_2 = project_b
3011            .update(cx_b, |project, cx| {
3012                project.open_buffer_for_symbol(&symbols[0], cx)
3013            })
3014            .await
3015            .unwrap();
            // The expected path is expressed relative to the shared `crate-1` root, hence the
            // leading `..`.
3016        buffer_b_2.read_with(cx_b, |buffer, _| {
3017            assert_eq!(
3018                buffer.file().unwrap().path().as_ref(),
3019                Path::new("../crate-2/two.rs")
3020            );
3021        });
3022
3023        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
            // Only the path of a genuine symbol is altered; everything else is cloned as-is.
            // NOTE(review): `/code/secrets` is not in the tree inserted above (the private file
            // is `/code/private/passwords.txt`); the test only asserts on the signature error.
3024        let mut fake_symbol = symbols[0].clone();
3025        fake_symbol.path = Path::new("/code/secrets").into();
3026        let error = project_b
3027            .update(cx_b, |project, cx| {
3028                project.open_buffer_for_symbol(&fake_symbol, cx)
3029            })
3030            .await
3031            .unwrap_err();
3032        assert!(error.to_string().contains("invalid symbol signature"));
3033    }
3034
3035    #[gpui::test(iterations = 10)]
        // Opening a buffer as a guest while a definition request that resolves to the same
        // buffer is in flight.
        //
        // Client A shares `/root`; client B opens `a.rs`, then starts both a go-to-definition
        // request (answered with a location in `b.rs`) and an `open_buffer` for `b.rs`, in an
        // order chosen by `rng`. Whichever is started first, the definition must refer to the
        // same buffer that `open_buffer` returns.
3036    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3037        cx_a: &mut TestAppContext,
3038        cx_b: &mut TestAppContext,
3039        mut rng: StdRng,
3040    ) {
3041        cx_a.foreground().forbid_parking();
3042        let mut lang_registry = Arc::new(LanguageRegistry::test());
3043        let fs = FakeFs::new(cx_a.background());
3044        fs.insert_tree(
3045            "/root",
3046            json!({
3047                ".zed.toml": r#"collaborators = ["user_b"]"#,
3048                "a.rs": "const ONE: usize = b::TWO;",
3049                "b.rs": "const TWO: usize = 2",
3050            }),
3051        )
3052        .await;
3053
3054        // Set up a fake language server.
            // `Arc::get_mut` only succeeds while this `Arc` has no other owners, so the language
            // is registered here, before the registry is cloned into the projects below.
3055        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3056
3057        Arc::get_mut(&mut lang_registry)
3058            .unwrap()
3059            .add(Arc::new(Language::new(
3060                LanguageConfig {
3061                    name: "Rust".into(),
3062                    path_suffixes: vec!["rs".to_string()],
3063                    language_server: Some(language_server_config),
3064                    ..Default::default()
3065                },
3066                Some(tree_sitter_rust::language()),
3067            )));
3068
3069        // Connect to a server as 2 clients.
3070        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3071        let client_a = server.create_client(cx_a, "user_a").await;
3072        let client_b = server.create_client(cx_b, "user_b").await;
3073
3074        // Share a project as client A
3075        let project_a = cx_a.update(|cx| {
3076            Project::local(
3077                client_a.clone(),
3078                client_a.user_store.clone(),
3079                lang_registry.clone(),
3080                fs.clone(),
3081                cx,
3082            )
3083        });
3084
3085        let (worktree_a, _) = project_a
3086            .update(cx_a, |p, cx| {
3087                p.find_or_create_local_worktree("/root", true, cx)
3088            })
3089            .await
3090            .unwrap();
            // Wait for the initial scan of the worktree to finish before sharing it.
3091        worktree_a
3092            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3093            .await;
3094        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3095        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3096        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3097
3098        // Join the shared project as client B.
3099        let project_b = Project::remote(
3100            project_id,
3101            client_b.clone(),
3102            client_b.user_store.clone(),
3103            lang_registry.clone(),
3104            fs.clone(),
3105            &mut cx_b.to_async(),
3106        )
3107        .await
3108        .unwrap();
3109
            // Open `a.rs` on client B; the definition request below is issued against it.
3110        let buffer_b1 = cx_b
3111            .background()
3112            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3113            .await
3114            .unwrap();
3115
            // Every definition request is answered with `TWO` in `b.rs` (line 0, columns 6..9),
            // a file inside the shared worktree.
3116        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3117        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3118            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3119                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3120                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3121            )))
3122        });
3123
            // Start both operations without awaiting either, choosing at random which one is
            // started first; 23 lands inside `TWO` of `b::TWO` in `a.rs`. Both results are
            // only awaited after the branch.
3124        let definitions;
3125        let buffer_b2;
3126        if rng.gen() {
3127            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3128            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3129        } else {
3130            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3131            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3132        }
3133
3134        let buffer_b2 = buffer_b2.await.unwrap();
3135        let definitions = definitions.await.unwrap();
3136        assert_eq!(definitions.len(), 1);
            // The definition must point at the buffer returned by `open_buffer`, not a second
            // copy of `b.rs`.
3137        assert_eq!(definitions[0].buffer, buffer_b2);
3138    }
3139
3140    #[gpui::test(iterations = 10)]
        // End-to-end collaboration test for LSP code actions: client A hosts a
        // local project backed by a fake language server, client B joins it as a
        // remote project, requests code actions from its own editor, confirms one
        // that edits two files, and then undoes/redoes the resulting edit.
3141    async fn test_collaborating_with_code_actions(
3142        cx_a: &mut TestAppContext,
3143        cx_b: &mut TestAppContext,
3144    ) {
3145        cx_a.foreground().forbid_parking();
3146        let mut lang_registry = Arc::new(LanguageRegistry::test());
3147        let fs = FakeFs::new(cx_a.background());
            // `editor::init` receives `path_openers_b` mutably; the vector is later
            // handed to client B's workspace params.
3148        let mut path_openers_b = Vec::new();
3149        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3150
3151        // Set up a fake language server.
3152        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // The registry `Arc` has not been cloned yet, so `Arc::get_mut` can still
            // hand out a mutable reference for registering the language.
3153        Arc::get_mut(&mut lang_registry)
3154            .unwrap()
3155            .add(Arc::new(Language::new(
3156                LanguageConfig {
3157                    name: "Rust".into(),
3158                    path_suffixes: vec!["rs".to_string()],
3159                    language_server: Some(language_server_config),
3160                    ..Default::default()
3161                },
3162                Some(tree_sitter_rust::language()),
3163            )));
3164
3165        // Connect to a server as 2 clients.
3166        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3167        let client_a = server.create_client(cx_a, "user_a").await;
3168        let client_b = server.create_client(cx_b, "user_b").await;
3169
3170        // Share a project as client A
            // The fake file tree lists "user_b" under `collaborators` in `.zed.toml`.
3171        fs.insert_tree(
3172            "/a",
3173            json!({
3174                ".zed.toml": r#"collaborators = ["user_b"]"#,
3175                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3176                "other.rs": "pub fn foo() -> usize { 4 }",
3177            }),
3178        )
3179        .await;
3180        let project_a = cx_a.update(|cx| {
3181            Project::local(
3182                client_a.clone(),
3183                client_a.user_store.clone(),
3184                lang_registry.clone(),
3185                fs.clone(),
3186                cx,
3187            )
3188        });
3189        let (worktree_a, _) = project_a
3190            .update(cx_a, |p, cx| {
3191                p.find_or_create_local_worktree("/a", true, cx)
3192            })
3193            .await
3194            .unwrap();
            // Await the local worktree's `scan_complete()` before taking ids and
            // sharing the project.
3195        worktree_a
3196            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3197            .await;
3198        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3199        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3200        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3201
3202        // Join the worktree as client B.
3203        let project_b = Project::remote(
3204            project_id,
3205            client_b.clone(),
3206            client_b.user_store.clone(),
3207            lang_registry.clone(),
3208            fs.clone(),
3209            &mut cx_b.to_async(),
3210        )
3211        .await
3212        .unwrap();
            // Start from the test workspace params and point them at client B's
            // client, user store, remote project and the editor's path openers.
3213        let mut params = cx_b.update(WorkspaceParams::test);
3214        params.languages = lang_registry.clone();
3215        params.client = client_b.client.clone();
3216        params.user_store = client_b.user_store.clone();
3217        params.project = project_b;
3218        params.path_openers = path_openers_b.into();
3219
            // Open main.rs in a workspace window on client B and keep the editor.
3220        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3221        let editor_b = workspace_b
3222            .update(cx_b, |workspace, cx| {
3223                workspace.open_path((worktree_id, "main.rs").into(), cx)
3224            })
3225            .await
3226            .unwrap()
3227            .downcast::<Editor>()
3228            .unwrap();
3229
            // The first code action request must target position (0, 0) of
            // /a/main.rs; it is answered with no actions.
            // NOTE(review): `.next().await` presumably resolves once the handler has
            // served one request — confirm against the fake server API.
3230        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3231        fake_language_server
3232            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3233                assert_eq!(
3234                    params.text_document.uri,
3235                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3236                );
3237                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3238                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3239                None
3240            })
3241            .next()
3242            .await;
3243
3244        // Move cursor to a location that contains code actions.
            // (1, 31) falls inside `other::foo()`, which spans columns 22..34 of
            // line 1 in main.rs.
3245        editor_b.update(cx_b, |editor, cx| {
3246            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3247            cx.focus(&editor_b);
3248        });
3249
            // The request for the new cursor position is answered with a single
            // "Inline into all callers" action. Its workspace edit replaces
            // `other::foo()` (line 1, columns 22..34 of main.rs) with `4` and clears
            // the whole 27-character definition on line 0 of other.rs.
3250        fake_language_server
3251            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3252                assert_eq!(
3253                    params.text_document.uri,
3254                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3255                );
3256                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3257                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3258
3259                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3260                    lsp::CodeAction {
3261                        title: "Inline into all callers".to_string(),
3262                        edit: Some(lsp::WorkspaceEdit {
3263                            changes: Some(
3264                                [
3265                                    (
3266                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3267                                        vec![lsp::TextEdit::new(
3268                                            lsp::Range::new(
3269                                                lsp::Position::new(1, 22),
3270                                                lsp::Position::new(1, 34),
3271                                            ),
3272                                            "4".to_string(),
3273                                        )],
3274                                    ),
3275                                    (
3276                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3277                                        vec![lsp::TextEdit::new(
3278                                            lsp::Range::new(
3279                                                lsp::Position::new(0, 0),
3280                                                lsp::Position::new(0, 27),
3281                                            ),
3282                                            "".to_string(),
3283                                        )],
3284                                    ),
3285                                ]
3286                                .into_iter()
3287                                .collect(),
3288                            ),
3289                            ..Default::default()
3290                        }),
                            // Extra `data` payload attached to the action.
                            // NOTE(review): presumably carried through to the
                            // resolve request — confirm how the client uses it.
3291                        data: Some(json!({
3292                            "codeActionParams": {
3293                                "range": {
3294                                    "start": {"line": 1, "column": 31},
3295                                    "end": {"line": 1, "column": 31},
3296                                }
3297                            }
3298                        })),
3299                        ..Default::default()
3300                    },
3301                )])
3302            })
3303            .next()
3304            .await;
3305
3306        // Toggle code actions and wait for them to display.
3307        editor_b.update(cx_b, |editor, cx| {
3308            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3309        });
3310        editor_b
3311            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3312            .await;
3313
            // Remove the code action handler installed above before moving on to
            // the resolve step.
3314        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3315
3316        // Confirming the code action will trigger a resolve request.
            // The confirmation is started first and yields a future; the resolve
            // handler is installed afterwards and the future is awaited below.
3317        let confirm_action = workspace_b
3318            .update(cx_b, |workspace, cx| {
3319                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3320            })
3321            .unwrap();
            // The resolve handler ignores its parameters and returns the same title
            // and workspace edit as the action offered above (without `data`).
3322        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3323            lsp::CodeAction {
3324                title: "Inline into all callers".to_string(),
3325                edit: Some(lsp::WorkspaceEdit {
3326                    changes: Some(
3327                        [
3328                            (
3329                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3330                                vec![lsp::TextEdit::new(
3331                                    lsp::Range::new(
3332                                        lsp::Position::new(1, 22),
3333                                        lsp::Position::new(1, 34),
3334                                    ),
3335                                    "4".to_string(),
3336                                )],
3337                            ),
3338                            (
3339                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3340                                vec![lsp::TextEdit::new(
3341                                    lsp::Range::new(
3342                                        lsp::Position::new(0, 0),
3343                                        lsp::Position::new(0, 27),
3344                                    ),
3345                                    "".to_string(),
3346                                )],
3347                            ),
3348                        ]
3349                        .into_iter()
3350                        .collect(),
3351                    ),
3352                    ..Default::default()
3353                }),
3354                ..Default::default()
3355            }
3356        });
3357
3358        // After the action is confirmed, an editor containing both modified files is opened.
3359        confirm_action.await.unwrap();
3360        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3361            workspace
3362                .active_item(cx)
3363                .unwrap()
3364                .downcast::<Editor>()
3365                .unwrap()
3366        });
            // The expected text is other.rs (now empty) followed by main.rs on the
            // next line. One undo restores both files' original contents and one
            // redo reapplies both edits.
3367        code_action_editor.update(cx_b, |editor, cx| {
3368            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3369            editor.undo(&Undo, cx);
3370            assert_eq!(
3371                editor.text(cx),
3372                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3373            );
3374            editor.redo(&Redo, cx);
3375            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3376        });
3377    }
3378
3379    #[gpui::test(iterations = 10)]
        // End-to-end collaboration test for LSP renames: client B, working in a
        // remote copy of client A's shared project, renames `ONE` to `THREE` via
        // the fake language server, then checks the multi-file result and its
        // undo/redo behavior.
3380    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3381        cx_a.foreground().forbid_parking();
3382        let mut lang_registry = Arc::new(LanguageRegistry::test());
3383        let fs = FakeFs::new(cx_a.background());
            // `editor::init` receives `path_openers_b` mutably; the vector is later
            // handed to client B's workspace params.
3384        let mut path_openers_b = Vec::new();
3385        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
3386
3387        // Set up a fake language server.
3388        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
            // The registry `Arc` has not been cloned yet, so `Arc::get_mut` can still
            // hand out a mutable reference for registering the language.
3389        Arc::get_mut(&mut lang_registry)
3390            .unwrap()
3391            .add(Arc::new(Language::new(
3392                LanguageConfig {
3393                    name: "Rust".into(),
3394                    path_suffixes: vec!["rs".to_string()],
3395                    language_server: Some(language_server_config),
3396                    ..Default::default()
3397                },
3398                Some(tree_sitter_rust::language()),
3399            )));
3400
3401        // Connect to a server as 2 clients.
3402        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3403        let client_a = server.create_client(cx_a, "user_a").await;
3404        let client_b = server.create_client(cx_b, "user_b").await;
3405
3406        // Share a project as client A
            // The fake file tree lists "user_b" under `collaborators` in `.zed.toml`;
            // two.rs references `one::ONE` twice.
3407        fs.insert_tree(
3408            "/dir",
3409            json!({
3410                ".zed.toml": r#"collaborators = ["user_b"]"#,
3411                "one.rs": "const ONE: usize = 1;",
3412                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3413            }),
3414        )
3415        .await;
3416        let project_a = cx_a.update(|cx| {
3417            Project::local(
3418                client_a.clone(),
3419                client_a.user_store.clone(),
3420                lang_registry.clone(),
3421                fs.clone(),
3422                cx,
3423            )
3424        });
3425        let (worktree_a, _) = project_a
3426            .update(cx_a, |p, cx| {
3427                p.find_or_create_local_worktree("/dir", true, cx)
3428            })
3429            .await
3430            .unwrap();
            // Await the local worktree's `scan_complete()` before taking ids and
            // sharing the project.
3431        worktree_a
3432            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3433            .await;
3434        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3435        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3436        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3437
3438        // Join the worktree as client B.
3439        let project_b = Project::remote(
3440            project_id,
3441            client_b.clone(),
3442            client_b.user_store.clone(),
3443            lang_registry.clone(),
3444            fs.clone(),
3445            &mut cx_b.to_async(),
3446        )
3447        .await
3448        .unwrap();
            // Start from the test workspace params and point them at client B's
            // client, user store, remote project and the editor's path openers.
3449        let mut params = cx_b.update(WorkspaceParams::test);
3450        params.languages = lang_registry.clone();
3451        params.client = client_b.client.clone();
3452        params.user_store = client_b.user_store.clone();
3453        params.project = project_b;
3454        params.path_openers = path_openers_b.into();
3455
            // Open one.rs in a workspace window on client B and keep the editor.
3456        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3457        let editor_b = workspace_b
3458            .update(cx_b, |workspace, cx| {
3459                workspace.open_path((worktree_id, "one.rs").into(), cx)
3460            })
3461            .await
3462            .unwrap()
3463            .downcast::<Editor>()
3464            .unwrap();
3465        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3466
3467        // Move cursor to a location that can be renamed.
            // Offset 7 lies inside `ONE`, which occupies offsets 6..9 of one.rs.
            // The rename is only started here; its future is awaited after the
            // prepare-rename request has been served.
3468        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3469            editor.select_ranges([7..7], None, cx);
3470            editor.rename(&Rename, cx).unwrap()
3471        });
3472
            // Check that the prepare-rename request targets one.rs at (0, 7) and
            // answer with the range of `ONE`, columns 6..9 on line 0.
3473        fake_language_server
3474            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3475                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3476                assert_eq!(params.position, lsp::Position::new(0, 7));
3477                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3478                    lsp::Position::new(0, 6),
3479                    lsp::Position::new(0, 9),
3480                )))
3481            })
3482            .next()
3483            .await
3484            .unwrap();
3485        prepare_rename.await.unwrap();
            // The pending rename must cover offsets 6..9. The new name is entered by
            // replacing range 0..3 of the rename editor's buffer with "THREE".
3486        editor_b.update(cx_b, |editor, cx| {
3487            let rename = editor.pending_rename().unwrap();
3488            let buffer = editor.buffer().read(cx).snapshot(cx);
3489            assert_eq!(
3490                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3491                6..9
3492            );
3493            rename.editor.update(cx, |rename_editor, cx| {
3494                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3495                    rename_buffer.edit([0..3], "THREE", cx);
3496                });
3497            });
3498        });
3499
            // Start confirming the rename, then serve the rename request. The
            // handler expects position (0, 6) in one.rs with new name "THREE" and
            // returns edits for `ONE` in one.rs (columns 6..9) and for both
            // `one::ONE` references in two.rs (columns 24..27 and 35..38).
3500        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3501            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3502        });
3503        fake_language_server
3504            .handle_request::<lsp::request::Rename, _>(|params, _| {
3505                assert_eq!(
3506                    params.text_document_position.text_document.uri.as_str(),
3507                    "file:///dir/one.rs"
3508                );
3509                assert_eq!(
3510                    params.text_document_position.position,
3511                    lsp::Position::new(0, 6)
3512                );
3513                assert_eq!(params.new_name, "THREE");
3514                Some(lsp::WorkspaceEdit {
3515                    changes: Some(
3516                        [
3517                            (
3518                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3519                                vec![lsp::TextEdit::new(
3520                                    lsp::Range::new(
3521                                        lsp::Position::new(0, 6),
3522                                        lsp::Position::new(0, 9),
3523                                    ),
3524                                    "THREE".to_string(),
3525                                )],
3526                            ),
3527                            (
3528                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3529                                vec![
3530                                    lsp::TextEdit::new(
3531                                        lsp::Range::new(
3532                                            lsp::Position::new(0, 24),
3533                                            lsp::Position::new(0, 27),
3534                                        ),
3535                                        "THREE".to_string(),
3536                                    ),
3537                                    lsp::TextEdit::new(
3538                                        lsp::Range::new(
3539                                            lsp::Position::new(0, 35),
3540                                            lsp::Position::new(0, 38),
3541                                        ),
3542                                        "THREE".to_string(),
3543                                    ),
3544                                ],
3545                            ),
3546                        ]
3547                        .into_iter()
3548                        .collect(),
3549                    ),
3550                    ..Default::default()
3551                })
3552            })
3553            .next()
3554            .await
3555            .unwrap();
3556        confirm_rename.await.unwrap();
3557
            // The workspace's active item is now an editor whose text is two.rs
            // followed by one.rs. One undo restores both files and one redo
            // reapplies the rename in both.
3558        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3559            workspace
3560                .active_item(cx)
3561                .unwrap()
3562                .downcast::<Editor>()
3563                .unwrap()
3564        });
3565        rename_editor.update(cx_b, |editor, cx| {
3566            assert_eq!(
3567                editor.text(cx),
3568                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3569            );
3570            editor.undo(&Undo, cx);
3571            assert_eq!(
3572                editor.text(cx),
3573                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3574            );
3575            editor.redo(&Redo, cx);
3576            assert_eq!(
3577                editor.text(cx),
3578                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3579            );
3580        });
3581
3582        // Ensure temporary rename edits cannot be undone/redone.
            // In the original one.rs editor, the first undo yields the pre-rename
            // text, a second undo changes nothing, and a redo yields the renamed
            // text again.
3583        editor_b.update(cx_b, |editor, cx| {
3584            editor.undo(&Undo, cx);
3585            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3586            editor.undo(&Undo, cx);
3587            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3588            editor.redo(&Redo, cx);
3589            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3590        })
3591    }
3592
3593    #[gpui::test(iterations = 10)]
        // End-to-end chat test: two clients open the same channel, both load a
        // message seeded in the database, client A sends two more messages that
        // client B receives, and the server's channel state shrinks and then
        // disappears as the clients drop their channel handles.
3594    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3595        cx_a.foreground().forbid_parking();
3596
3597        // Connect to a server as 2 clients.
3598        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3599        let client_a = server.create_client(cx_a, "user_a").await;
3600        let client_b = server.create_client(cx_b, "user_b").await;
3601
3602        // Create an org that includes these 2 users.
3603        let db = &server.app_state.db;
3604        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3605        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3606            .await
3607            .unwrap();
3608        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3609            .await
3610            .unwrap();
3611
3612        // Create a channel that includes all the users.
3613        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3614        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3615            .await
3616            .unwrap();
3617        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3618            .await
3619            .unwrap();
            // Seed the channel with one message from user B, written directly to the
            // database.
            // NOTE(review): the trailing `1` is presumably a per-message nonce —
            // confirm against `create_channel_message`.
3620        db.create_channel_message(
3621            channel_id,
3622            client_b.current_user_id(&cx_b),
3623            "hello A, it's B.",
3624            OffsetDateTime::now_utc(),
3625            1,
3626        )
3627        .await
3628        .unwrap();
3629
            // Client A: wait until the channel list reports available channels, then
            // expect exactly the test channel.
3630        let channels_a = cx_a
3631            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3632        channels_a
3633            .condition(cx_a, |list, _| list.available_channels().is_some())
3634            .await;
3635        channels_a.read_with(cx_a, |list, _| {
3636            assert_eq!(
3637                list.available_channels().unwrap(),
3638                &[ChannelDetails {
3639                    id: channel_id.to_proto(),
3640                    name: "test-channel".to_string()
3641                }]
3642            )
3643        });
            // A freshly obtained channel starts with no messages; the seeded message
            // shows up asynchronously.
            // NOTE(review): the third tuple field from `channel_messages` presumably
            // marks a pending (not yet acknowledged) message — confirm in the helper.
3644        let channel_a = channels_a.update(cx_a, |this, cx| {
3645            this.get_channel(channel_id.to_proto(), cx).unwrap()
3646        });
3647        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3648        channel_a
3649            .condition(&cx_a, |channel, _| {
3650                channel_messages(channel)
3651                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3652            })
3653            .await;
3654
            // Client B: same sequence as for client A above.
3655        let channels_b = cx_b
3656            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3657        channels_b
3658            .condition(cx_b, |list, _| list.available_channels().is_some())
3659            .await;
3660        channels_b.read_with(cx_b, |list, _| {
3661            assert_eq!(
3662                list.available_channels().unwrap(),
3663                &[ChannelDetails {
3664                    id: channel_id.to_proto(),
3665                    name: "test-channel".to_string()
3666                }]
3667            )
3668        });
3669
3670        let channel_b = channels_b.update(cx_b, |this, cx| {
3671            this.get_channel(channel_id.to_proto(), cx).unwrap()
3672        });
3673        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3674        channel_b
3675            .condition(&cx_b, |channel, _| {
3676                channel_messages(channel)
3677                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3678            })
3679            .await;
3680
            // Send two messages from A: the first send is detached, the second
            // send's task is returned from the closure and awaited. Both messages
            // are expected in A's local list immediately, flagged `true`.
3681        channel_a
3682            .update(cx_a, |channel, cx| {
3683                channel
3684                    .send_message("oh, hi B.".to_string(), cx)
3685                    .unwrap()
3686                    .detach();
3687                let task = channel.send_message("sup".to_string(), cx).unwrap();
3688                assert_eq!(
3689                    channel_messages(channel),
3690                    &[
3691                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3692                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3693                        ("user_a".to_string(), "sup".to_string(), true)
3694                    ]
3695                );
3696                task
3697            })
3698            .await
3699            .unwrap();
3700
            // B eventually sees both of A's messages, in order, flagged `false`.
3701        channel_b
3702            .condition(&cx_b, |channel, _| {
3703                channel_messages(channel)
3704                    == [
3705                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3706                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3707                        ("user_a".to_string(), "sup".to_string(), false),
3708                    ]
3709            })
3710            .await;
3711
            // With both clients holding the channel, the server state lists two
            // connection ids for it.
3712        assert_eq!(
3713            server
3714                .state()
3715                .await
3716                .channel(channel_id)
3717                .unwrap()
3718                .connection_ids
3719                .len(),
3720            2
3721        );
            // Dropping B's channel handle leaves one connection on the server.
3722        cx_b.update(|_| drop(channel_b));
3723        server
3724            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3725            .await;
3726
            // Dropping A's handle as well removes the channel from server state.
3727        cx_a.update(|_| drop(channel_a));
3728        server
3729            .condition(|state| state.channel(channel_id).is_none())
3730            .await;
3731    }
3732
3733    #[gpui::test(iterations = 10)]
        // Checks chat message validation for a single client: an over-long body
        // fails once the send task is awaited, an empty body is rejected by
        // `send_message` itself, and surrounding whitespace is trimmed from the
        // body that ends up in the database.
3734    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3735        cx_a.foreground().forbid_parking();
3736
3737        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3738        let client_a = server.create_client(cx_a, "user_a").await;
3739
            // Create an org and a channel, and make user A a member of both.
3740        let db = &server.app_state.db;
3741        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3742        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3743        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3744            .await
3745            .unwrap();
3746        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3747            .await
3748            .unwrap();
3749
            // Wait for the channel list to load, then open the test channel.
3750        let channels_a = cx_a
3751            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3752        channels_a
3753            .condition(cx_a, |list, _| list.available_channels().is_some())
3754            .await;
3755        let channel_a = channels_a.update(cx_a, |this, cx| {
3756            this.get_channel(channel_id.to_proto(), cx).unwrap()
3757        });
3758
3759        // Messages aren't allowed to be too long.
            // The body is 14 bytes repeated 1024 times (14,336 bytes).
            // `send_message` itself succeeds here; the error only surfaces when the
            // returned task is awaited.
            // NOTE(review): presumably rejected via `MAX_MESSAGE_LEN` (1024) —
            // confirm where the limit is enforced.
3760        channel_a
3761            .update(cx_a, |channel, cx| {
3762                let long_body = "this is long.\n".repeat(1024);
3763                channel.send_message(long_body, cx).unwrap()
3764            })
3765            .await
3766            .unwrap_err();
3767
3768        // Messages aren't allowed to be blank.
            // Unlike the over-long case, the error is returned directly by
            // `send_message`, so there is no task to await.
3769        channel_a.update(cx_a, |channel, cx| {
3770            channel.send_message(String::new(), cx).unwrap_err()
3771        });
3772
3773        // Leading and trailing whitespace are trimmed.
3774        channel_a
3775            .update(cx_a, |channel, cx| {
3776                channel
3777                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3778                    .unwrap()
3779            })
3780            .await
3781            .unwrap();
            // The only message stored for the channel is the trimmed one; neither
            // rejected message was persisted.
3782        assert_eq!(
3783            db.get_channel_messages(channel_id, 10, None)
3784                .await
3785                .unwrap()
3786                .iter()
3787                .map(|m| &m.body)
3788                .collect::<Vec<_>>(),
3789            &["surrounded by whitespace"]
3790        );
3791    }
3792
3793    #[gpui::test(iterations = 10)]
3794    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3795        cx_a.foreground().forbid_parking();
3796
3797        // Connect to a server as 2 clients.
3798        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3799        let client_a = server.create_client(cx_a, "user_a").await;
3800        let client_b = server.create_client(cx_b, "user_b").await;
3801        let mut status_b = client_b.status();
3802
3803        // Create an org that includes these 2 users.
3804        let db = &server.app_state.db;
3805        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3806        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3807            .await
3808            .unwrap();
3809        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3810            .await
3811            .unwrap();
3812
3813        // Create a channel that includes all the users.
3814        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3815        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3816            .await
3817            .unwrap();
3818        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3819            .await
3820            .unwrap();
3821        db.create_channel_message(
3822            channel_id,
3823            client_b.current_user_id(&cx_b),
3824            "hello A, it's B.",
3825            OffsetDateTime::now_utc(),
3826            2,
3827        )
3828        .await
3829        .unwrap();
3830
3831        let channels_a = cx_a
3832            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3833        channels_a
3834            .condition(cx_a, |list, _| list.available_channels().is_some())
3835            .await;
3836
3837        channels_a.read_with(cx_a, |list, _| {
3838            assert_eq!(
3839                list.available_channels().unwrap(),
3840                &[ChannelDetails {
3841                    id: channel_id.to_proto(),
3842                    name: "test-channel".to_string()
3843                }]
3844            )
3845        });
3846        let channel_a = channels_a.update(cx_a, |this, cx| {
3847            this.get_channel(channel_id.to_proto(), cx).unwrap()
3848        });
3849        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3850        channel_a
3851            .condition(&cx_a, |channel, _| {
3852                channel_messages(channel)
3853                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3854            })
3855            .await;
3856
3857        let channels_b = cx_b
3858            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3859        channels_b
3860            .condition(cx_b, |list, _| list.available_channels().is_some())
3861            .await;
3862        channels_b.read_with(cx_b, |list, _| {
3863            assert_eq!(
3864                list.available_channels().unwrap(),
3865                &[ChannelDetails {
3866                    id: channel_id.to_proto(),
3867                    name: "test-channel".to_string()
3868                }]
3869            )
3870        });
3871
3872        let channel_b = channels_b.update(cx_b, |this, cx| {
3873            this.get_channel(channel_id.to_proto(), cx).unwrap()
3874        });
3875        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3876        channel_b
3877            .condition(&cx_b, |channel, _| {
3878                channel_messages(channel)
3879                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3880            })
3881            .await;
3882
3883        // Disconnect client B, ensuring we can still access its cached channel data.
3884        server.forbid_connections();
3885        server.disconnect_client(client_b.current_user_id(&cx_b));
3886        cx_b.foreground().advance_clock(Duration::from_secs(3));
3887        while !matches!(
3888            status_b.next().await,
3889            Some(client::Status::ReconnectionError { .. })
3890        ) {}
3891
3892        channels_b.read_with(cx_b, |channels, _| {
3893            assert_eq!(
3894                channels.available_channels().unwrap(),
3895                [ChannelDetails {
3896                    id: channel_id.to_proto(),
3897                    name: "test-channel".to_string()
3898                }]
3899            )
3900        });
3901        channel_b.read_with(cx_b, |channel, _| {
3902            assert_eq!(
3903                channel_messages(channel),
3904                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3905            )
3906        });
3907
3908        // Send a message from client B while it is disconnected.
3909        channel_b
3910            .update(cx_b, |channel, cx| {
3911                let task = channel
3912                    .send_message("can you see this?".to_string(), cx)
3913                    .unwrap();
3914                assert_eq!(
3915                    channel_messages(channel),
3916                    &[
3917                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3918                        ("user_b".to_string(), "can you see this?".to_string(), true)
3919                    ]
3920                );
3921                task
3922            })
3923            .await
3924            .unwrap_err();
3925
3926        // Send a message from client A while B is disconnected.
3927        channel_a
3928            .update(cx_a, |channel, cx| {
3929                channel
3930                    .send_message("oh, hi B.".to_string(), cx)
3931                    .unwrap()
3932                    .detach();
3933                let task = channel.send_message("sup".to_string(), cx).unwrap();
3934                assert_eq!(
3935                    channel_messages(channel),
3936                    &[
3937                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3938                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3939                        ("user_a".to_string(), "sup".to_string(), true)
3940                    ]
3941                );
3942                task
3943            })
3944            .await
3945            .unwrap();
3946
3947        // Give client B a chance to reconnect.
3948        server.allow_connections();
3949        cx_b.foreground().advance_clock(Duration::from_secs(10));
3950
3951        // Verify that B sees the new messages upon reconnection, as well as the message client B
3952        // sent while offline.
3953        channel_b
3954            .condition(&cx_b, |channel, _| {
3955                channel_messages(channel)
3956                    == [
3957                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3958                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3959                        ("user_a".to_string(), "sup".to_string(), false),
3960                        ("user_b".to_string(), "can you see this?".to_string(), false),
3961                    ]
3962            })
3963            .await;
3964
3965        // Ensure client A and B can communicate normally after reconnection.
3966        channel_a
3967            .update(cx_a, |channel, cx| {
3968                channel.send_message("you online?".to_string(), cx).unwrap()
3969            })
3970            .await
3971            .unwrap();
3972        channel_b
3973            .condition(&cx_b, |channel, _| {
3974                channel_messages(channel)
3975                    == [
3976                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3977                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3978                        ("user_a".to_string(), "sup".to_string(), false),
3979                        ("user_b".to_string(), "can you see this?".to_string(), false),
3980                        ("user_a".to_string(), "you online?".to_string(), false),
3981                    ]
3982            })
3983            .await;
3984
3985        channel_b
3986            .update(cx_b, |channel, cx| {
3987                channel.send_message("yep".to_string(), cx).unwrap()
3988            })
3989            .await
3990            .unwrap();
3991        channel_a
3992            .condition(&cx_a, |channel, _| {
3993                channel_messages(channel)
3994                    == [
3995                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3996                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3997                        ("user_a".to_string(), "sup".to_string(), false),
3998                        ("user_b".to_string(), "can you see this?".to_string(), false),
3999                        ("user_a".to_string(), "you online?".to_string(), false),
4000                        ("user_b".to_string(), "yep".to_string(), false),
4001                    ]
4002            })
4003            .await;
4004    }
4005
4006    #[gpui::test(iterations = 10)]
4007    async fn test_contacts(
4008        cx_a: &mut TestAppContext,
4009        cx_b: &mut TestAppContext,
4010        cx_c: &mut TestAppContext,
4011    ) {
4012        cx_a.foreground().forbid_parking();
4013        let lang_registry = Arc::new(LanguageRegistry::test());
4014        let fs = FakeFs::new(cx_a.background());
4015
4016        // Connect to a server as 3 clients.
4017        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4018        let client_a = server.create_client(cx_a, "user_a").await;
4019        let client_b = server.create_client(cx_b, "user_b").await;
4020        let client_c = server.create_client(cx_c, "user_c").await;
4021
4022        // Share a worktree as client A.
4023        fs.insert_tree(
4024            "/a",
4025            json!({
4026                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4027            }),
4028        )
4029        .await;
4030
4031        let project_a = cx_a.update(|cx| {
4032            Project::local(
4033                client_a.clone(),
4034                client_a.user_store.clone(),
4035                lang_registry.clone(),
4036                fs.clone(),
4037                cx,
4038            )
4039        });
4040        let (worktree_a, _) = project_a
4041            .update(cx_a, |p, cx| {
4042                p.find_or_create_local_worktree("/a", true, cx)
4043            })
4044            .await
4045            .unwrap();
4046        worktree_a
4047            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4048            .await;
4049
4050        client_a
4051            .user_store
4052            .condition(&cx_a, |user_store, _| {
4053                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4054            })
4055            .await;
4056        client_b
4057            .user_store
4058            .condition(&cx_b, |user_store, _| {
4059                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4060            })
4061            .await;
4062        client_c
4063            .user_store
4064            .condition(&cx_c, |user_store, _| {
4065                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4066            })
4067            .await;
4068
4069        let project_id = project_a
4070            .update(cx_a, |project, _| project.next_remote_id())
4071            .await;
4072        project_a
4073            .update(cx_a, |project, cx| project.share(cx))
4074            .await
4075            .unwrap();
4076
4077        let _project_b = Project::remote(
4078            project_id,
4079            client_b.clone(),
4080            client_b.user_store.clone(),
4081            lang_registry.clone(),
4082            fs.clone(),
4083            &mut cx_b.to_async(),
4084        )
4085        .await
4086        .unwrap();
4087
4088        client_a
4089            .user_store
4090            .condition(&cx_a, |user_store, _| {
4091                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4092            })
4093            .await;
4094        client_b
4095            .user_store
4096            .condition(&cx_b, |user_store, _| {
4097                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4098            })
4099            .await;
4100        client_c
4101            .user_store
4102            .condition(&cx_c, |user_store, _| {
4103                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4104            })
4105            .await;
4106
4107        project_a
4108            .condition(&cx_a, |project, _| {
4109                project.collaborators().contains_key(&client_b.peer_id)
4110            })
4111            .await;
4112
4113        cx_a.update(move |_| drop(project_a));
4114        client_a
4115            .user_store
4116            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4117            .await;
4118        client_b
4119            .user_store
4120            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4121            .await;
4122        client_c
4123            .user_store
4124            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4125            .await;
4126
4127        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4128            user_store
4129                .contacts()
4130                .iter()
4131                .map(|contact| {
4132                    let worktrees = contact
4133                        .projects
4134                        .iter()
4135                        .map(|p| {
4136                            (
4137                                p.worktree_root_names[0].as_str(),
4138                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4139                            )
4140                        })
4141                        .collect();
4142                    (contact.user.github_login.as_str(), worktrees)
4143                })
4144                .collect()
4145        }
4146    }
4147
4148    #[gpui::test(iterations = 100)]
4149    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4150        cx.foreground().forbid_parking();
4151        let max_peers = env::var("MAX_PEERS")
4152            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4153            .unwrap_or(5);
4154        let max_operations = env::var("OPERATIONS")
4155            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4156            .unwrap_or(10);
4157
4158        let rng = Arc::new(Mutex::new(rng));
4159
4160        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4161        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4162
4163        let fs = FakeFs::new(cx.background());
4164        fs.insert_tree(
4165            "/_collab",
4166            json!({
4167                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4168            }),
4169        )
4170        .await;
4171
4172        let operations = Rc::new(Cell::new(0));
4173        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4174        let mut clients = Vec::new();
4175
4176        let mut next_entity_id = 100000;
4177        let mut host_cx = TestAppContext::new(
4178            cx.foreground_platform(),
4179            cx.platform(),
4180            cx.foreground(),
4181            cx.background(),
4182            cx.font_cache(),
4183            cx.leak_detector(),
4184            next_entity_id,
4185        );
4186        let host = server.create_client(&mut host_cx, "host").await;
4187        let host_project = host_cx.update(|cx| {
4188            Project::local(
4189                host.client.clone(),
4190                host.user_store.clone(),
4191                Arc::new(LanguageRegistry::test()),
4192                fs.clone(),
4193                cx,
4194            )
4195        });
4196        let host_project_id = host_project
4197            .update(&mut host_cx, |p, _| p.next_remote_id())
4198            .await;
4199
4200        let (collab_worktree, _) = host_project
4201            .update(&mut host_cx, |project, cx| {
4202                project.find_or_create_local_worktree("/_collab", true, cx)
4203            })
4204            .await
4205            .unwrap();
4206        collab_worktree
4207            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4208            .await;
4209        host_project
4210            .update(&mut host_cx, |project, cx| project.share(cx))
4211            .await
4212            .unwrap();
4213
4214        clients.push(cx.foreground().spawn(host.simulate_host(
4215            host_project,
4216            language_server_config,
4217            operations.clone(),
4218            max_operations,
4219            rng.clone(),
4220            host_cx,
4221        )));
4222
4223        while operations.get() < max_operations {
4224            cx.background().simulate_random_delay().await;
4225            if clients.len() >= max_peers {
4226                break;
4227            } else if rng.lock().gen_bool(0.05) {
4228                operations.set(operations.get() + 1);
4229
4230                let guest_id = clients.len();
4231                log::info!("Adding guest {}", guest_id);
4232                next_entity_id += 100000;
4233                let mut guest_cx = TestAppContext::new(
4234                    cx.foreground_platform(),
4235                    cx.platform(),
4236                    cx.foreground(),
4237                    cx.background(),
4238                    cx.font_cache(),
4239                    cx.leak_detector(),
4240                    next_entity_id,
4241                );
4242                let guest = server
4243                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4244                    .await;
4245                let guest_project = Project::remote(
4246                    host_project_id,
4247                    guest.client.clone(),
4248                    guest.user_store.clone(),
4249                    guest_lang_registry.clone(),
4250                    FakeFs::new(cx.background()),
4251                    &mut guest_cx.to_async(),
4252                )
4253                .await
4254                .unwrap();
4255                clients.push(cx.foreground().spawn(guest.simulate_guest(
4256                    guest_id,
4257                    guest_project,
4258                    operations.clone(),
4259                    max_operations,
4260                    rng.clone(),
4261                    guest_cx,
4262                )));
4263
4264                log::info!("Guest {} added", guest_id);
4265            }
4266        }
4267
4268        let mut clients = futures::future::join_all(clients).await;
4269        cx.foreground().run_until_parked();
4270
4271        let (host_client, mut host_cx) = clients.remove(0);
4272        let host_project = host_client.project.as_ref().unwrap();
4273        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4274            project
4275                .worktrees(cx)
4276                .map(|worktree| {
4277                    let snapshot = worktree.read(cx).snapshot();
4278                    (snapshot.id(), snapshot)
4279                })
4280                .collect::<BTreeMap<_, _>>()
4281        });
4282
4283        host_client
4284            .project
4285            .as_ref()
4286            .unwrap()
4287            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4288
4289        for (guest_client, mut guest_cx) in clients.into_iter() {
4290            let guest_id = guest_client.client.id();
4291            let worktree_snapshots =
4292                guest_client
4293                    .project
4294                    .as_ref()
4295                    .unwrap()
4296                    .read_with(&guest_cx, |project, cx| {
4297                        project
4298                            .worktrees(cx)
4299                            .map(|worktree| {
4300                                let worktree = worktree.read(cx);
4301                                (worktree.id(), worktree.snapshot())
4302                            })
4303                            .collect::<BTreeMap<_, _>>()
4304                    });
4305
4306            assert_eq!(
4307                worktree_snapshots.keys().collect::<Vec<_>>(),
4308                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4309                "guest {} has different worktrees than the host",
4310                guest_id
4311            );
4312            for (id, host_snapshot) in &host_worktree_snapshots {
4313                let guest_snapshot = &worktree_snapshots[id];
4314                assert_eq!(
4315                    guest_snapshot.root_name(),
4316                    host_snapshot.root_name(),
4317                    "guest {} has different root name than the host for worktree {}",
4318                    guest_id,
4319                    id
4320                );
4321                assert_eq!(
4322                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4323                    host_snapshot.entries(false).collect::<Vec<_>>(),
4324                    "guest {} has different snapshot than the host for worktree {}",
4325                    guest_id,
4326                    id
4327                );
4328            }
4329
4330            guest_client
4331                .project
4332                .as_ref()
4333                .unwrap()
4334                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4335
4336            for guest_buffer in &guest_client.buffers {
4337                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4338                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4339                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4340                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4341                        guest_id, guest_client.peer_id, buffer_id
4342                    ))
4343                });
4344                let path = host_buffer
4345                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4346
4347                assert_eq!(
4348                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4349                    0,
4350                    "guest {}, buffer {}, path {:?} has deferred operations",
4351                    guest_id,
4352                    buffer_id,
4353                    path,
4354                );
4355                assert_eq!(
4356                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4357                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4358                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4359                    guest_id,
4360                    buffer_id,
4361                    path
4362                );
4363            }
4364
4365            guest_cx.update(|_| drop(guest_client));
4366        }
4367
4368        host_cx.update(|_| drop(host_client));
4369    }
4370
    /// In-memory server used by the tests in this module, together with the hooks the tests
    /// use to sever and to block client connections.
    struct TestServer {
        /// Peer handed to `Server::new`; reset when the `TestServer` is dropped.
        peer: Arc<Peer>,
        /// State handed to `Server::new`; its database is also used to create test users.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor, told when `condition` starts and stops waiting.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is given to `Server::new`.
        notifications: mpsc::UnboundedReceiver<()>,
        /// One kill handle per connected user; `disconnect_client` removes the user's entry.
        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
        /// While `true`, attempts to establish a connection fail with an error.
        forbid_connections: Arc<AtomicBool>,
        /// Never read after construction — presumably held to keep the test database alive.
        _test_db: TestDb,
    }
4381
4382    impl TestServer {
4383        async fn start(
4384            foreground: Rc<executor::Foreground>,
4385            background: Arc<executor::Background>,
4386        ) -> Self {
4387            let test_db = TestDb::fake(background);
4388            let app_state = Self::build_app_state(&test_db).await;
4389            let peer = Peer::new();
4390            let notifications = mpsc::unbounded();
4391            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4392            Self {
4393                peer,
4394                app_state,
4395                server,
4396                foreground,
4397                notifications: notifications.1,
4398                connection_killers: Default::default(),
4399                forbid_connections: Default::default(),
4400                _test_db: test_db,
4401            }
4402        }
4403
4404        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4405            let http = FakeHttpClient::with_404_response();
4406            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4407            let client_name = name.to_string();
4408            let mut client = Client::new(http.clone());
4409            let server = self.server.clone();
4410            let connection_killers = self.connection_killers.clone();
4411            let forbid_connections = self.forbid_connections.clone();
4412            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4413
4414            Arc::get_mut(&mut client)
4415                .unwrap()
4416                .override_authenticate(move |cx| {
4417                    cx.spawn(|_| async move {
4418                        let access_token = "the-token".to_string();
4419                        Ok(Credentials {
4420                            user_id: user_id.0 as u64,
4421                            access_token,
4422                        })
4423                    })
4424                })
4425                .override_establish_connection(move |credentials, cx| {
4426                    assert_eq!(credentials.user_id, user_id.0 as u64);
4427                    assert_eq!(credentials.access_token, "the-token");
4428
4429                    let server = server.clone();
4430                    let connection_killers = connection_killers.clone();
4431                    let forbid_connections = forbid_connections.clone();
4432                    let client_name = client_name.clone();
4433                    let connection_id_tx = connection_id_tx.clone();
4434                    cx.spawn(move |cx| async move {
4435                        if forbid_connections.load(SeqCst) {
4436                            Err(EstablishConnectionError::other(anyhow!(
4437                                "server is forbidding connections"
4438                            )))
4439                        } else {
4440                            let (client_conn, server_conn, kill_conn) =
4441                                Connection::in_memory(cx.background());
4442                            connection_killers.lock().insert(user_id, kill_conn);
4443                            cx.background()
4444                                .spawn(server.handle_connection(
4445                                    server_conn,
4446                                    client_name,
4447                                    user_id,
4448                                    Some(connection_id_tx),
4449                                    cx.background(),
4450                                ))
4451                                .detach();
4452                            Ok(client_conn)
4453                        }
4454                    })
4455                });
4456
4457            client
4458                .authenticate_and_connect(&cx.to_async())
4459                .await
4460                .unwrap();
4461
4462            Channel::init(&client);
4463            Project::init(&client);
4464
4465            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4466            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4467
4468            let client = TestClient {
4469                client,
4470                peer_id,
4471                user_store,
4472                project: Default::default(),
4473                buffers: Default::default(),
4474            };
4475            client.wait_for_current_user(cx).await;
4476            client
4477        }
4478
4479        fn disconnect_client(&self, user_id: UserId) {
4480            self.connection_killers.lock().remove(&user_id);
4481        }
4482
4483        fn forbid_connections(&self) {
4484            self.forbid_connections.store(true, SeqCst);
4485        }
4486
4487        fn allow_connections(&self) {
4488            self.forbid_connections.store(false, SeqCst);
4489        }
4490
4491        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4492            let mut config = Config::default();
4493            config.session_secret = "a".repeat(32);
4494            config.database_url = test_db.url.clone();
4495            let github_client = github::AppClient::test();
4496            Arc::new(AppState {
4497                db: test_db.db().clone(),
4498                handlebars: Default::default(),
4499                auth_client: auth::build_client("", ""),
4500                repo_client: github::RepoClient::test(&github_client),
4501                github_client,
4502                config,
4503            })
4504        }
4505
4506        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4507            self.server.store.read()
4508        }
4509
4510        async fn condition<F>(&mut self, mut predicate: F)
4511        where
4512            F: FnMut(&Store) -> bool,
4513        {
4514            async_std::future::timeout(Duration::from_millis(500), async {
4515                while !(predicate)(&*self.server.store.read()) {
4516                    self.foreground.start_waiting();
4517                    self.notifications.next().await;
4518                    self.foreground.finish_waiting();
4519                }
4520            })
4521            .await
4522            .expect("condition timed out");
4523        }
4524    }
4525
4526    impl Drop for TestServer {
4527        fn drop(&mut self) {
4528            self.peer.reset();
4529        }
4530    }
4531
    /// A connected test client: the underlying `Client` plus the per-client state that the
    /// tests inspect.
    struct TestClient {
        /// The wrapped client; `TestClient` dereferences to it.
        client: Arc<Client>,
        /// Built from the connection id received while the client was being created.
        pub peer_id: PeerId,
        /// User store created for this client.
        pub user_store: ModelHandle<UserStore>,
        /// `None` when the client is created; the random collaboration test expects it to be
        /// set once the simulation has run (presumably by `simulate_host`/`simulate_guest`).
        project: Option<ModelHandle<Project>>,
        /// Empty when the client is created; the random collaboration test compares each
        /// buffer in here against the host's copy.
        buffers: HashSet<ModelHandle<language::Buffer>>,
    }
4539
4540    impl Deref for TestClient {
4541        type Target = Arc<Client>;
4542
4543        fn deref(&self) -> &Self::Target {
4544            &self.client
4545        }
4546    }
4547
4548    impl TestClient {
4549        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4550            UserId::from_proto(
4551                self.user_store
4552                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4553            )
4554        }
4555
4556        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4557            let mut authed_user = self
4558                .user_store
4559                .read_with(cx, |user_store, _| user_store.watch_current_user());
4560            while authed_user.next().await.unwrap().is_none() {}
4561        }
4562
4563        fn simulate_host(
4564            mut self,
4565            project: ModelHandle<Project>,
4566            mut language_server_config: LanguageServerConfig,
4567            operations: Rc<Cell<usize>>,
4568            max_operations: usize,
4569            rng: Arc<Mutex<StdRng>>,
4570            mut cx: TestAppContext,
4571        ) -> impl Future<Output = (Self, TestAppContext)> {
4572            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4573
4574            // Set up a fake language server.
4575            language_server_config.set_fake_initializer({
4576                let rng = rng.clone();
4577                let files = files.clone();
4578                let project = project.downgrade();
4579                move |fake_server| {
4580                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4581                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4582                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4583                                range: lsp::Range::new(
4584                                    lsp::Position::new(0, 0),
4585                                    lsp::Position::new(0, 0),
4586                                ),
4587                                new_text: "the-new-text".to_string(),
4588                            })),
4589                            ..Default::default()
4590                        }]))
4591                    });
4592
4593                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4594                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4595                            lsp::CodeAction {
4596                                title: "the-code-action".to_string(),
4597                                ..Default::default()
4598                            },
4599                        )])
4600                    });
4601
4602                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4603                        |params, _| {
4604                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4605                                params.position,
4606                                params.position,
4607                            )))
4608                        },
4609                    );
4610
4611                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4612                        let files = files.clone();
4613                        let rng = rng.clone();
4614                        move |_, _| {
4615                            let files = files.lock();
4616                            let mut rng = rng.lock();
4617                            let count = rng.gen_range::<usize, _>(1..3);
4618                            let files = (0..count)
4619                                .map(|_| files.choose(&mut *rng).unwrap())
4620                                .collect::<Vec<_>>();
4621                            log::info!("LSP: Returning definitions in files {:?}", &files);
4622                            Some(lsp::GotoDefinitionResponse::Array(
4623                                files
4624                                    .into_iter()
4625                                    .map(|file| lsp::Location {
4626                                        uri: lsp::Url::from_file_path(file).unwrap(),
4627                                        range: Default::default(),
4628                                    })
4629                                    .collect(),
4630                            ))
4631                        }
4632                    });
4633
4634                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4635                        let rng = rng.clone();
4636                        let project = project.clone();
4637                        move |params, mut cx| {
4638                            if let Some(project) = project.upgrade(&cx) {
4639                                project.update(&mut cx, |project, cx| {
4640                                    let path = params
4641                                        .text_document_position_params
4642                                        .text_document
4643                                        .uri
4644                                        .to_file_path()
4645                                        .unwrap();
4646                                    let (worktree, relative_path) =
4647                                        project.find_local_worktree(&path, cx)?;
4648                                    let project_path =
4649                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4650                                    let buffer =
4651                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4652
4653                                    let mut highlights = Vec::new();
4654                                    let highlight_count = rng.lock().gen_range(1..=5);
4655                                    let mut prev_end = 0;
4656                                    for _ in 0..highlight_count {
4657                                        let range =
4658                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4659                                        let start = buffer
4660                                            .offset_to_point_utf16(range.start)
4661                                            .to_lsp_position();
4662                                        let end = buffer
4663                                            .offset_to_point_utf16(range.end)
4664                                            .to_lsp_position();
4665                                        highlights.push(lsp::DocumentHighlight {
4666                                            range: lsp::Range::new(start, end),
4667                                            kind: Some(lsp::DocumentHighlightKind::READ),
4668                                        });
4669                                        prev_end = range.end;
4670                                    }
4671                                    Some(highlights)
4672                                })
4673                            } else {
4674                                None
4675                            }
4676                        }
4677                    });
4678                }
4679            });
4680
4681            project.update(&mut cx, |project, _| {
4682                project.languages().add(Arc::new(Language::new(
4683                    LanguageConfig {
4684                        name: "Rust".into(),
4685                        path_suffixes: vec!["rs".to_string()],
4686                        language_server: Some(language_server_config),
4687                        ..Default::default()
4688                    },
4689                    None,
4690                )));
4691            });
4692
4693            async move {
4694                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4695                while operations.get() < max_operations {
4696                    operations.set(operations.get() + 1);
4697
4698                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4699                    match distribution {
4700                        0..=20 if !files.lock().is_empty() => {
4701                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4702                            let mut path = path.as_path();
4703                            while let Some(parent_path) = path.parent() {
4704                                path = parent_path;
4705                                if rng.lock().gen() {
4706                                    break;
4707                                }
4708                            }
4709
4710                            log::info!("Host: find/create local worktree {:?}", path);
4711                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4712                                project.find_or_create_local_worktree(path, true, cx)
4713                            });
4714                            let find_or_create_worktree = async move {
4715                                find_or_create_worktree.await.unwrap();
4716                            };
4717                            if rng.lock().gen() {
4718                                cx.background().spawn(find_or_create_worktree).detach();
4719                            } else {
4720                                find_or_create_worktree.await;
4721                            }
4722                        }
4723                        10..=80 if !files.lock().is_empty() => {
4724                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4725                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4726                                let (worktree, path) = project
4727                                    .update(&mut cx, |project, cx| {
4728                                        project.find_or_create_local_worktree(
4729                                            file.clone(),
4730                                            true,
4731                                            cx,
4732                                        )
4733                                    })
4734                                    .await
4735                                    .unwrap();
4736                                let project_path =
4737                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4738                                log::info!(
4739                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4740                                    file,
4741                                    project_path.0,
4742                                    project_path.1
4743                                );
4744                                let buffer = project
4745                                    .update(&mut cx, |project, cx| {
4746                                        project.open_buffer(project_path, cx)
4747                                    })
4748                                    .await
4749                                    .unwrap();
4750                                self.buffers.insert(buffer.clone());
4751                                buffer
4752                            } else {
4753                                self.buffers
4754                                    .iter()
4755                                    .choose(&mut *rng.lock())
4756                                    .unwrap()
4757                                    .clone()
4758                            };
4759
4760                            if rng.lock().gen_bool(0.1) {
4761                                cx.update(|cx| {
4762                                    log::info!(
4763                                        "Host: dropping buffer {:?}",
4764                                        buffer.read(cx).file().unwrap().full_path(cx)
4765                                    );
4766                                    self.buffers.remove(&buffer);
4767                                    drop(buffer);
4768                                });
4769                            } else {
4770                                buffer.update(&mut cx, |buffer, cx| {
4771                                    log::info!(
4772                                        "Host: updating buffer {:?} ({})",
4773                                        buffer.file().unwrap().full_path(cx),
4774                                        buffer.remote_id()
4775                                    );
4776                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4777                                });
4778                            }
4779                        }
4780                        _ => loop {
4781                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4782                            let mut path = PathBuf::new();
4783                            path.push("/");
4784                            for _ in 0..path_component_count {
4785                                let letter = rng.lock().gen_range(b'a'..=b'z');
4786                                path.push(std::str::from_utf8(&[letter]).unwrap());
4787                            }
4788                            path.set_extension("rs");
4789                            let parent_path = path.parent().unwrap();
4790
4791                            log::info!("Host: creating file {:?}", path,);
4792
4793                            if fs.create_dir(&parent_path).await.is_ok()
4794                                && fs.create_file(&path, Default::default()).await.is_ok()
4795                            {
4796                                files.lock().push(path);
4797                                break;
4798                            } else {
4799                                log::info!("Host: cannot create file");
4800                            }
4801                        },
4802                    }
4803
4804                    cx.background().simulate_random_delay().await;
4805                }
4806
4807                log::info!("Host done");
4808
4809                self.project = Some(project);
4810                (self, cx)
4811            }
4812        }
4813
4814        pub async fn simulate_guest(
4815            mut self,
4816            guest_id: usize,
4817            project: ModelHandle<Project>,
4818            operations: Rc<Cell<usize>>,
4819            max_operations: usize,
4820            rng: Arc<Mutex<StdRng>>,
4821            mut cx: TestAppContext,
4822        ) -> (Self, TestAppContext) {
                // Runs this guest's randomized operation loop against `project`.
                //
                // Each iteration picks a buffer (opening a new one or reusing one from
                // `self.buffers`) and then performs one randomly chosen action on it.
                // The loop ends once the `operations` counter reaches `max_operations`;
                // the counter is incremented once per iteration that selects a buffer.
                // NOTE(review): `operations` is an `Rc<Cell<usize>>`, so it is presumably
                // shared with the other simulated clients -- confirm at the call site.
                //
                // `guest_id` is only used to label log output. The random choices made
                // here are drawn from `rng`. On exit, `project` is stored in
                // `self.project` and the client is returned with its `TestAppContext`.
                // Failed requests and failed buffer opens panic (`unwrap`/`expect`).
4823            while operations.get() < max_operations {
                    // Choose the buffer to act on: open a new one when none is open yet or
                    // when the random flip says so, otherwise reuse an open buffer.
4824                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                        // Pick a random visible worktree that has at least one file entry.
4825                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
4826                        project
4827                            .worktrees(&cx)
4828                            .filter(|worktree| {
4829                                let worktree = worktree.read(cx);
4830                                worktree.is_visible()
4831                                    && worktree.entries(false).any(|e| e.is_file())
4832                            })
4833                            .choose(&mut *rng.lock())
4834                    }) {
4835                        worktree
4836                    } else {
                            // No eligible worktree yet: delay and retry. The `continue`
                            // skips the counter increment below, so this attempt is not
                            // counted as an operation.
4837                        cx.background().simulate_random_delay().await;
4838                        continue;
4839                    };
4840
4841                    operations.set(operations.get() + 1);
                        // Choose a random file entry in that worktree. The `unwrap` relies
                        // on the worktree still containing a file, as checked by the
                        // filter above.
4842                    let (worktree_root_name, project_path) =
4843                        worktree.read_with(&cx, |worktree, _| {
4844                            let entry = worktree
4845                                .entries(false)
4846                                .filter(|e| e.is_file())
4847                                .choose(&mut *rng.lock())
4848                                .unwrap();
4849                            (
4850                                worktree.root_name().to_string(),
4851                                (worktree.id(), entry.path.clone()),
4852                            )
4853                        });
4854                    log::info!(
4855                        "Guest {}: opening path {:?} in worktree {} ({})",
4856                        guest_id,
4857                        project_path.1,
4858                        project_path.0,
4859                        worktree_root_name,
4860                    );
                        // Open the chosen path through the project; panics if that fails.
4861                    let buffer = project
4862                        .update(&mut cx, |project, cx| {
4863                            project.open_buffer(project_path.clone(), cx)
4864                        })
4865                        .await
4866                        .unwrap();
4867                    log::info!(
4868                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
4869                        guest_id,
4870                        project_path.1,
4871                        project_path.0,
4872                        worktree_root_name,
4873                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
4874                    );
                        // Remember the buffer so that later iterations can reuse it.
4875                    self.buffers.insert(buffer.clone());
4876                    buffer
4877                } else {
4878                    operations.set(operations.get() + 1);
4879
                        // `self.buffers` is non-empty on this branch, so `choose` returns
                        // `Some` and the `unwrap` cannot fail.
4880                    self.buffers
4881                        .iter()
4882                        .choose(&mut *rng.lock())
4883                        .unwrap()
4884                        .clone()
4885                };
4886
                    // Pick the action to perform; each arm below covers a slice of 0..100.
4887                let choice = rng.lock().gen_range(0..100);
4888                match choice {
                        // Forget the buffer: remove it from `self.buffers` and drop the
                        // local handle.
4889                    0..=9 => {
4890                        cx.update(|cx| {
4891                            log::info!(
4892                                "Guest {}: dropping buffer {:?}",
4893                                guest_id,
4894                                buffer.read(cx).file().unwrap().full_path(cx)
4895                            );
4896                            self.buffers.remove(&buffer);
4897                            drop(buffer);
4898                        });
4899                    }
                        // Request completions at a random offset within the buffer.
4900                    10..=19 => {
4901                        let completions = project.update(&mut cx, |project, cx| {
4902                            log::info!(
4903                                "Guest {}: requesting completions for buffer {} ({:?})",
4904                                guest_id,
4905                                buffer.read(cx).remote_id(),
4906                                buffer.read(cx).file().unwrap().full_path(cx)
4907                            );
4908                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4909                            project.completions(&buffer, offset, cx)
4910                        });
                            // The request runs as a background task that panics if it
                            // fails. With probability 0.3 the task is detached instead of
                            // awaited; the request arms below follow the same pattern.
4911                        let completions = cx.background().spawn(async move {
4912                            completions.await.expect("completions request failed");
4913                        });
4914                        if rng.lock().gen_bool(0.3) {
4915                            log::info!("Guest {}: detaching completions request", guest_id);
4916                            completions.detach();
4917                        } else {
4918                            completions.await;
4919                        }
4920                    }
                        // Request code actions for a random byte range of the buffer.
4921                    20..=29 => {
4922                        let code_actions = project.update(&mut cx, |project, cx| {
4923                            log::info!(
4924                                "Guest {}: requesting code actions for buffer {} ({:?})",
4925                                guest_id,
4926                                buffer.read(cx).remote_id(),
4927                                buffer.read(cx).file().unwrap().full_path(cx)
4928                            );
4929                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
4930                            project.code_actions(&buffer, range, cx)
4931                        });
4932                        let code_actions = cx.background().spawn(async move {
4933                            code_actions.await.expect("code actions request failed");
4934                        });
4935                        if rng.lock().gen_bool(0.3) {
4936                            log::info!("Guest {}: detaching code actions request", guest_id);
4937                            code_actions.detach();
4938                        } else {
4939                            code_actions.await;
4940                        }
4941                    }
                        // Save the buffer, but only when `is_dirty()` reports true. When
                        // the guard is false, these values of `choice` fall through to
                        // the `_` arm and edit the buffer instead.
4942                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
4943                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
4944                            log::info!(
4945                                "Guest {}: saving buffer {} ({:?})",
4946                                guest_id,
4947                                buffer.remote_id(),
4948                                buffer.file().unwrap().full_path(cx)
4949                            );
4950                            (buffer.version(), buffer.save(cx))
4951                        });
4952                        let save = cx.background().spawn(async move {
4953                            let (saved_version, _) = save.await.expect("save request failed");
                                // The version reported by the save must have observed all
                                // of the version captured when the save was requested.
4954                            assert!(saved_version.observed_all(&requested_version));
4955                        });
4956                        if rng.lock().gen_bool(0.3) {
4957                            log::info!("Guest {}: detaching save request", guest_id);
4958                            save.detach();
4959                        } else {
4960                            save.await;
4961                        }
4962                    }
                        // Prepare a rename at a random offset. Unlike the other request
                        // arms, `buffer` is passed to the project by value here.
4963                    40..=44 => {
4964                        let prepare_rename = project.update(&mut cx, |project, cx| {
4965                            log::info!(
4966                                "Guest {}: preparing rename for buffer {} ({:?})",
4967                                guest_id,
4968                                buffer.read(cx).remote_id(),
4969                                buffer.read(cx).file().unwrap().full_path(cx)
4970                            );
4971                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4972                            project.prepare_rename(buffer, offset, cx)
4973                        });
4974                        let prepare_rename = cx.background().spawn(async move {
4975                            prepare_rename.await.expect("prepare rename request failed");
4976                        });
4977                        if rng.lock().gen_bool(0.3) {
4978                            log::info!("Guest {}: detaching prepare rename request", guest_id);
4979                            prepare_rename.detach();
4980                        } else {
4981                            prepare_rename.await;
4982                        }
4983                    }
                        // Request definitions at a random offset.
4984                    45..=49 => {
4985                        let definitions = project.update(&mut cx, |project, cx| {
4986                            log::info!(
4987                                "Guest {}: requesting definitions for buffer {} ({:?})",
4988                                guest_id,
4989                                buffer.read(cx).remote_id(),
4990                                buffer.read(cx).file().unwrap().full_path(cx)
4991                            );
4992                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
4993                            project.definition(&buffer, offset, cx)
4994                        });
4995                        let definitions = cx.background().spawn(async move {
4996                            definitions.await.expect("definitions request failed")
4997                        });
4998                        if rng.lock().gen_bool(0.3) {
4999                            log::info!("Guest {}: detaching definitions request", guest_id);
5000                            definitions.detach();
5001                        } else {
                                // When the request is awaited, the buffers of the returned
                                // locations are added to `self.buffers`.
5002                            self.buffers
5003                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5004                        }
5005                    }
                        // Request document highlights at a random offset.
5006                    50..=54 => {
5007                        let highlights = project.update(&mut cx, |project, cx| {
5008                            log::info!(
5009                                "Guest {}: requesting highlights for buffer {} ({:?})",
5010                                guest_id,
5011                                buffer.read(cx).remote_id(),
5012                                buffer.read(cx).file().unwrap().full_path(cx)
5013                            );
5014                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5015                            project.document_highlights(&buffer, offset, cx)
5016                        });
5017                        let highlights = cx.background().spawn(async move {
5018                            highlights.await.expect("highlights request failed");
5019                        });
5020                        if rng.lock().gen_bool(0.3) {
5021                            log::info!("Guest {}: detaching highlights request", guest_id);
5022                            highlights.detach();
5023                        } else {
5024                            highlights.await;
5025                        }
5026                    }
                        // Run a project-wide text search for one random lowercase letter.
5027                    55..=59 => {
5028                        let search = project.update(&mut cx, |project, cx| {
5029                            let query = rng.lock().gen_range('a'..='z');
5030                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5031                            project.search(SearchQuery::text(query, false, false), cx)
5032                        });
5033                        let search = cx
5034                            .background()
5035                            .spawn(async move { search.await.expect("search request failed") });
5036                        if rng.lock().gen_bool(0.3) {
5037                            log::info!("Guest {}: detaching search request", guest_id);
5038                            search.detach();
5039                        } else {
                                // When the search is awaited, the keys of its result (the
                                // matching buffers) are added to `self.buffers`.
5040                            self.buffers.extend(search.await.into_keys());
5041                        }
5042                    }
                        // Everything else (60..=99, plus 30..=39 when the save guard is
                        // false): apply random edits to the buffer.
5043                    _ => {
5044                        buffer.update(&mut cx, |buffer, cx| {
5045                            log::info!(
5046                                "Guest {}: updating buffer {} ({:?})",
5047                                guest_id,
5048                                buffer.remote_id(),
5049                                buffer.file().unwrap().full_path(cx)
5050                            );
5051                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5052                        });
5053                    }
5054                }
                    // Await the executor's `simulate_random_delay` before the next round.
5055                cx.background().simulate_random_delay().await;
5056            }
5057
5058            log::info!("Guest {} done", guest_id);
5059
                // Hand the project handle back to the caller through `self.project`.
5060            self.project = Some(project);
5061            (self, cx)
5062        }
5063    }
5064
5065    impl Drop for TestClient {
            // Dropping a `TestClient` calls `tear_down` on the client it owns.
5066        fn drop(&mut self) {
                let client = &self.client;
                client.tear_down();
5068        }
5069    }
5070
5071    impl Executor for Arc<gpui::executor::Background> {
            // Lets gpui's background executor stand in wherever an `Executor` is
            // required; both methods forward to the executor behind the `Arc`.
5072        type Timer = gpui::executor::Timer;
5073
            // Spawns `future` on the background executor and detaches the task.
            fn spawn_detached<F>(&self, future: F)
            where
                F: 'static + Send + Future<Output = ()>,
            {
                let task = self.spawn(future);
                task.detach();
5076        }
5077
            // Returns the background executor's own timer for `duration`.
5078        fn timer(&self, duration: Duration) -> Self::Timer {
                // Go through a plain `&Background` so the call below resolves to the
                // executor's `timer` rather than to this trait method again.
                let background: &gpui::executor::Background = self;
                background.timer(duration)
5080        }
5081    }
5082
5083    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
            // Flattens the channel's messages, in cursor order, into
            // `(sender github login, body, is_pending)` tuples.
            let mut summaries = Vec::new();
            for message in channel.messages().cursor::<()>() {
                let login = message.sender.github_login.clone();
                let body = message.body.clone();
                summaries.push((login, body, message.is_pending()));
            }
            summaries
5095    }
5096
5097    struct EmptyView;
5098
5099    impl gpui::Entity for EmptyView {
            // The event type is the unit type.
5100        type Event = ();
5101    }
5102
5103    impl gpui::View for EmptyView {
            // Name under which this view identifies itself to gpui.
5104        fn ui_name() -> &'static str {
                const NAME: &str = "empty view";
                NAME
5106        }
5107
            // Renders nothing: the view's element is a boxed `gpui::elements::Empty`.
5108        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
                let nothing = gpui::elements::Empty;
                gpui::Element::boxed(nothing)
5110        }
5111    }
5112}