rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
    /// Type-erased message handler as stored in `Server::handlers`.
    ///
    /// Receives the server and a boxed, dynamically typed envelope and returns a boxed
    /// future that resolves to `tide::Result<()>`.
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
    /// RPC server: dispatches incoming messages to registered handlers.
  41pub struct Server {
        /// Used to add/disconnect connections and to send, respond to and forward messages.
  42    peer: Arc<Peer>,
        /// Connection/project/channel bookkeeping behind a read-write lock.
  43    store: RwLock<Store>,
        /// Shared application state; handlers reach the database through `app_state.db`.
  44    app_state: Arc<AppState>,
        /// Handlers keyed by the `TypeId` of the message payload type they accept.
  45    handlers: HashMap<TypeId, MessageHandler>,
        /// When set, `()` is sent on this channel after each message handler finishes.
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
    /// Abstraction over the task executor used by `Server::handle_connection`.
  49pub trait Executor: Send + Clone {
        /// Future returned by [`Executor::timer`].
  50    type Timer: Send + Future;
        /// Spawns `future` without keeping a handle to it; `handle_connection` uses this for
        /// messages flagged as background.
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
        /// Returns a future for the given `duration`; `handle_connection` awaits it inside the
        /// timer callback it hands to `Peer::add_connection`.
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
    /// Unit marker type. NOTE(review): presumably the production `Executor` implementation;
    /// its `impl` is not visible in this part of the file — confirm.
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
    /// Number of channel messages requested from the database per page; `join_channel`
    /// reports `done` when fewer than this many messages come back.
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
    /// NOTE(review): presumably the maximum accepted chat message body length; the use site
    /// is not visible in this part of the file — confirm.
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
        /// Builds a server with an empty store and registers every RPC handler.
        ///
        /// `notifications`, when provided, receives `()` after each handled message (see
        /// `handle_connection`). Registering two handlers for the same message type panics
        /// (see `add_message_handler`).
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
            // `add_request_handler` is for messages that get a response (or an error) sent
            // back to the caller; `add_message_handler` is for messages with no reply.
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
  96            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  97            .add_request_handler(
  98                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
  99            )
 100            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 101            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 102            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 103            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 105            .add_request_handler(Server::update_buffer)
 106            .add_message_handler(Server::update_buffer_file)
 107            .add_message_handler(Server::buffer_reloaded)
 108            .add_message_handler(Server::buffer_saved)
 109            .add_request_handler(Server::save_buffer)
 110            .add_request_handler(Server::get_channels)
 111            .add_request_handler(Server::get_users)
 112            .add_request_handler(Server::join_channel)
 113            .add_message_handler(Server::leave_channel)
 114            .add_request_handler(Server::send_channel_message)
 115            .add_request_handler(Server::follow)
 116            .add_request_handler(Server::get_channel_messages);
 117
 118        Arc::new(server)
 119    }
 120
 121    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 122    where
 123        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 124        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 125        M: EnvelopedMessage,
 126    {
 127        let prev_handler = self.handlers.insert(
 128            TypeId::of::<M>(),
 129            Box::new(move |server, envelope| {
 130                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 131                (handler)(server, *envelope).boxed()
 132            }),
 133        );
 134        if prev_handler.is_some() {
 135            panic!("registered a handler for the same message twice");
 136        }
 137        self
 138    }
 139
 140    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 141    where
 142        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 143        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 144        M: RequestMessage,
 145    {
 146        self.add_message_handler(move |server, envelope| {
 147            let receipt = envelope.receipt();
 148            let response = (handler)(server.clone(), envelope);
 149            async move {
 150                match response.await {
 151                    Ok(response) => {
 152                        server.peer.respond(receipt, response)?;
 153                        Ok(())
 154                    }
 155                    Err(error) => {
 156                        server.peer.respond_with_error(
 157                            receipt,
 158                            proto::Error {
 159                                message: error.to_string(),
 160                            },
 161                        )?;
 162                        Err(error)
 163                    }
 164                }
 165            }
 166        })
 167    }
 168
 169    pub fn handle_connection<E: Executor>(
 170        self: &Arc<Self>,
 171        connection: Connection,
 172        addr: String,
 173        user_id: UserId,
 174        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 175        executor: E,
 176    ) -> impl Future<Output = ()> {
 177        let mut this = self.clone();
 178        async move {
 179            let (connection_id, handle_io, mut incoming_rx) = this
 180                .peer
 181                .add_connection(connection, {
 182                    let executor = executor.clone();
 183                    move |duration| {
 184                        let timer = executor.timer(duration);
 185                        async move {
 186                            timer.await;
 187                        }
 188                    }
 189                })
 190                .await;
 191
 192            if let Some(send_connection_id) = send_connection_id.as_mut() {
 193                let _ = send_connection_id.send(connection_id).await;
 194            }
 195
 196            this.state_mut().add_connection(connection_id, user_id);
 197            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 198                log::error!("error updating contacts for {:?}: {}", user_id, err);
 199            }
 200
 201            let handle_io = handle_io.fuse();
 202            futures::pin_mut!(handle_io);
 203            loop {
 204                let next_message = incoming_rx.next().fuse();
 205                futures::pin_mut!(next_message);
 206                futures::select_biased! {
 207                    result = handle_io => {
 208                        if let Err(err) = result {
 209                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 210                        }
 211                        break;
 212                    }
 213                    message = next_message => {
 214                        if let Some(message) = message {
 215                            let start_time = Instant::now();
 216                            let type_name = message.payload_type_name();
 217                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 218                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 219                                let notifications = this.notifications.clone();
 220                                let is_background = message.is_background();
 221                                let handle_message = (handler)(this.clone(), message);
 222                                let handle_message = async move {
 223                                    if let Err(err) = handle_message.await {
 224                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 225                                    } else {
 226                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 227                                    }
 228                                    if let Some(mut notifications) = notifications {
 229                                        let _ = notifications.send(()).await;
 230                                    }
 231                                };
 232                                if is_background {
 233                                    executor.spawn_detached(handle_message);
 234                                } else {
 235                                    handle_message.await;
 236                                }
 237                            } else {
 238                                log::warn!("unhandled message: {}", type_name);
 239                            }
 240                        } else {
 241                            log::info!("rpc connection closed {:?}", addr);
 242                            break;
 243                        }
 244                    }
 245                }
 246            }
 247
 248            if let Err(err) = this.sign_out(connection_id).await {
 249                log::error!("error signing out connection {:?} - {:?}", addr, err);
 250            }
 251        }
 252    }
 253
 254    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 255        self.peer.disconnect(connection_id);
 256        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 257
 258        for (project_id, project) in removed_connection.hosted_projects {
 259            if let Some(share) = project.share {
 260                broadcast(
 261                    connection_id,
 262                    share.guests.keys().copied().collect(),
 263                    |conn_id| {
 264                        self.peer
 265                            .send(conn_id, proto::UnshareProject { project_id })
 266                    },
 267                )?;
 268            }
 269        }
 270
 271        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 272            broadcast(connection_id, peer_ids, |conn_id| {
 273                self.peer.send(
 274                    conn_id,
 275                    proto::RemoveProjectCollaborator {
 276                        project_id,
 277                        peer_id: connection_id.0,
 278                    },
 279                )
 280            })?;
 281        }
 282
 283        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 284        Ok(())
 285    }
 286
 287    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn register_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::RegisterProject>,
 294    ) -> tide::Result<proto::RegisterProjectResponse> {
 295        let project_id = {
 296            let mut state = self.state_mut();
 297            let user_id = state.user_id_for_connection(request.sender_id)?;
 298            state.register_project(request.sender_id, user_id)
 299        };
 300        Ok(proto::RegisterProjectResponse { project_id })
 301    }
 302
 303    async fn unregister_project(
 304        mut self: Arc<Server>,
 305        request: TypedEnvelope<proto::UnregisterProject>,
 306    ) -> tide::Result<()> {
 307        let project = self
 308            .state_mut()
 309            .unregister_project(request.payload.project_id, request.sender_id)?;
 310        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 311        Ok(())
 312    }
 313
 314    async fn share_project(
 315        mut self: Arc<Server>,
 316        request: TypedEnvelope<proto::ShareProject>,
 317    ) -> tide::Result<proto::Ack> {
 318        self.state_mut()
 319            .share_project(request.payload.project_id, request.sender_id);
 320        Ok(proto::Ack {})
 321    }
 322
 323    async fn unshare_project(
 324        mut self: Arc<Server>,
 325        request: TypedEnvelope<proto::UnshareProject>,
 326    ) -> tide::Result<()> {
 327        let project_id = request.payload.project_id;
 328        let project = self
 329            .state_mut()
 330            .unshare_project(project_id, request.sender_id)?;
 331
 332        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 333            self.peer
 334                .send(conn_id, proto::UnshareProject { project_id })
 335        })?;
 336        self.update_contacts_for_users(&project.authorized_user_ids)?;
 337        Ok(())
 338    }
 339
 340    async fn join_project(
 341        mut self: Arc<Server>,
 342        request: TypedEnvelope<proto::JoinProject>,
 343    ) -> tide::Result<proto::JoinProjectResponse> {
 344        let project_id = request.payload.project_id;
 345
 346        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 347        let (response, connection_ids, contact_user_ids) = self
 348            .state_mut()
 349            .join_project(request.sender_id, user_id, project_id)
 350            .and_then(|joined| {
 351                let share = joined.project.share()?;
 352                let peer_count = share.guests.len();
 353                let mut collaborators = Vec::with_capacity(peer_count);
 354                collaborators.push(proto::Collaborator {
 355                    peer_id: joined.project.host_connection_id.0,
 356                    replica_id: 0,
 357                    user_id: joined.project.host_user_id.to_proto(),
 358                });
 359                let worktrees = share
 360                    .worktrees
 361                    .iter()
 362                    .filter_map(|(id, shared_worktree)| {
 363                        let worktree = joined.project.worktrees.get(&id)?;
 364                        Some(proto::Worktree {
 365                            id: *id,
 366                            root_name: worktree.root_name.clone(),
 367                            entries: shared_worktree.entries.values().cloned().collect(),
 368                            diagnostic_summaries: shared_worktree
 369                                .diagnostic_summaries
 370                                .values()
 371                                .cloned()
 372                                .collect(),
 373                            visible: worktree.visible,
 374                        })
 375                    })
 376                    .collect();
 377                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 378                    if *peer_conn_id != request.sender_id {
 379                        collaborators.push(proto::Collaborator {
 380                            peer_id: peer_conn_id.0,
 381                            replica_id: *peer_replica_id as u32,
 382                            user_id: peer_user_id.to_proto(),
 383                        });
 384                    }
 385                }
 386                let response = proto::JoinProjectResponse {
 387                    worktrees,
 388                    replica_id: joined.replica_id as u32,
 389                    collaborators,
 390                    language_servers: joined.project.language_servers.clone(),
 391                };
 392                let connection_ids = joined.project.connection_ids();
 393                let contact_user_ids = joined.project.authorized_user_ids();
 394                Ok((response, connection_ids, contact_user_ids))
 395            })?;
 396
 397        broadcast(request.sender_id, connection_ids, |conn_id| {
 398            self.peer.send(
 399                conn_id,
 400                proto::AddProjectCollaborator {
 401                    project_id,
 402                    collaborator: Some(proto::Collaborator {
 403                        peer_id: request.sender_id.0,
 404                        replica_id: response.replica_id,
 405                        user_id: user_id.to_proto(),
 406                    }),
 407                },
 408            )
 409        })?;
 410        self.update_contacts_for_users(&contact_user_ids)?;
 411        Ok(response)
 412    }
 413
 414    async fn leave_project(
 415        mut self: Arc<Server>,
 416        request: TypedEnvelope<proto::LeaveProject>,
 417    ) -> tide::Result<()> {
 418        let sender_id = request.sender_id;
 419        let project_id = request.payload.project_id;
 420        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 421
 422        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 423            self.peer.send(
 424                conn_id,
 425                proto::RemoveProjectCollaborator {
 426                    project_id,
 427                    peer_id: sender_id.0,
 428                },
 429            )
 430        })?;
 431        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 432
 433        Ok(())
 434    }
 435
 436    async fn register_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::RegisterWorktree>,
 439    ) -> tide::Result<proto::Ack> {
 440        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 441
 442        let mut contact_user_ids = HashSet::default();
 443        contact_user_ids.insert(host_user_id);
 444        for github_login in &request.payload.authorized_logins {
 445            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 446            contact_user_ids.insert(contact_user_id);
 447        }
 448
 449        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 450        let guest_connection_ids;
 451        {
 452            let mut state = self.state_mut();
 453            guest_connection_ids = state
 454                .read_project(request.payload.project_id, request.sender_id)?
 455                .guest_connection_ids();
 456            state.register_worktree(
 457                request.payload.project_id,
 458                request.payload.worktree_id,
 459                request.sender_id,
 460                Worktree {
 461                    authorized_user_ids: contact_user_ids.clone(),
 462                    root_name: request.payload.root_name.clone(),
 463                    visible: request.payload.visible,
 464                },
 465            )?;
 466        }
 467        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 468            self.peer
 469                .forward_send(request.sender_id, connection_id, request.payload.clone())
 470        })?;
 471        self.update_contacts_for_users(&contact_user_ids)?;
 472        Ok(proto::Ack {})
 473    }
 474
 475    async fn unregister_worktree(
 476        mut self: Arc<Server>,
 477        request: TypedEnvelope<proto::UnregisterWorktree>,
 478    ) -> tide::Result<()> {
 479        let project_id = request.payload.project_id;
 480        let worktree_id = request.payload.worktree_id;
 481        let (worktree, guest_connection_ids) =
 482            self.state_mut()
 483                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 484        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 485            self.peer.send(
 486                conn_id,
 487                proto::UnregisterWorktree {
 488                    project_id,
 489                    worktree_id,
 490                },
 491            )
 492        })?;
 493        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 494        Ok(())
 495    }
 496
 497    async fn update_worktree(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateWorktree>,
 500    ) -> tide::Result<proto::Ack> {
 501        let connection_ids = self.state_mut().update_worktree(
 502            request.sender_id,
 503            request.payload.project_id,
 504            request.payload.worktree_id,
 505            &request.payload.removed_entries,
 506            &request.payload.updated_entries,
 507        )?;
 508
 509        broadcast(request.sender_id, connection_ids, |connection_id| {
 510            self.peer
 511                .forward_send(request.sender_id, connection_id, request.payload.clone())
 512        })?;
 513
 514        Ok(proto::Ack {})
 515    }
 516
 517    async fn update_diagnostic_summary(
 518        mut self: Arc<Server>,
 519        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 520    ) -> tide::Result<()> {
 521        let summary = request
 522            .payload
 523            .summary
 524            .clone()
 525            .ok_or_else(|| anyhow!("invalid summary"))?;
 526        let receiver_ids = self.state_mut().update_diagnostic_summary(
 527            request.payload.project_id,
 528            request.payload.worktree_id,
 529            request.sender_id,
 530            summary,
 531        )?;
 532
 533        broadcast(request.sender_id, receiver_ids, |connection_id| {
 534            self.peer
 535                .forward_send(request.sender_id, connection_id, request.payload.clone())
 536        })?;
 537        Ok(())
 538    }
 539
 540    async fn start_language_server(
 541        mut self: Arc<Server>,
 542        request: TypedEnvelope<proto::StartLanguageServer>,
 543    ) -> tide::Result<()> {
 544        let receiver_ids = self.state_mut().start_language_server(
 545            request.payload.project_id,
 546            request.sender_id,
 547            request
 548                .payload
 549                .server
 550                .clone()
 551                .ok_or_else(|| anyhow!("invalid language server"))?,
 552        )?;
 553        broadcast(request.sender_id, receiver_ids, |connection_id| {
 554            self.peer
 555                .forward_send(request.sender_id, connection_id, request.payload.clone())
 556        })?;
 557        Ok(())
 558    }
 559
 560    async fn update_language_server(
 561        self: Arc<Server>,
 562        request: TypedEnvelope<proto::UpdateLanguageServer>,
 563    ) -> tide::Result<()> {
 564        let receiver_ids = self
 565            .state()
 566            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 567        broadcast(request.sender_id, receiver_ids, |connection_id| {
 568            self.peer
 569                .forward_send(request.sender_id, connection_id, request.payload.clone())
 570        })?;
 571        Ok(())
 572    }
 573
 574    async fn forward_project_request<T>(
 575        self: Arc<Server>,
 576        request: TypedEnvelope<T>,
 577    ) -> tide::Result<T::Response>
 578    where
 579        T: EntityMessage + RequestMessage,
 580    {
 581        let host_connection_id = self
 582            .state()
 583            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 584            .host_connection_id;
 585        Ok(self
 586            .peer
 587            .forward_request(request.sender_id, host_connection_id, request.payload)
 588            .await?)
 589    }
 590
 591    async fn save_buffer(
 592        self: Arc<Server>,
 593        request: TypedEnvelope<proto::SaveBuffer>,
 594    ) -> tide::Result<proto::BufferSaved> {
 595        let host;
 596        let mut guests;
 597        {
 598            let state = self.state();
 599            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 600            host = project.host_connection_id;
 601            guests = project.guest_connection_ids()
 602        }
 603
 604        let response = self
 605            .peer
 606            .forward_request(request.sender_id, host, request.payload.clone())
 607            .await?;
 608
 609        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 610        broadcast(host, guests, |conn_id| {
 611            self.peer.forward_send(host, conn_id, response.clone())
 612        })?;
 613
 614        Ok(response)
 615    }
 616
 617    async fn update_buffer(
 618        self: Arc<Server>,
 619        request: TypedEnvelope<proto::UpdateBuffer>,
 620    ) -> tide::Result<proto::Ack> {
 621        let receiver_ids = self
 622            .state()
 623            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 624        broadcast(request.sender_id, receiver_ids, |connection_id| {
 625            self.peer
 626                .forward_send(request.sender_id, connection_id, request.payload.clone())
 627        })?;
 628        Ok(proto::Ack {})
 629    }
 630
 631    async fn update_buffer_file(
 632        self: Arc<Server>,
 633        request: TypedEnvelope<proto::UpdateBufferFile>,
 634    ) -> tide::Result<()> {
 635        let receiver_ids = self
 636            .state()
 637            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 638        broadcast(request.sender_id, receiver_ids, |connection_id| {
 639            self.peer
 640                .forward_send(request.sender_id, connection_id, request.payload.clone())
 641        })?;
 642        Ok(())
 643    }
 644
 645    async fn buffer_reloaded(
 646        self: Arc<Server>,
 647        request: TypedEnvelope<proto::BufferReloaded>,
 648    ) -> tide::Result<()> {
 649        let receiver_ids = self
 650            .state()
 651            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 652        broadcast(request.sender_id, receiver_ids, |connection_id| {
 653            self.peer
 654                .forward_send(request.sender_id, connection_id, request.payload.clone())
 655        })?;
 656        Ok(())
 657    }
 658
 659    async fn buffer_saved(
 660        self: Arc<Server>,
 661        request: TypedEnvelope<proto::BufferSaved>,
 662    ) -> tide::Result<()> {
 663        let receiver_ids = self
 664            .state()
 665            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 666        broadcast(request.sender_id, receiver_ids, |connection_id| {
 667            self.peer
 668                .forward_send(request.sender_id, connection_id, request.payload.clone())
 669        })?;
 670        Ok(())
 671    }
 672
 673    async fn follow(
 674        self: Arc<Self>,
 675        request: TypedEnvelope<proto::Follow>,
 676    ) -> tide::Result<proto::FollowResponse> {
 677        let leader_id = ConnectionId(request.payload.leader_id);
 678        if !self
 679            .state()
 680            .project_connection_ids(request.payload.project_id, request.sender_id)?
 681            .contains(&leader_id)
 682        {
 683            Err(anyhow!("no such peer"))?;
 684        }
 685        let response = self
 686            .peer
 687            .forward_request(request.sender_id, leader_id, request.payload)
 688            .await?;
 689        Ok(response)
 690    }
 691
 692    async fn get_channels(
 693        self: Arc<Server>,
 694        request: TypedEnvelope<proto::GetChannels>,
 695    ) -> tide::Result<proto::GetChannelsResponse> {
 696        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 697        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 698        Ok(proto::GetChannelsResponse {
 699            channels: channels
 700                .into_iter()
 701                .map(|chan| proto::Channel {
 702                    id: chan.id.to_proto(),
 703                    name: chan.name,
 704                })
 705                .collect(),
 706        })
 707    }
 708
 709    async fn get_users(
 710        self: Arc<Server>,
 711        request: TypedEnvelope<proto::GetUsers>,
 712    ) -> tide::Result<proto::GetUsersResponse> {
 713        let user_ids = request
 714            .payload
 715            .user_ids
 716            .into_iter()
 717            .map(UserId::from_proto)
 718            .collect();
 719        let users = self
 720            .app_state
 721            .db
 722            .get_users_by_ids(user_ids)
 723            .await?
 724            .into_iter()
 725            .map(|user| proto::User {
 726                id: user.id.to_proto(),
 727                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 728                github_login: user.github_login,
 729            })
 730            .collect();
 731        Ok(proto::GetUsersResponse { users })
 732    }
 733
 734    fn update_contacts_for_users<'a>(
 735        self: &Arc<Server>,
 736        user_ids: impl IntoIterator<Item = &'a UserId>,
 737    ) -> anyhow::Result<()> {
 738        let mut result = Ok(());
 739        let state = self.state();
 740        for user_id in user_ids {
 741            let contacts = state.contacts_for_user(*user_id);
 742            for connection_id in state.connection_ids_for_user(*user_id) {
 743                if let Err(error) = self.peer.send(
 744                    connection_id,
 745                    proto::UpdateContacts {
 746                        contacts: contacts.clone(),
 747                    },
 748                ) {
 749                    result = Err(error);
 750                }
 751            }
 752        }
 753        result
 754    }
 755
 756    async fn join_channel(
 757        mut self: Arc<Self>,
 758        request: TypedEnvelope<proto::JoinChannel>,
 759    ) -> tide::Result<proto::JoinChannelResponse> {
 760        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 761        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 762        if !self
 763            .app_state
 764            .db
 765            .can_user_access_channel(user_id, channel_id)
 766            .await?
 767        {
 768            Err(anyhow!("access denied"))?;
 769        }
 770
 771        self.state_mut().join_channel(request.sender_id, channel_id);
 772        let messages = self
 773            .app_state
 774            .db
 775            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 776            .await?
 777            .into_iter()
 778            .map(|msg| proto::ChannelMessage {
 779                id: msg.id.to_proto(),
 780                body: msg.body,
 781                timestamp: msg.sent_at.unix_timestamp() as u64,
 782                sender_id: msg.sender_id.to_proto(),
 783                nonce: Some(msg.nonce.as_u128().into()),
 784            })
 785            .collect::<Vec<_>>();
 786        Ok(proto::JoinChannelResponse {
 787            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 788            messages,
 789        })
 790    }
 791
 792    async fn leave_channel(
 793        mut self: Arc<Self>,
 794        request: TypedEnvelope<proto::LeaveChannel>,
 795    ) -> tide::Result<()> {
 796        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 797        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 798        if !self
 799            .app_state
 800            .db
 801            .can_user_access_channel(user_id, channel_id)
 802            .await?
 803        {
 804            Err(anyhow!("access denied"))?;
 805        }
 806
 807        self.state_mut()
 808            .leave_channel(request.sender_id, channel_id);
 809
 810        Ok(())
 811    }
 812
 813    async fn send_channel_message(
 814        self: Arc<Self>,
 815        request: TypedEnvelope<proto::SendChannelMessage>,
 816    ) -> tide::Result<proto::SendChannelMessageResponse> {
 817        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 818        let user_id;
 819        let connection_ids;
 820        {
 821            let state = self.state();
 822            user_id = state.user_id_for_connection(request.sender_id)?;
 823            connection_ids = state.channel_connection_ids(channel_id)?;
 824        }
 825
 826        // Validate the message body.
 827        let body = request.payload.body.trim().to_string();
 828        if body.len() > MAX_MESSAGE_LEN {
 829            return Err(anyhow!("message is too long"))?;
 830        }
 831        if body.is_empty() {
 832            return Err(anyhow!("message can't be blank"))?;
 833        }
 834
 835        let timestamp = OffsetDateTime::now_utc();
 836        let nonce = request
 837            .payload
 838            .nonce
 839            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 840
 841        let message_id = self
 842            .app_state
 843            .db
 844            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 845            .await?
 846            .to_proto();
 847        let message = proto::ChannelMessage {
 848            sender_id: user_id.to_proto(),
 849            id: message_id,
 850            body,
 851            timestamp: timestamp.unix_timestamp() as u64,
 852            nonce: Some(nonce),
 853        };
 854        broadcast(request.sender_id, connection_ids, |conn_id| {
 855            self.peer.send(
 856                conn_id,
 857                proto::ChannelMessageSent {
 858                    channel_id: channel_id.to_proto(),
 859                    message: Some(message.clone()),
 860                },
 861            )
 862        })?;
 863        Ok(proto::SendChannelMessageResponse {
 864            message: Some(message),
 865        })
 866    }
 867
 868    async fn get_channel_messages(
 869        self: Arc<Self>,
 870        request: TypedEnvelope<proto::GetChannelMessages>,
 871    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 872        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 873        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 874        if !self
 875            .app_state
 876            .db
 877            .can_user_access_channel(user_id, channel_id)
 878            .await?
 879        {
 880            Err(anyhow!("access denied"))?;
 881        }
 882
 883        let messages = self
 884            .app_state
 885            .db
 886            .get_channel_messages(
 887                channel_id,
 888                MESSAGE_COUNT_PER_PAGE,
 889                Some(MessageId::from_proto(request.payload.before_message_id)),
 890            )
 891            .await?
 892            .into_iter()
 893            .map(|msg| proto::ChannelMessage {
 894                id: msg.id.to_proto(),
 895                body: msg.body,
 896                timestamp: msg.sent_at.unix_timestamp() as u64,
 897                sender_id: msg.sender_id.to_proto(),
 898                nonce: Some(msg.nonce.as_u128().into()),
 899            })
 900            .collect::<Vec<_>>();
 901
 902        Ok(proto::GetChannelMessagesResponse {
 903            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 904            messages,
 905        })
 906    }
 907
 908    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 909        self.store.read()
 910    }
 911
 912    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 913        self.store.write()
 914    }
 915}
 916
 917impl Executor for RealExecutor {
 918    type Timer = Timer;
 919
 920    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 921        task::spawn(future);
 922    }
 923
 924    fn timer(&self, duration: Duration) -> Self::Timer {
 925        Timer::after(duration)
 926    }
 927}
 928
 929fn broadcast<F>(
 930    sender_id: ConnectionId,
 931    receiver_ids: Vec<ConnectionId>,
 932    mut f: F,
 933) -> anyhow::Result<()>
 934where
 935    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 936{
 937    let mut result = Ok(());
 938    for receiver_id in receiver_ids {
 939        if receiver_id != sender_id {
 940            if let Err(error) = f(receiver_id) {
 941                if result.is_ok() {
 942                    result = Err(error);
 943                }
 944            }
 945        }
 946    }
 947    result
 948}
 949
 950pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 951    let server = Server::new(app.state().clone(), rpc.clone(), None);
 952    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 953        let server = server.clone();
 954        async move {
 955            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 956
 957            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 958            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 959            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 960            let client_protocol_version: Option<u32> = request
 961                .header("X-Zed-Protocol-Version")
 962                .and_then(|v| v.as_str().parse().ok());
 963
 964            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 965                return Ok(Response::new(StatusCode::UpgradeRequired));
 966            }
 967
 968            let header = match request.header("Sec-Websocket-Key") {
 969                Some(h) => h.as_str(),
 970                None => return Err(anyhow!("expected sec-websocket-key"))?,
 971            };
 972
 973            let user_id = process_auth_header(&request).await?;
 974
 975            let mut response = Response::new(StatusCode::SwitchingProtocols);
 976            response.insert_header(UPGRADE, "websocket");
 977            response.insert_header(CONNECTION, "Upgrade");
 978            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 979            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 980            response.insert_header("Sec-Websocket-Version", "13");
 981
 982            let http_res: &mut tide::http::Response = response.as_mut();
 983            let upgrade_receiver = http_res.recv_upgrade().await;
 984            let addr = request.remote().unwrap_or("unknown").to_string();
 985            task::spawn(async move {
 986                if let Some(stream) = upgrade_receiver.await {
 987                    server
 988                        .handle_connection(
 989                            Connection::new(
 990                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 991                            ),
 992                            addr,
 993                            user_id,
 994                            None,
 995                            RealExecutor,
 996                        )
 997                        .await;
 998                }
 999            });
1000
1001            Ok(response)
1002        }
1003    });
1004}
1005
1006fn header_contains_ignore_case<T>(
1007    request: &tide::Request<T>,
1008    header_name: HeaderName,
1009    value: &str,
1010) -> bool {
1011    request
1012        .header(header_name)
1013        .map(|h| {
1014            h.as_str()
1015                .split(',')
1016                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1017        })
1018        .unwrap_or(false)
1019}
1020
1021#[cfg(test)]
1022mod tests {
1023    use super::*;
1024    use crate::{
1025        auth,
1026        db::{tests::TestDb, UserId},
1027        github, AppState, Config,
1028    };
1029    use ::rpc::Peer;
1030    use client::{
1031        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1032        EstablishConnectionError, UserStore,
1033    };
1034    use collections::BTreeMap;
1035    use editor::{
1036        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1037        ToOffset, ToggleCodeActions, Undo,
1038    };
1039    use gpui::{executor, ModelHandle, TestAppContext, ViewHandle};
1040    use language::{
1041        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1042        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1043    };
1044    use lsp;
1045    use parking_lot::Mutex;
1046    use postage::barrier;
1047    use project::{
1048        fs::{FakeFs, Fs as _},
1049        search::SearchQuery,
1050        worktree::WorktreeHandle,
1051        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1052    };
1053    use rand::prelude::*;
1054    use rpc::PeerId;
1055    use serde_json::json;
1056    use sqlx::types::time::OffsetDateTime;
1057    use std::{
1058        cell::Cell,
1059        env,
1060        ops::Deref,
1061        path::{Path, PathBuf},
1062        rc::Rc,
1063        sync::{
1064            atomic::{AtomicBool, Ordering::SeqCst},
1065            Arc,
1066        },
1067        time::Duration,
1068    };
1069    use workspace::{Settings, SplitDirection, Workspace, WorkspaceParams};
1070
1071    #[cfg(test)]
1072    #[ctor::ctor]
1073    fn init_logger() {
1074        if std::env::var("RUST_LOG").is_ok() {
1075            env_logger::init();
1076        }
1077    }
1078
1079    #[gpui::test(iterations = 10)]
1080    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1081        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1082        let lang_registry = Arc::new(LanguageRegistry::test());
1083        let fs = FakeFs::new(cx_a.background());
1084        cx_a.foreground().forbid_parking();
1085
1086        // Connect to a server as 2 clients.
1087        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1088        let client_a = server.create_client(cx_a, "user_a").await;
1089        let client_b = server.create_client(cx_b, "user_b").await;
1090
1091        // Share a project as client A
1092        fs.insert_tree(
1093            "/a",
1094            json!({
1095                ".zed.toml": r#"collaborators = ["user_b"]"#,
1096                "a.txt": "a-contents",
1097                "b.txt": "b-contents",
1098            }),
1099        )
1100        .await;
1101        let project_a = cx_a.update(|cx| {
1102            Project::local(
1103                client_a.clone(),
1104                client_a.user_store.clone(),
1105                lang_registry.clone(),
1106                fs.clone(),
1107                cx,
1108            )
1109        });
1110        let (worktree_a, _) = project_a
1111            .update(cx_a, |p, cx| {
1112                p.find_or_create_local_worktree("/a", true, cx)
1113            })
1114            .await
1115            .unwrap();
1116        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1117        worktree_a
1118            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1119            .await;
1120        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1121        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1122
1123        // Join that project as client B
1124        let project_b = Project::remote(
1125            project_id,
1126            client_b.clone(),
1127            client_b.user_store.clone(),
1128            lang_registry.clone(),
1129            fs.clone(),
1130            &mut cx_b.to_async(),
1131        )
1132        .await
1133        .unwrap();
1134
1135        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1136            assert_eq!(
1137                project
1138                    .collaborators()
1139                    .get(&client_a.peer_id)
1140                    .unwrap()
1141                    .user
1142                    .github_login,
1143                "user_a"
1144            );
1145            project.replica_id()
1146        });
1147        project_a
1148            .condition(&cx_a, |tree, _| {
1149                tree.collaborators()
1150                    .get(&client_b.peer_id)
1151                    .map_or(false, |collaborator| {
1152                        collaborator.replica_id == replica_id_b
1153                            && collaborator.user.github_login == "user_b"
1154                    })
1155            })
1156            .await;
1157
1158        // Open the same file as client B and client A.
1159        let buffer_b = project_b
1160            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1161            .await
1162            .unwrap();
1163        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1164        project_a.read_with(cx_a, |project, cx| {
1165            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1166        });
1167        let buffer_a = project_a
1168            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1169            .await
1170            .unwrap();
1171
1172        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1173
1174        // TODO
1175        // // Create a selection set as client B and see that selection set as client A.
1176        // buffer_a
1177        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1178        //     .await;
1179
1180        // Edit the buffer as client B and see that edit as client A.
1181        editor_b.update(cx_b, |editor, cx| {
1182            editor.handle_input(&Input("ok, ".into()), cx)
1183        });
1184        buffer_a
1185            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1186            .await;
1187
1188        // TODO
1189        // // Remove the selection set as client B, see those selections disappear as client A.
1190        cx_b.update(move |_| drop(editor_b));
1191        // buffer_a
1192        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1193        //     .await;
1194
1195        // Dropping the client B's project removes client B from client A's collaborators.
1196        cx_b.update(move |_| drop(project_b));
1197        project_a
1198            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1199            .await;
1200    }
1201
1202    #[gpui::test(iterations = 10)]
1203    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1204        let lang_registry = Arc::new(LanguageRegistry::test());
1205        let fs = FakeFs::new(cx_a.background());
1206        cx_a.foreground().forbid_parking();
1207
1208        // Connect to a server as 2 clients.
1209        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1210        let client_a = server.create_client(cx_a, "user_a").await;
1211        let client_b = server.create_client(cx_b, "user_b").await;
1212
1213        // Share a project as client A
1214        fs.insert_tree(
1215            "/a",
1216            json!({
1217                ".zed.toml": r#"collaborators = ["user_b"]"#,
1218                "a.txt": "a-contents",
1219                "b.txt": "b-contents",
1220            }),
1221        )
1222        .await;
1223        let project_a = cx_a.update(|cx| {
1224            Project::local(
1225                client_a.clone(),
1226                client_a.user_store.clone(),
1227                lang_registry.clone(),
1228                fs.clone(),
1229                cx,
1230            )
1231        });
1232        let (worktree_a, _) = project_a
1233            .update(cx_a, |p, cx| {
1234                p.find_or_create_local_worktree("/a", true, cx)
1235            })
1236            .await
1237            .unwrap();
1238        worktree_a
1239            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1240            .await;
1241        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1242        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1243        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1244        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1245
1246        // Join that project as client B
1247        let project_b = Project::remote(
1248            project_id,
1249            client_b.clone(),
1250            client_b.user_store.clone(),
1251            lang_registry.clone(),
1252            fs.clone(),
1253            &mut cx_b.to_async(),
1254        )
1255        .await
1256        .unwrap();
1257        project_b
1258            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1259            .await
1260            .unwrap();
1261
1262        // Unshare the project as client A
1263        project_a
1264            .update(cx_a, |project, cx| project.unshare(cx))
1265            .await
1266            .unwrap();
1267        project_b
1268            .condition(cx_b, |project, _| project.is_read_only())
1269            .await;
1270        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1271        cx_b.update(|_| {
1272            drop(project_b);
1273        });
1274
1275        // Share the project again and ensure guests can still join.
1276        project_a
1277            .update(cx_a, |project, cx| project.share(cx))
1278            .await
1279            .unwrap();
1280        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1281
1282        let project_b2 = Project::remote(
1283            project_id,
1284            client_b.clone(),
1285            client_b.user_store.clone(),
1286            lang_registry.clone(),
1287            fs.clone(),
1288            &mut cx_b.to_async(),
1289        )
1290        .await
1291        .unwrap();
1292        project_b2
1293            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1294            .await
1295            .unwrap();
1296    }
1297
1298    #[gpui::test(iterations = 10)]
1299    async fn test_propagate_saves_and_fs_changes(
1300        cx_a: &mut TestAppContext,
1301        cx_b: &mut TestAppContext,
1302        cx_c: &mut TestAppContext,
1303    ) {
1304        let lang_registry = Arc::new(LanguageRegistry::test());
1305        let fs = FakeFs::new(cx_a.background());
1306        cx_a.foreground().forbid_parking();
1307
1308        // Connect to a server as 3 clients.
1309        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1310        let client_a = server.create_client(cx_a, "user_a").await;
1311        let client_b = server.create_client(cx_b, "user_b").await;
1312        let client_c = server.create_client(cx_c, "user_c").await;
1313
1314        // Share a worktree as client A.
1315        fs.insert_tree(
1316            "/a",
1317            json!({
1318                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1319                "file1": "",
1320                "file2": ""
1321            }),
1322        )
1323        .await;
1324        let project_a = cx_a.update(|cx| {
1325            Project::local(
1326                client_a.clone(),
1327                client_a.user_store.clone(),
1328                lang_registry.clone(),
1329                fs.clone(),
1330                cx,
1331            )
1332        });
1333        let (worktree_a, _) = project_a
1334            .update(cx_a, |p, cx| {
1335                p.find_or_create_local_worktree("/a", true, cx)
1336            })
1337            .await
1338            .unwrap();
1339        worktree_a
1340            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1341            .await;
1342        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1343        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1344        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1345
1346        // Join that worktree as clients B and C.
1347        let project_b = Project::remote(
1348            project_id,
1349            client_b.clone(),
1350            client_b.user_store.clone(),
1351            lang_registry.clone(),
1352            fs.clone(),
1353            &mut cx_b.to_async(),
1354        )
1355        .await
1356        .unwrap();
1357        let project_c = Project::remote(
1358            project_id,
1359            client_c.clone(),
1360            client_c.user_store.clone(),
1361            lang_registry.clone(),
1362            fs.clone(),
1363            &mut cx_c.to_async(),
1364        )
1365        .await
1366        .unwrap();
1367        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1368        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1369
1370        // Open and edit a buffer as both guests B and C.
1371        let buffer_b = project_b
1372            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1373            .await
1374            .unwrap();
1375        let buffer_c = project_c
1376            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1377            .await
1378            .unwrap();
1379        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1380        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1381
1382        // Open and edit that buffer as the host.
1383        let buffer_a = project_a
1384            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1385            .await
1386            .unwrap();
1387
1388        buffer_a
1389            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1390            .await;
1391        buffer_a.update(cx_a, |buf, cx| {
1392            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1393        });
1394
1395        // Wait for edits to propagate
1396        buffer_a
1397            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1398            .await;
1399        buffer_b
1400            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1401            .await;
1402        buffer_c
1403            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1404            .await;
1405
1406        // Edit the buffer as the host and concurrently save as guest B.
1407        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1408        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1409        save_b.await.unwrap();
1410        assert_eq!(
1411            fs.load("/a/file1".as_ref()).await.unwrap(),
1412            "hi-a, i-am-c, i-am-b, i-am-a"
1413        );
1414        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1415        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1416        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1417
1418        worktree_a.flush_fs_events(cx_a).await;
1419
1420        // Make changes on host's file system, see those changes on guest worktrees.
1421        fs.rename(
1422            "/a/file1".as_ref(),
1423            "/a/file1-renamed".as_ref(),
1424            Default::default(),
1425        )
1426        .await
1427        .unwrap();
1428
1429        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1430            .await
1431            .unwrap();
1432        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1433
1434        worktree_a
1435            .condition(&cx_a, |tree, _| {
1436                tree.paths()
1437                    .map(|p| p.to_string_lossy())
1438                    .collect::<Vec<_>>()
1439                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1440            })
1441            .await;
1442        worktree_b
1443            .condition(&cx_b, |tree, _| {
1444                tree.paths()
1445                    .map(|p| p.to_string_lossy())
1446                    .collect::<Vec<_>>()
1447                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1448            })
1449            .await;
1450        worktree_c
1451            .condition(&cx_c, |tree, _| {
1452                tree.paths()
1453                    .map(|p| p.to_string_lossy())
1454                    .collect::<Vec<_>>()
1455                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1456            })
1457            .await;
1458
1459        // Ensure buffer files are updated as well.
1460        buffer_a
1461            .condition(&cx_a, |buf, _| {
1462                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1463            })
1464            .await;
1465        buffer_b
1466            .condition(&cx_b, |buf, _| {
1467                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1468            })
1469            .await;
1470        buffer_c
1471            .condition(&cx_c, |buf, _| {
1472                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1473            })
1474            .await;
1475    }
1476
1477    #[gpui::test(iterations = 10)]
1478    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1479        cx_a.foreground().forbid_parking();
1480        let lang_registry = Arc::new(LanguageRegistry::test());
1481        let fs = FakeFs::new(cx_a.background());
1482
1483        // Connect to a server as 2 clients.
1484        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1485        let client_a = server.create_client(cx_a, "user_a").await;
1486        let client_b = server.create_client(cx_b, "user_b").await;
1487
1488        // Share a project as client A
1489        fs.insert_tree(
1490            "/dir",
1491            json!({
1492                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1493                "a.txt": "a-contents",
1494            }),
1495        )
1496        .await;
1497
1498        let project_a = cx_a.update(|cx| {
1499            Project::local(
1500                client_a.clone(),
1501                client_a.user_store.clone(),
1502                lang_registry.clone(),
1503                fs.clone(),
1504                cx,
1505            )
1506        });
1507        let (worktree_a, _) = project_a
1508            .update(cx_a, |p, cx| {
1509                p.find_or_create_local_worktree("/dir", true, cx)
1510            })
1511            .await
1512            .unwrap();
1513        worktree_a
1514            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1515            .await;
1516        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1517        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1518        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1519
1520        // Join that project as client B
1521        let project_b = Project::remote(
1522            project_id,
1523            client_b.clone(),
1524            client_b.user_store.clone(),
1525            lang_registry.clone(),
1526            fs.clone(),
1527            &mut cx_b.to_async(),
1528        )
1529        .await
1530        .unwrap();
1531
1532        // Open a buffer as client B
1533        let buffer_b = project_b
1534            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1535            .await
1536            .unwrap();
1537
1538        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1539        buffer_b.read_with(cx_b, |buf, _| {
1540            assert!(buf.is_dirty());
1541            assert!(!buf.has_conflict());
1542        });
1543
1544        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1545        buffer_b
1546            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1547            .await;
1548        buffer_b.read_with(cx_b, |buf, _| {
1549            assert!(!buf.has_conflict());
1550        });
1551
1552        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1553        buffer_b.read_with(cx_b, |buf, _| {
1554            assert!(buf.is_dirty());
1555            assert!(!buf.has_conflict());
1556        });
1557    }
1558
1559    #[gpui::test(iterations = 10)]
1560    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1561        cx_a.foreground().forbid_parking();
1562        let lang_registry = Arc::new(LanguageRegistry::test());
1563        let fs = FakeFs::new(cx_a.background());
1564
1565        // Connect to a server as 2 clients.
1566        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1567        let client_a = server.create_client(cx_a, "user_a").await;
1568        let client_b = server.create_client(cx_b, "user_b").await;
1569
1570        // Share a project as client A
1571        fs.insert_tree(
1572            "/dir",
1573            json!({
1574                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1575                "a.txt": "a-contents",
1576            }),
1577        )
1578        .await;
1579
1580        let project_a = cx_a.update(|cx| {
1581            Project::local(
1582                client_a.clone(),
1583                client_a.user_store.clone(),
1584                lang_registry.clone(),
1585                fs.clone(),
1586                cx,
1587            )
1588        });
1589        let (worktree_a, _) = project_a
1590            .update(cx_a, |p, cx| {
1591                p.find_or_create_local_worktree("/dir", true, cx)
1592            })
1593            .await
1594            .unwrap();
1595        worktree_a
1596            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1597            .await;
1598        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1599        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1600        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1601
1602        // Join that project as client B
1603        let project_b = Project::remote(
1604            project_id,
1605            client_b.clone(),
1606            client_b.user_store.clone(),
1607            lang_registry.clone(),
1608            fs.clone(),
1609            &mut cx_b.to_async(),
1610        )
1611        .await
1612        .unwrap();
1613        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1614
1615        // Open a buffer as client B
1616        let buffer_b = project_b
1617            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1618            .await
1619            .unwrap();
1620        buffer_b.read_with(cx_b, |buf, _| {
1621            assert!(!buf.is_dirty());
1622            assert!(!buf.has_conflict());
1623        });
1624
1625        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1626            .await
1627            .unwrap();
1628        buffer_b
1629            .condition(&cx_b, |buf, _| {
1630                buf.text() == "new contents" && !buf.is_dirty()
1631            })
1632            .await;
1633        buffer_b.read_with(cx_b, |buf, _| {
1634            assert!(!buf.has_conflict());
1635        });
1636    }
1637
1638    #[gpui::test(iterations = 10)]
1639    async fn test_editing_while_guest_opens_buffer(
1640        cx_a: &mut TestAppContext,
1641        cx_b: &mut TestAppContext,
1642    ) {
1643        cx_a.foreground().forbid_parking();
1644        let lang_registry = Arc::new(LanguageRegistry::test());
1645        let fs = FakeFs::new(cx_a.background());
1646
1647        // Connect to a server as 2 clients.
1648        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1649        let client_a = server.create_client(cx_a, "user_a").await;
1650        let client_b = server.create_client(cx_b, "user_b").await;
1651
1652        // Share a project as client A
1653        fs.insert_tree(
1654            "/dir",
1655            json!({
1656                ".zed.toml": r#"collaborators = ["user_b"]"#,
1657                "a.txt": "a-contents",
1658            }),
1659        )
1660        .await;
1661        let project_a = cx_a.update(|cx| {
1662            Project::local(
1663                client_a.clone(),
1664                client_a.user_store.clone(),
1665                lang_registry.clone(),
1666                fs.clone(),
1667                cx,
1668            )
1669        });
1670        let (worktree_a, _) = project_a
1671            .update(cx_a, |p, cx| {
1672                p.find_or_create_local_worktree("/dir", true, cx)
1673            })
1674            .await
1675            .unwrap();
1676        worktree_a
1677            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1678            .await;
1679        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1680        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1681        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1682
1683        // Join that project as client B
1684        let project_b = Project::remote(
1685            project_id,
1686            client_b.clone(),
1687            client_b.user_store.clone(),
1688            lang_registry.clone(),
1689            fs.clone(),
1690            &mut cx_b.to_async(),
1691        )
1692        .await
1693        .unwrap();
1694
1695        // Open a buffer as client A
1696        let buffer_a = project_a
1697            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1698            .await
1699            .unwrap();
1700
1701        // Start opening the same buffer as client B
1702        let buffer_b = cx_b
1703            .background()
1704            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1705
1706        // Edit the buffer as client A while client B is still opening it.
1707        cx_b.background().simulate_random_delay().await;
1708        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1709        cx_b.background().simulate_random_delay().await;
1710        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1711
1712        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1713        let buffer_b = buffer_b.await.unwrap();
1714        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1715    }
1716
1717    #[gpui::test(iterations = 10)]
1718    async fn test_leaving_worktree_while_opening_buffer(
1719        cx_a: &mut TestAppContext,
1720        cx_b: &mut TestAppContext,
1721    ) {
1722        cx_a.foreground().forbid_parking();
1723        let lang_registry = Arc::new(LanguageRegistry::test());
1724        let fs = FakeFs::new(cx_a.background());
1725
1726        // Connect to a server as 2 clients.
1727        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1728        let client_a = server.create_client(cx_a, "user_a").await;
1729        let client_b = server.create_client(cx_b, "user_b").await;
1730
1731        // Share a project as client A
1732        fs.insert_tree(
1733            "/dir",
1734            json!({
1735                ".zed.toml": r#"collaborators = ["user_b"]"#,
1736                "a.txt": "a-contents",
1737            }),
1738        )
1739        .await;
1740        let project_a = cx_a.update(|cx| {
1741            Project::local(
1742                client_a.clone(),
1743                client_a.user_store.clone(),
1744                lang_registry.clone(),
1745                fs.clone(),
1746                cx,
1747            )
1748        });
1749        let (worktree_a, _) = project_a
1750            .update(cx_a, |p, cx| {
1751                p.find_or_create_local_worktree("/dir", true, cx)
1752            })
1753            .await
1754            .unwrap();
1755        worktree_a
1756            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1757            .await;
1758        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1759        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1760        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1761
1762        // Join that project as client B
1763        let project_b = Project::remote(
1764            project_id,
1765            client_b.clone(),
1766            client_b.user_store.clone(),
1767            lang_registry.clone(),
1768            fs.clone(),
1769            &mut cx_b.to_async(),
1770        )
1771        .await
1772        .unwrap();
1773
1774        // See that a guest has joined as client A.
1775        project_a
1776            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1777            .await;
1778
1779        // Begin opening a buffer as client B, but leave the project before the open completes.
1780        let buffer_b = cx_b
1781            .background()
1782            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1783        cx_b.update(|_| drop(project_b));
1784        drop(buffer_b);
1785
1786        // See that the guest has left.
1787        project_a
1788            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1789            .await;
1790    }
1791
1792    #[gpui::test(iterations = 10)]
1793    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1794        cx_a.foreground().forbid_parking();
1795        let lang_registry = Arc::new(LanguageRegistry::test());
1796        let fs = FakeFs::new(cx_a.background());
1797
1798        // Connect to a server as 2 clients.
1799        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1800        let client_a = server.create_client(cx_a, "user_a").await;
1801        let client_b = server.create_client(cx_b, "user_b").await;
1802
1803        // Share a project as client A
1804        fs.insert_tree(
1805            "/a",
1806            json!({
1807                ".zed.toml": r#"collaborators = ["user_b"]"#,
1808                "a.txt": "a-contents",
1809                "b.txt": "b-contents",
1810            }),
1811        )
1812        .await;
1813        let project_a = cx_a.update(|cx| {
1814            Project::local(
1815                client_a.clone(),
1816                client_a.user_store.clone(),
1817                lang_registry.clone(),
1818                fs.clone(),
1819                cx,
1820            )
1821        });
1822        let (worktree_a, _) = project_a
1823            .update(cx_a, |p, cx| {
1824                p.find_or_create_local_worktree("/a", true, cx)
1825            })
1826            .await
1827            .unwrap();
1828        worktree_a
1829            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1830            .await;
1831        let project_id = project_a
1832            .update(cx_a, |project, _| project.next_remote_id())
1833            .await;
1834        project_a
1835            .update(cx_a, |project, cx| project.share(cx))
1836            .await
1837            .unwrap();
1838
1839        // Join that project as client B
1840        let _project_b = Project::remote(
1841            project_id,
1842            client_b.clone(),
1843            client_b.user_store.clone(),
1844            lang_registry.clone(),
1845            fs.clone(),
1846            &mut cx_b.to_async(),
1847        )
1848        .await
1849        .unwrap();
1850
1851        // Client A sees that a guest has joined.
1852        project_a
1853            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1854            .await;
1855
1856        // Drop client B's connection and ensure client A observes client B leaving the project.
1857        client_b.disconnect(&cx_b.to_async()).unwrap();
1858        project_a
1859            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1860            .await;
1861
1862        // Rejoin the project as client B
1863        let _project_b = Project::remote(
1864            project_id,
1865            client_b.clone(),
1866            client_b.user_store.clone(),
1867            lang_registry.clone(),
1868            fs.clone(),
1869            &mut cx_b.to_async(),
1870        )
1871        .await
1872        .unwrap();
1873
1874        // Client A sees that a guest has re-joined.
1875        project_a
1876            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1877            .await;
1878
1879        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1880        client_b.wait_for_current_user(cx_b).await;
1881        server.disconnect_client(client_b.current_user_id(cx_b));
1882        cx_a.foreground().advance_clock(Duration::from_secs(3));
1883        project_a
1884            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1885            .await;
1886    }
1887
1888    #[gpui::test(iterations = 10)]
1889    async fn test_collaborating_with_diagnostics(
1890        cx_a: &mut TestAppContext,
1891        cx_b: &mut TestAppContext,
1892    ) {
1893        cx_a.foreground().forbid_parking();
1894        let mut lang_registry = Arc::new(LanguageRegistry::test());
1895        let fs = FakeFs::new(cx_a.background());
1896
1897        // Set up a fake language server.
1898        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1899        Arc::get_mut(&mut lang_registry)
1900            .unwrap()
1901            .add(Arc::new(Language::new(
1902                LanguageConfig {
1903                    name: "Rust".into(),
1904                    path_suffixes: vec!["rs".to_string()],
1905                    language_server: Some(language_server_config),
1906                    ..Default::default()
1907                },
1908                Some(tree_sitter_rust::language()),
1909            )));
1910
1911        // Connect to a server as 2 clients.
1912        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1913        let client_a = server.create_client(cx_a, "user_a").await;
1914        let client_b = server.create_client(cx_b, "user_b").await;
1915
1916        // Share a project as client A
1917        fs.insert_tree(
1918            "/a",
1919            json!({
1920                ".zed.toml": r#"collaborators = ["user_b"]"#,
1921                "a.rs": "let one = two",
1922                "other.rs": "",
1923            }),
1924        )
1925        .await;
1926        let project_a = cx_a.update(|cx| {
1927            Project::local(
1928                client_a.clone(),
1929                client_a.user_store.clone(),
1930                lang_registry.clone(),
1931                fs.clone(),
1932                cx,
1933            )
1934        });
1935        let (worktree_a, _) = project_a
1936            .update(cx_a, |p, cx| {
1937                p.find_or_create_local_worktree("/a", true, cx)
1938            })
1939            .await
1940            .unwrap();
1941        worktree_a
1942            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1943            .await;
1944        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1945        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1946        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1947
1948        // Cause the language server to start.
1949        let _ = cx_a
1950            .background()
1951            .spawn(project_a.update(cx_a, |project, cx| {
1952                project.open_buffer(
1953                    ProjectPath {
1954                        worktree_id,
1955                        path: Path::new("other.rs").into(),
1956                    },
1957                    cx,
1958                )
1959            }))
1960            .await
1961            .unwrap();
1962
1963        // Simulate a language server reporting errors for a file.
1964        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1965        fake_language_server
1966            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1967            .await;
1968        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1969            lsp::PublishDiagnosticsParams {
1970                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1971                version: None,
1972                diagnostics: vec![lsp::Diagnostic {
1973                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1974                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1975                    message: "message 1".to_string(),
1976                    ..Default::default()
1977                }],
1978            },
1979        );
1980
1981        // Wait for server to see the diagnostics update.
1982        server
1983            .condition(|store| {
1984                let worktree = store
1985                    .project(project_id)
1986                    .unwrap()
1987                    .share
1988                    .as_ref()
1989                    .unwrap()
1990                    .worktrees
1991                    .get(&worktree_id.to_proto())
1992                    .unwrap();
1993
1994                !worktree.diagnostic_summaries.is_empty()
1995            })
1996            .await;
1997
1998        // Join the worktree as client B.
1999        let project_b = Project::remote(
2000            project_id,
2001            client_b.clone(),
2002            client_b.user_store.clone(),
2003            lang_registry.clone(),
2004            fs.clone(),
2005            &mut cx_b.to_async(),
2006        )
2007        .await
2008        .unwrap();
2009
2010        project_b.read_with(cx_b, |project, cx| {
2011            assert_eq!(
2012                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2013                &[(
2014                    ProjectPath {
2015                        worktree_id,
2016                        path: Arc::from(Path::new("a.rs")),
2017                    },
2018                    DiagnosticSummary {
2019                        error_count: 1,
2020                        warning_count: 0,
2021                        ..Default::default()
2022                    },
2023                )]
2024            )
2025        });
2026
2027        // Simulate a language server reporting more errors for a file.
2028        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2029            lsp::PublishDiagnosticsParams {
2030                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2031                version: None,
2032                diagnostics: vec![
2033                    lsp::Diagnostic {
2034                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2035                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2036                        message: "message 1".to_string(),
2037                        ..Default::default()
2038                    },
2039                    lsp::Diagnostic {
2040                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2041                        range: lsp::Range::new(
2042                            lsp::Position::new(0, 10),
2043                            lsp::Position::new(0, 13),
2044                        ),
2045                        message: "message 2".to_string(),
2046                        ..Default::default()
2047                    },
2048                ],
2049            },
2050        );
2051
2052        // Client b gets the updated summaries
2053        project_b
2054            .condition(&cx_b, |project, cx| {
2055                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2056                    == &[(
2057                        ProjectPath {
2058                            worktree_id,
2059                            path: Arc::from(Path::new("a.rs")),
2060                        },
2061                        DiagnosticSummary {
2062                            error_count: 1,
2063                            warning_count: 1,
2064                            ..Default::default()
2065                        },
2066                    )]
2067            })
2068            .await;
2069
2070        // Open the file with the errors on client B. They should be present.
2071        let buffer_b = cx_b
2072            .background()
2073            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2074            .await
2075            .unwrap();
2076
2077        buffer_b.read_with(cx_b, |buffer, _| {
2078            assert_eq!(
2079                buffer
2080                    .snapshot()
2081                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2082                    .map(|entry| entry)
2083                    .collect::<Vec<_>>(),
2084                &[
2085                    DiagnosticEntry {
2086                        range: Point::new(0, 4)..Point::new(0, 7),
2087                        diagnostic: Diagnostic {
2088                            group_id: 0,
2089                            message: "message 1".to_string(),
2090                            severity: lsp::DiagnosticSeverity::ERROR,
2091                            is_primary: true,
2092                            ..Default::default()
2093                        }
2094                    },
2095                    DiagnosticEntry {
2096                        range: Point::new(0, 10)..Point::new(0, 13),
2097                        diagnostic: Diagnostic {
2098                            group_id: 1,
2099                            severity: lsp::DiagnosticSeverity::WARNING,
2100                            message: "message 2".to_string(),
2101                            is_primary: true,
2102                            ..Default::default()
2103                        }
2104                    }
2105                ]
2106            );
2107        });
2108    }
2109
2110    #[gpui::test(iterations = 10)]
2111    async fn test_collaborating_with_completion(
2112        cx_a: &mut TestAppContext,
2113        cx_b: &mut TestAppContext,
2114    ) {
2115        cx_a.foreground().forbid_parking();
2116        let mut lang_registry = Arc::new(LanguageRegistry::test());
2117        let fs = FakeFs::new(cx_a.background());
2118
2119        // Set up a fake language server.
2120        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2121        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2122            completion_provider: Some(lsp::CompletionOptions {
2123                trigger_characters: Some(vec![".".to_string()]),
2124                ..Default::default()
2125            }),
2126            ..Default::default()
2127        });
2128        Arc::get_mut(&mut lang_registry)
2129            .unwrap()
2130            .add(Arc::new(Language::new(
2131                LanguageConfig {
2132                    name: "Rust".into(),
2133                    path_suffixes: vec!["rs".to_string()],
2134                    language_server: Some(language_server_config),
2135                    ..Default::default()
2136                },
2137                Some(tree_sitter_rust::language()),
2138            )));
2139
2140        // Connect to a server as 2 clients.
2141        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2142        let client_a = server.create_client(cx_a, "user_a").await;
2143        let client_b = server.create_client(cx_b, "user_b").await;
2144
2145        // Share a project as client A
2146        fs.insert_tree(
2147            "/a",
2148            json!({
2149                ".zed.toml": r#"collaborators = ["user_b"]"#,
2150                "main.rs": "fn main() { a }",
2151                "other.rs": "",
2152            }),
2153        )
2154        .await;
2155        let project_a = cx_a.update(|cx| {
2156            Project::local(
2157                client_a.clone(),
2158                client_a.user_store.clone(),
2159                lang_registry.clone(),
2160                fs.clone(),
2161                cx,
2162            )
2163        });
2164        let (worktree_a, _) = project_a
2165            .update(cx_a, |p, cx| {
2166                p.find_or_create_local_worktree("/a", true, cx)
2167            })
2168            .await
2169            .unwrap();
2170        worktree_a
2171            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2172            .await;
2173        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2174        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2175        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2176
2177        // Join the worktree as client B.
2178        let project_b = Project::remote(
2179            project_id,
2180            client_b.clone(),
2181            client_b.user_store.clone(),
2182            lang_registry.clone(),
2183            fs.clone(),
2184            &mut cx_b.to_async(),
2185        )
2186        .await
2187        .unwrap();
2188
2189        // Open a file in an editor as the guest.
2190        let buffer_b = project_b
2191            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2192            .await
2193            .unwrap();
2194        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2195        let editor_b = cx_b.add_view(window_b, |cx| {
2196            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2197        });
2198
2199        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2200        buffer_b
2201            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2202            .await;
2203
2204        // Type a completion trigger character as the guest.
2205        editor_b.update(cx_b, |editor, cx| {
2206            editor.select_ranges([13..13], None, cx);
2207            editor.handle_input(&Input(".".into()), cx);
2208            cx.focus(&editor_b);
2209        });
2210
2211        // Receive a completion request as the host's language server.
2212        // Return some completions from the host's language server.
2213        cx_a.foreground().start_waiting();
2214        fake_language_server
2215            .handle_request::<lsp::request::Completion, _>(|params, _| {
2216                assert_eq!(
2217                    params.text_document_position.text_document.uri,
2218                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2219                );
2220                assert_eq!(
2221                    params.text_document_position.position,
2222                    lsp::Position::new(0, 14),
2223                );
2224
2225                Some(lsp::CompletionResponse::Array(vec![
2226                    lsp::CompletionItem {
2227                        label: "first_method(…)".into(),
2228                        detail: Some("fn(&mut self, B) -> C".into()),
2229                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2230                            new_text: "first_method($1)".to_string(),
2231                            range: lsp::Range::new(
2232                                lsp::Position::new(0, 14),
2233                                lsp::Position::new(0, 14),
2234                            ),
2235                        })),
2236                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2237                        ..Default::default()
2238                    },
2239                    lsp::CompletionItem {
2240                        label: "second_method(…)".into(),
2241                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2242                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2243                            new_text: "second_method()".to_string(),
2244                            range: lsp::Range::new(
2245                                lsp::Position::new(0, 14),
2246                                lsp::Position::new(0, 14),
2247                            ),
2248                        })),
2249                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2250                        ..Default::default()
2251                    },
2252                ]))
2253            })
2254            .next()
2255            .await
2256            .unwrap();
2257        cx_a.foreground().finish_waiting();
2258
2259        // Open the buffer on the host.
2260        let buffer_a = project_a
2261            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2262            .await
2263            .unwrap();
2264        buffer_a
2265            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2266            .await;
2267
2268        // Confirm a completion on the guest.
2269        editor_b
2270            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2271            .await;
2272        editor_b.update(cx_b, |editor, cx| {
2273            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2274            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2275        });
2276
2277        // Return a resolved completion from the host's language server.
2278        // The resolved completion has an additional text edit.
2279        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2280            |params, _| {
2281                assert_eq!(params.label, "first_method(…)");
2282                lsp::CompletionItem {
2283                    label: "first_method(…)".into(),
2284                    detail: Some("fn(&mut self, B) -> C".into()),
2285                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2286                        new_text: "first_method($1)".to_string(),
2287                        range: lsp::Range::new(
2288                            lsp::Position::new(0, 14),
2289                            lsp::Position::new(0, 14),
2290                        ),
2291                    })),
2292                    additional_text_edits: Some(vec![lsp::TextEdit {
2293                        new_text: "use d::SomeTrait;\n".to_string(),
2294                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2295                    }]),
2296                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2297                    ..Default::default()
2298                }
2299            },
2300        );
2301
2302        // The additional edit is applied.
2303        buffer_a
2304            .condition(&cx_a, |buffer, _| {
2305                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2306            })
2307            .await;
2308        buffer_b
2309            .condition(&cx_b, |buffer, _| {
2310                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2311            })
2312            .await;
2313    }
2314
2315    #[gpui::test(iterations = 10)]
2316    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2317        cx_a.foreground().forbid_parking();
2318        let mut lang_registry = Arc::new(LanguageRegistry::test());
2319        let fs = FakeFs::new(cx_a.background());
2320
2321        // Set up a fake language server.
2322        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2323        Arc::get_mut(&mut lang_registry)
2324            .unwrap()
2325            .add(Arc::new(Language::new(
2326                LanguageConfig {
2327                    name: "Rust".into(),
2328                    path_suffixes: vec!["rs".to_string()],
2329                    language_server: Some(language_server_config),
2330                    ..Default::default()
2331                },
2332                Some(tree_sitter_rust::language()),
2333            )));
2334
2335        // Connect to a server as 2 clients.
2336        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2337        let client_a = server.create_client(cx_a, "user_a").await;
2338        let client_b = server.create_client(cx_b, "user_b").await;
2339
2340        // Share a project as client A
2341        fs.insert_tree(
2342            "/a",
2343            json!({
2344                ".zed.toml": r#"collaborators = ["user_b"]"#,
2345                "a.rs": "let one = two",
2346            }),
2347        )
2348        .await;
2349        let project_a = cx_a.update(|cx| {
2350            Project::local(
2351                client_a.clone(),
2352                client_a.user_store.clone(),
2353                lang_registry.clone(),
2354                fs.clone(),
2355                cx,
2356            )
2357        });
2358        let (worktree_a, _) = project_a
2359            .update(cx_a, |p, cx| {
2360                p.find_or_create_local_worktree("/a", true, cx)
2361            })
2362            .await
2363            .unwrap();
2364        worktree_a
2365            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2366            .await;
2367        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2368        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2369        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2370
2371        // Join the worktree as client B.
2372        let project_b = Project::remote(
2373            project_id,
2374            client_b.clone(),
2375            client_b.user_store.clone(),
2376            lang_registry.clone(),
2377            fs.clone(),
2378            &mut cx_b.to_async(),
2379        )
2380        .await
2381        .unwrap();
2382
2383        let buffer_b = cx_b
2384            .background()
2385            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2386            .await
2387            .unwrap();
2388
2389        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2390        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2391            Some(vec![
2392                lsp::TextEdit {
2393                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2394                    new_text: "h".to_string(),
2395                },
2396                lsp::TextEdit {
2397                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2398                    new_text: "y".to_string(),
2399                },
2400            ])
2401        });
2402
2403        project_b
2404            .update(cx_b, |project, cx| {
2405                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2406            })
2407            .await
2408            .unwrap();
2409        assert_eq!(
2410            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2411            "let honey = two"
2412        );
2413    }
2414
2415    #[gpui::test(iterations = 10)]
2416    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2417        cx_a.foreground().forbid_parking();
2418        let mut lang_registry = Arc::new(LanguageRegistry::test());
2419        let fs = FakeFs::new(cx_a.background());
2420        fs.insert_tree(
2421            "/root-1",
2422            json!({
2423                ".zed.toml": r#"collaborators = ["user_b"]"#,
2424                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2425            }),
2426        )
2427        .await;
2428        fs.insert_tree(
2429            "/root-2",
2430            json!({
2431                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2432            }),
2433        )
2434        .await;
2435
2436        // Set up a fake language server.
2437        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2438        Arc::get_mut(&mut lang_registry)
2439            .unwrap()
2440            .add(Arc::new(Language::new(
2441                LanguageConfig {
2442                    name: "Rust".into(),
2443                    path_suffixes: vec!["rs".to_string()],
2444                    language_server: Some(language_server_config),
2445                    ..Default::default()
2446                },
2447                Some(tree_sitter_rust::language()),
2448            )));
2449
2450        // Connect to a server as 2 clients.
2451        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2452        let client_a = server.create_client(cx_a, "user_a").await;
2453        let client_b = server.create_client(cx_b, "user_b").await;
2454
2455        // Share a project as client A
2456        let project_a = cx_a.update(|cx| {
2457            Project::local(
2458                client_a.clone(),
2459                client_a.user_store.clone(),
2460                lang_registry.clone(),
2461                fs.clone(),
2462                cx,
2463            )
2464        });
2465        let (worktree_a, _) = project_a
2466            .update(cx_a, |p, cx| {
2467                p.find_or_create_local_worktree("/root-1", true, cx)
2468            })
2469            .await
2470            .unwrap();
2471        worktree_a
2472            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2473            .await;
2474        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2475        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2476        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2477
2478        // Join the worktree as client B.
2479        let project_b = Project::remote(
2480            project_id,
2481            client_b.clone(),
2482            client_b.user_store.clone(),
2483            lang_registry.clone(),
2484            fs.clone(),
2485            &mut cx_b.to_async(),
2486        )
2487        .await
2488        .unwrap();
2489
2490        // Open the file on client B.
2491        let buffer_b = cx_b
2492            .background()
2493            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2494            .await
2495            .unwrap();
2496
2497        // Request the definition of a symbol as the guest.
2498        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2499        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2500            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2501                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2502                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2503            )))
2504        });
2505
2506        let definitions_1 = project_b
2507            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2508            .await
2509            .unwrap();
2510        cx_b.read(|cx| {
2511            assert_eq!(definitions_1.len(), 1);
2512            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2513            let target_buffer = definitions_1[0].buffer.read(cx);
2514            assert_eq!(
2515                target_buffer.text(),
2516                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2517            );
2518            assert_eq!(
2519                definitions_1[0].range.to_point(target_buffer),
2520                Point::new(0, 6)..Point::new(0, 9)
2521            );
2522        });
2523
2524        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2525        // the previous call to `definition`.
2526        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2527            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2528                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2529                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2530            )))
2531        });
2532
2533        let definitions_2 = project_b
2534            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2535            .await
2536            .unwrap();
2537        cx_b.read(|cx| {
2538            assert_eq!(definitions_2.len(), 1);
2539            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2540            let target_buffer = definitions_2[0].buffer.read(cx);
2541            assert_eq!(
2542                target_buffer.text(),
2543                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2544            );
2545            assert_eq!(
2546                definitions_2[0].range.to_point(target_buffer),
2547                Point::new(1, 6)..Point::new(1, 11)
2548            );
2549        });
2550        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2551    }
2552
2553    #[gpui::test(iterations = 10)]
2554    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2555        cx_a.foreground().forbid_parking();
2556        let mut lang_registry = Arc::new(LanguageRegistry::test());
2557        let fs = FakeFs::new(cx_a.background());
2558        fs.insert_tree(
2559            "/root-1",
2560            json!({
2561                ".zed.toml": r#"collaborators = ["user_b"]"#,
2562                "one.rs": "const ONE: usize = 1;",
2563                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2564            }),
2565        )
2566        .await;
2567        fs.insert_tree(
2568            "/root-2",
2569            json!({
2570                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2571            }),
2572        )
2573        .await;
2574
2575        // Set up a fake language server.
2576        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2577        Arc::get_mut(&mut lang_registry)
2578            .unwrap()
2579            .add(Arc::new(Language::new(
2580                LanguageConfig {
2581                    name: "Rust".into(),
2582                    path_suffixes: vec!["rs".to_string()],
2583                    language_server: Some(language_server_config),
2584                    ..Default::default()
2585                },
2586                Some(tree_sitter_rust::language()),
2587            )));
2588
2589        // Connect to a server as 2 clients.
2590        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2591        let client_a = server.create_client(cx_a, "user_a").await;
2592        let client_b = server.create_client(cx_b, "user_b").await;
2593
2594        // Share a project as client A
2595        let project_a = cx_a.update(|cx| {
2596            Project::local(
2597                client_a.clone(),
2598                client_a.user_store.clone(),
2599                lang_registry.clone(),
2600                fs.clone(),
2601                cx,
2602            )
2603        });
2604        let (worktree_a, _) = project_a
2605            .update(cx_a, |p, cx| {
2606                p.find_or_create_local_worktree("/root-1", true, cx)
2607            })
2608            .await
2609            .unwrap();
2610        worktree_a
2611            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2612            .await;
2613        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2614        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2615        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2616
2617        // Join the worktree as client B.
2618        let project_b = Project::remote(
2619            project_id,
2620            client_b.clone(),
2621            client_b.user_store.clone(),
2622            lang_registry.clone(),
2623            fs.clone(),
2624            &mut cx_b.to_async(),
2625        )
2626        .await
2627        .unwrap();
2628
2629        // Open the file on client B.
2630        let buffer_b = cx_b
2631            .background()
2632            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2633            .await
2634            .unwrap();
2635
2636        // Request references to a symbol as the guest.
2637        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2638        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2639            assert_eq!(
2640                params.text_document_position.text_document.uri.as_str(),
2641                "file:///root-1/one.rs"
2642            );
2643            Some(vec![
2644                lsp::Location {
2645                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2646                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2647                },
2648                lsp::Location {
2649                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2650                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2651                },
2652                lsp::Location {
2653                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2654                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2655                },
2656            ])
2657        });
2658
2659        let references = project_b
2660            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2661            .await
2662            .unwrap();
2663        cx_b.read(|cx| {
2664            assert_eq!(references.len(), 3);
2665            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2666
2667            let two_buffer = references[0].buffer.read(cx);
2668            let three_buffer = references[2].buffer.read(cx);
2669            assert_eq!(
2670                two_buffer.file().unwrap().path().as_ref(),
2671                Path::new("two.rs")
2672            );
2673            assert_eq!(references[1].buffer, references[0].buffer);
2674            assert_eq!(
2675                three_buffer.file().unwrap().full_path(cx),
2676                Path::new("three.rs")
2677            );
2678
2679            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2680            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2681            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2682        });
2683    }
2684
2685    #[gpui::test(iterations = 10)]
2686    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2687        cx_a.foreground().forbid_parking();
2688        let lang_registry = Arc::new(LanguageRegistry::test());
2689        let fs = FakeFs::new(cx_a.background());
2690        fs.insert_tree(
2691            "/root-1",
2692            json!({
2693                ".zed.toml": r#"collaborators = ["user_b"]"#,
2694                "a": "hello world",
2695                "b": "goodnight moon",
2696                "c": "a world of goo",
2697                "d": "world champion of clown world",
2698            }),
2699        )
2700        .await;
2701        fs.insert_tree(
2702            "/root-2",
2703            json!({
2704                "e": "disney world is fun",
2705            }),
2706        )
2707        .await;
2708
2709        // Connect to a server as 2 clients.
2710        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2711        let client_a = server.create_client(cx_a, "user_a").await;
2712        let client_b = server.create_client(cx_b, "user_b").await;
2713
2714        // Share a project as client A
2715        let project_a = cx_a.update(|cx| {
2716            Project::local(
2717                client_a.clone(),
2718                client_a.user_store.clone(),
2719                lang_registry.clone(),
2720                fs.clone(),
2721                cx,
2722            )
2723        });
2724        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2725
2726        let (worktree_1, _) = project_a
2727            .update(cx_a, |p, cx| {
2728                p.find_or_create_local_worktree("/root-1", true, cx)
2729            })
2730            .await
2731            .unwrap();
2732        worktree_1
2733            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2734            .await;
2735        let (worktree_2, _) = project_a
2736            .update(cx_a, |p, cx| {
2737                p.find_or_create_local_worktree("/root-2", true, cx)
2738            })
2739            .await
2740            .unwrap();
2741        worktree_2
2742            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2743            .await;
2744
2745        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2746
2747        // Join the worktree as client B.
2748        let project_b = Project::remote(
2749            project_id,
2750            client_b.clone(),
2751            client_b.user_store.clone(),
2752            lang_registry.clone(),
2753            fs.clone(),
2754            &mut cx_b.to_async(),
2755        )
2756        .await
2757        .unwrap();
2758
2759        let results = project_b
2760            .update(cx_b, |project, cx| {
2761                project.search(SearchQuery::text("world", false, false), cx)
2762            })
2763            .await
2764            .unwrap();
2765
2766        let mut ranges_by_path = results
2767            .into_iter()
2768            .map(|(buffer, ranges)| {
2769                buffer.read_with(cx_b, |buffer, cx| {
2770                    let path = buffer.file().unwrap().full_path(cx);
2771                    let offset_ranges = ranges
2772                        .into_iter()
2773                        .map(|range| range.to_offset(buffer))
2774                        .collect::<Vec<_>>();
2775                    (path, offset_ranges)
2776                })
2777            })
2778            .collect::<Vec<_>>();
2779        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2780
2781        assert_eq!(
2782            ranges_by_path,
2783            &[
2784                (PathBuf::from("root-1/a"), vec![6..11]),
2785                (PathBuf::from("root-1/c"), vec![2..7]),
2786                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2787                (PathBuf::from("root-2/e"), vec![7..12]),
2788            ]
2789        );
2790    }
2791
2792    #[gpui::test(iterations = 10)]
2793    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2794        cx_a.foreground().forbid_parking();
2795        let lang_registry = Arc::new(LanguageRegistry::test());
2796        let fs = FakeFs::new(cx_a.background());
2797        fs.insert_tree(
2798            "/root-1",
2799            json!({
2800                ".zed.toml": r#"collaborators = ["user_b"]"#,
2801                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2802            }),
2803        )
2804        .await;
2805
2806        // Set up a fake language server.
2807        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2808        lang_registry.add(Arc::new(Language::new(
2809            LanguageConfig {
2810                name: "Rust".into(),
2811                path_suffixes: vec!["rs".to_string()],
2812                language_server: Some(language_server_config),
2813                ..Default::default()
2814            },
2815            Some(tree_sitter_rust::language()),
2816        )));
2817
2818        // Connect to a server as 2 clients.
2819        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2820        let client_a = server.create_client(cx_a, "user_a").await;
2821        let client_b = server.create_client(cx_b, "user_b").await;
2822
2823        // Share a project as client A
2824        let project_a = cx_a.update(|cx| {
2825            Project::local(
2826                client_a.clone(),
2827                client_a.user_store.clone(),
2828                lang_registry.clone(),
2829                fs.clone(),
2830                cx,
2831            )
2832        });
2833        let (worktree_a, _) = project_a
2834            .update(cx_a, |p, cx| {
2835                p.find_or_create_local_worktree("/root-1", true, cx)
2836            })
2837            .await
2838            .unwrap();
2839        worktree_a
2840            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2841            .await;
2842        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2843        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2844        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2845
2846        // Join the worktree as client B.
2847        let project_b = Project::remote(
2848            project_id,
2849            client_b.clone(),
2850            client_b.user_store.clone(),
2851            lang_registry.clone(),
2852            fs.clone(),
2853            &mut cx_b.to_async(),
2854        )
2855        .await
2856        .unwrap();
2857
2858        // Open the file on client B.
2859        let buffer_b = cx_b
2860            .background()
2861            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2862            .await
2863            .unwrap();
2864
2865        // Request document highlights as the guest.
2866        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2867        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2868            |params, _| {
2869                assert_eq!(
2870                    params
2871                        .text_document_position_params
2872                        .text_document
2873                        .uri
2874                        .as_str(),
2875                    "file:///root-1/main.rs"
2876                );
2877                assert_eq!(
2878                    params.text_document_position_params.position,
2879                    lsp::Position::new(0, 34)
2880                );
2881                Some(vec![
2882                    lsp::DocumentHighlight {
2883                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2884                        range: lsp::Range::new(
2885                            lsp::Position::new(0, 10),
2886                            lsp::Position::new(0, 16),
2887                        ),
2888                    },
2889                    lsp::DocumentHighlight {
2890                        kind: Some(lsp::DocumentHighlightKind::READ),
2891                        range: lsp::Range::new(
2892                            lsp::Position::new(0, 32),
2893                            lsp::Position::new(0, 38),
2894                        ),
2895                    },
2896                    lsp::DocumentHighlight {
2897                        kind: Some(lsp::DocumentHighlightKind::READ),
2898                        range: lsp::Range::new(
2899                            lsp::Position::new(0, 41),
2900                            lsp::Position::new(0, 47),
2901                        ),
2902                    },
2903                ])
2904            },
2905        );
2906
2907        let highlights = project_b
2908            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2909            .await
2910            .unwrap();
2911        buffer_b.read_with(cx_b, |buffer, _| {
2912            let snapshot = buffer.snapshot();
2913
2914            let highlights = highlights
2915                .into_iter()
2916                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2917                .collect::<Vec<_>>();
2918            assert_eq!(
2919                highlights,
2920                &[
2921                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2922                    (lsp::DocumentHighlightKind::READ, 32..38),
2923                    (lsp::DocumentHighlightKind::READ, 41..47)
2924                ]
2925            )
2926        });
2927    }
2928
2929    #[gpui::test(iterations = 10)]
2930    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2931        cx_a.foreground().forbid_parking();
2932        let mut lang_registry = Arc::new(LanguageRegistry::test());
2933        let fs = FakeFs::new(cx_a.background());
2934        fs.insert_tree(
2935            "/code",
2936            json!({
2937                "crate-1": {
2938                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2939                    "one.rs": "const ONE: usize = 1;",
2940                },
2941                "crate-2": {
2942                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2943                },
2944                "private": {
2945                    "passwords.txt": "the-password",
2946                }
2947            }),
2948        )
2949        .await;
2950
2951        // Set up a fake language server.
2952        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2953        Arc::get_mut(&mut lang_registry)
2954            .unwrap()
2955            .add(Arc::new(Language::new(
2956                LanguageConfig {
2957                    name: "Rust".into(),
2958                    path_suffixes: vec!["rs".to_string()],
2959                    language_server: Some(language_server_config),
2960                    ..Default::default()
2961                },
2962                Some(tree_sitter_rust::language()),
2963            )));
2964
2965        // Connect to a server as 2 clients.
2966        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2967        let client_a = server.create_client(cx_a, "user_a").await;
2968        let client_b = server.create_client(cx_b, "user_b").await;
2969
2970        // Share a project as client A
2971        let project_a = cx_a.update(|cx| {
2972            Project::local(
2973                client_a.clone(),
2974                client_a.user_store.clone(),
2975                lang_registry.clone(),
2976                fs.clone(),
2977                cx,
2978            )
2979        });
2980        let (worktree_a, _) = project_a
2981            .update(cx_a, |p, cx| {
2982                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2983            })
2984            .await
2985            .unwrap();
2986        worktree_a
2987            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2988            .await;
2989        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2990        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2991        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2992
2993        // Join the worktree as client B.
2994        let project_b = Project::remote(
2995            project_id,
2996            client_b.clone(),
2997            client_b.user_store.clone(),
2998            lang_registry.clone(),
2999            fs.clone(),
3000            &mut cx_b.to_async(),
3001        )
3002        .await
3003        .unwrap();
3004
3005        // Cause the language server to start.
3006        let _buffer = cx_b
3007            .background()
3008            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3009            .await
3010            .unwrap();
3011
3012        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3013        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3014            #[allow(deprecated)]
3015            Some(vec![lsp::SymbolInformation {
3016                name: "TWO".into(),
3017                location: lsp::Location {
3018                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3019                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3020                },
3021                kind: lsp::SymbolKind::CONSTANT,
3022                tags: None,
3023                container_name: None,
3024                deprecated: None,
3025            }])
3026        });
3027
3028        // Request the definition of a symbol as the guest.
3029        let symbols = project_b
3030            .update(cx_b, |p, cx| p.symbols("two", cx))
3031            .await
3032            .unwrap();
3033        assert_eq!(symbols.len(), 1);
3034        assert_eq!(symbols[0].name, "TWO");
3035
3036        // Open one of the returned symbols.
3037        let buffer_b_2 = project_b
3038            .update(cx_b, |project, cx| {
3039                project.open_buffer_for_symbol(&symbols[0], cx)
3040            })
3041            .await
3042            .unwrap();
3043        buffer_b_2.read_with(cx_b, |buffer, _| {
3044            assert_eq!(
3045                buffer.file().unwrap().path().as_ref(),
3046                Path::new("../crate-2/two.rs")
3047            );
3048        });
3049
3050        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3051        let mut fake_symbol = symbols[0].clone();
3052        fake_symbol.path = Path::new("/code/secrets").into();
3053        let error = project_b
3054            .update(cx_b, |project, cx| {
3055                project.open_buffer_for_symbol(&fake_symbol, cx)
3056            })
3057            .await
3058            .unwrap_err();
3059        assert!(error.to_string().contains("invalid symbol signature"));
3060    }
3061
3062    #[gpui::test(iterations = 10)]
3063    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3064        cx_a: &mut TestAppContext,
3065        cx_b: &mut TestAppContext,
3066        mut rng: StdRng,
3067    ) {
3068        cx_a.foreground().forbid_parking();
3069        let mut lang_registry = Arc::new(LanguageRegistry::test());
3070        let fs = FakeFs::new(cx_a.background());
3071        fs.insert_tree(
3072            "/root",
3073            json!({
3074                ".zed.toml": r#"collaborators = ["user_b"]"#,
3075                "a.rs": "const ONE: usize = b::TWO;",
3076                "b.rs": "const TWO: usize = 2",
3077            }),
3078        )
3079        .await;
3080
3081        // Set up a fake language server.
3082        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3083
3084        Arc::get_mut(&mut lang_registry)
3085            .unwrap()
3086            .add(Arc::new(Language::new(
3087                LanguageConfig {
3088                    name: "Rust".into(),
3089                    path_suffixes: vec!["rs".to_string()],
3090                    language_server: Some(language_server_config),
3091                    ..Default::default()
3092                },
3093                Some(tree_sitter_rust::language()),
3094            )));
3095
3096        // Connect to a server as 2 clients.
3097        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3098        let client_a = server.create_client(cx_a, "user_a").await;
3099        let client_b = server.create_client(cx_b, "user_b").await;
3100
3101        // Share a project as client A
3102        let project_a = cx_a.update(|cx| {
3103            Project::local(
3104                client_a.clone(),
3105                client_a.user_store.clone(),
3106                lang_registry.clone(),
3107                fs.clone(),
3108                cx,
3109            )
3110        });
3111
3112        let (worktree_a, _) = project_a
3113            .update(cx_a, |p, cx| {
3114                p.find_or_create_local_worktree("/root", true, cx)
3115            })
3116            .await
3117            .unwrap();
3118        worktree_a
3119            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3120            .await;
3121        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3122        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3123        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3124
3125        // Join the worktree as client B.
3126        let project_b = Project::remote(
3127            project_id,
3128            client_b.clone(),
3129            client_b.user_store.clone(),
3130            lang_registry.clone(),
3131            fs.clone(),
3132            &mut cx_b.to_async(),
3133        )
3134        .await
3135        .unwrap();
3136
3137        let buffer_b1 = cx_b
3138            .background()
3139            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3140            .await
3141            .unwrap();
3142
3143        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3144        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3145            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3146                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3147                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3148            )))
3149        });
3150
3151        let definitions;
3152        let buffer_b2;
3153        if rng.gen() {
3154            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3155            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3156        } else {
3157            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3158            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3159        }
3160
3161        let buffer_b2 = buffer_b2.await.unwrap();
3162        let definitions = definitions.await.unwrap();
3163        assert_eq!(definitions.len(), 1);
3164        assert_eq!(definitions[0].buffer, buffer_b2);
3165    }
3166
3167    #[gpui::test(iterations = 10)]
3168    async fn test_collaborating_with_code_actions(
3169        cx_a: &mut TestAppContext,
3170        cx_b: &mut TestAppContext,
3171    ) {
3172        cx_a.foreground().forbid_parking();
3173        let mut lang_registry = Arc::new(LanguageRegistry::test());
3174        let fs = FakeFs::new(cx_a.background());
3175        cx_b.update(|cx| editor::init(cx));
3176
3177        // Set up a fake language server.
3178        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3179        Arc::get_mut(&mut lang_registry)
3180            .unwrap()
3181            .add(Arc::new(Language::new(
3182                LanguageConfig {
3183                    name: "Rust".into(),
3184                    path_suffixes: vec!["rs".to_string()],
3185                    language_server: Some(language_server_config),
3186                    ..Default::default()
3187                },
3188                Some(tree_sitter_rust::language()),
3189            )));
3190
3191        // Connect to a server as 2 clients.
3192        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3193        let client_a = server.create_client(cx_a, "user_a").await;
3194        let client_b = server.create_client(cx_b, "user_b").await;
3195
3196        // Share a project as client A
3197        fs.insert_tree(
3198            "/a",
3199            json!({
3200                ".zed.toml": r#"collaborators = ["user_b"]"#,
3201                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3202                "other.rs": "pub fn foo() -> usize { 4 }",
3203            }),
3204        )
3205        .await;
3206        let project_a = cx_a.update(|cx| {
3207            Project::local(
3208                client_a.clone(),
3209                client_a.user_store.clone(),
3210                lang_registry.clone(),
3211                fs.clone(),
3212                cx,
3213            )
3214        });
3215        let (worktree_a, _) = project_a
3216            .update(cx_a, |p, cx| {
3217                p.find_or_create_local_worktree("/a", true, cx)
3218            })
3219            .await
3220            .unwrap();
3221        worktree_a
3222            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3223            .await;
3224        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3225        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3226        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3227
3228        // Join the worktree as client B.
3229        let project_b = Project::remote(
3230            project_id,
3231            client_b.clone(),
3232            client_b.user_store.clone(),
3233            lang_registry.clone(),
3234            fs.clone(),
3235            &mut cx_b.to_async(),
3236        )
3237        .await
3238        .unwrap();
3239        let mut params = cx_b.update(WorkspaceParams::test);
3240        params.languages = lang_registry.clone();
3241        params.client = client_b.client.clone();
3242        params.user_store = client_b.user_store.clone();
3243        params.project = project_b;
3244
3245        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3246        let editor_b = workspace_b
3247            .update(cx_b, |workspace, cx| {
3248                workspace.open_path((worktree_id, "main.rs"), cx)
3249            })
3250            .await
3251            .unwrap()
3252            .downcast::<Editor>()
3253            .unwrap();
3254
3255        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3256        fake_language_server
3257            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3258                assert_eq!(
3259                    params.text_document.uri,
3260                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3261                );
3262                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3263                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3264                None
3265            })
3266            .next()
3267            .await;
3268
3269        // Move cursor to a location that contains code actions.
3270        editor_b.update(cx_b, |editor, cx| {
3271            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3272            cx.focus(&editor_b);
3273        });
3274
3275        fake_language_server
3276            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3277                assert_eq!(
3278                    params.text_document.uri,
3279                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3280                );
3281                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3282                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3283
3284                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3285                    lsp::CodeAction {
3286                        title: "Inline into all callers".to_string(),
3287                        edit: Some(lsp::WorkspaceEdit {
3288                            changes: Some(
3289                                [
3290                                    (
3291                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3292                                        vec![lsp::TextEdit::new(
3293                                            lsp::Range::new(
3294                                                lsp::Position::new(1, 22),
3295                                                lsp::Position::new(1, 34),
3296                                            ),
3297                                            "4".to_string(),
3298                                        )],
3299                                    ),
3300                                    (
3301                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3302                                        vec![lsp::TextEdit::new(
3303                                            lsp::Range::new(
3304                                                lsp::Position::new(0, 0),
3305                                                lsp::Position::new(0, 27),
3306                                            ),
3307                                            "".to_string(),
3308                                        )],
3309                                    ),
3310                                ]
3311                                .into_iter()
3312                                .collect(),
3313                            ),
3314                            ..Default::default()
3315                        }),
3316                        data: Some(json!({
3317                            "codeActionParams": {
3318                                "range": {
3319                                    "start": {"line": 1, "column": 31},
3320                                    "end": {"line": 1, "column": 31},
3321                                }
3322                            }
3323                        })),
3324                        ..Default::default()
3325                    },
3326                )])
3327            })
3328            .next()
3329            .await;
3330
3331        // Toggle code actions and wait for them to display.
3332        editor_b.update(cx_b, |editor, cx| {
3333            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3334        });
3335        editor_b
3336            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3337            .await;
3338
3339        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3340
3341        // Confirming the code action will trigger a resolve request.
3342        let confirm_action = workspace_b
3343            .update(cx_b, |workspace, cx| {
3344                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3345            })
3346            .unwrap();
3347        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3348            lsp::CodeAction {
3349                title: "Inline into all callers".to_string(),
3350                edit: Some(lsp::WorkspaceEdit {
3351                    changes: Some(
3352                        [
3353                            (
3354                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3355                                vec![lsp::TextEdit::new(
3356                                    lsp::Range::new(
3357                                        lsp::Position::new(1, 22),
3358                                        lsp::Position::new(1, 34),
3359                                    ),
3360                                    "4".to_string(),
3361                                )],
3362                            ),
3363                            (
3364                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3365                                vec![lsp::TextEdit::new(
3366                                    lsp::Range::new(
3367                                        lsp::Position::new(0, 0),
3368                                        lsp::Position::new(0, 27),
3369                                    ),
3370                                    "".to_string(),
3371                                )],
3372                            ),
3373                        ]
3374                        .into_iter()
3375                        .collect(),
3376                    ),
3377                    ..Default::default()
3378                }),
3379                ..Default::default()
3380            }
3381        });
3382
3383        // After the action is confirmed, an editor containing both modified files is opened.
3384        confirm_action.await.unwrap();
3385        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3386            workspace
3387                .active_item(cx)
3388                .unwrap()
3389                .downcast::<Editor>()
3390                .unwrap()
3391        });
3392        code_action_editor.update(cx_b, |editor, cx| {
3393            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3394            editor.undo(&Undo, cx);
3395            assert_eq!(
3396                editor.text(cx),
3397                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3398            );
3399            editor.redo(&Redo, cx);
3400            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3401        });
3402    }
3403
3404    #[gpui::test(iterations = 10)]
3405    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3406        cx_a.foreground().forbid_parking();
3407        let mut lang_registry = Arc::new(LanguageRegistry::test());
3408        let fs = FakeFs::new(cx_a.background());
3409        cx_b.update(|cx| editor::init(cx));
3410
3411        // Set up a fake language server.
3412        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3413        Arc::get_mut(&mut lang_registry)
3414            .unwrap()
3415            .add(Arc::new(Language::new(
3416                LanguageConfig {
3417                    name: "Rust".into(),
3418                    path_suffixes: vec!["rs".to_string()],
3419                    language_server: Some(language_server_config),
3420                    ..Default::default()
3421                },
3422                Some(tree_sitter_rust::language()),
3423            )));
3424
3425        // Connect to a server as 2 clients.
3426        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3427        let client_a = server.create_client(cx_a, "user_a").await;
3428        let client_b = server.create_client(cx_b, "user_b").await;
3429
3430        // Share a project as client A
3431        fs.insert_tree(
3432            "/dir",
3433            json!({
3434                ".zed.toml": r#"collaborators = ["user_b"]"#,
3435                "one.rs": "const ONE: usize = 1;",
3436                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3437            }),
3438        )
3439        .await;
3440        let project_a = cx_a.update(|cx| {
3441            Project::local(
3442                client_a.clone(),
3443                client_a.user_store.clone(),
3444                lang_registry.clone(),
3445                fs.clone(),
3446                cx,
3447            )
3448        });
3449        let (worktree_a, _) = project_a
3450            .update(cx_a, |p, cx| {
3451                p.find_or_create_local_worktree("/dir", true, cx)
3452            })
3453            .await
3454            .unwrap();
3455        worktree_a
3456            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3457            .await;
3458        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3459        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3460        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3461
3462        // Join the worktree as client B.
3463        let project_b = Project::remote(
3464            project_id,
3465            client_b.clone(),
3466            client_b.user_store.clone(),
3467            lang_registry.clone(),
3468            fs.clone(),
3469            &mut cx_b.to_async(),
3470        )
3471        .await
3472        .unwrap();
3473        let mut params = cx_b.update(WorkspaceParams::test);
3474        params.languages = lang_registry.clone();
3475        params.client = client_b.client.clone();
3476        params.user_store = client_b.user_store.clone();
3477        params.project = project_b;
3478
3479        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3480        let editor_b = workspace_b
3481            .update(cx_b, |workspace, cx| {
3482                workspace.open_path((worktree_id, "one.rs"), cx)
3483            })
3484            .await
3485            .unwrap()
3486            .downcast::<Editor>()
3487            .unwrap();
3488        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3489
3490        // Move cursor to a location that can be renamed.
3491        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3492            editor.select_ranges([7..7], None, cx);
3493            editor.rename(&Rename, cx).unwrap()
3494        });
3495
3496        fake_language_server
3497            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3498                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3499                assert_eq!(params.position, lsp::Position::new(0, 7));
3500                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3501                    lsp::Position::new(0, 6),
3502                    lsp::Position::new(0, 9),
3503                )))
3504            })
3505            .next()
3506            .await
3507            .unwrap();
3508        prepare_rename.await.unwrap();
3509        editor_b.update(cx_b, |editor, cx| {
3510            let rename = editor.pending_rename().unwrap();
3511            let buffer = editor.buffer().read(cx).snapshot(cx);
3512            assert_eq!(
3513                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3514                6..9
3515            );
3516            rename.editor.update(cx, |rename_editor, cx| {
3517                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3518                    rename_buffer.edit([0..3], "THREE", cx);
3519                });
3520            });
3521        });
3522
3523        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3524            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3525        });
3526        fake_language_server
3527            .handle_request::<lsp::request::Rename, _>(|params, _| {
3528                assert_eq!(
3529                    params.text_document_position.text_document.uri.as_str(),
3530                    "file:///dir/one.rs"
3531                );
3532                assert_eq!(
3533                    params.text_document_position.position,
3534                    lsp::Position::new(0, 6)
3535                );
3536                assert_eq!(params.new_name, "THREE");
3537                Some(lsp::WorkspaceEdit {
3538                    changes: Some(
3539                        [
3540                            (
3541                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3542                                vec![lsp::TextEdit::new(
3543                                    lsp::Range::new(
3544                                        lsp::Position::new(0, 6),
3545                                        lsp::Position::new(0, 9),
3546                                    ),
3547                                    "THREE".to_string(),
3548                                )],
3549                            ),
3550                            (
3551                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3552                                vec![
3553                                    lsp::TextEdit::new(
3554                                        lsp::Range::new(
3555                                            lsp::Position::new(0, 24),
3556                                            lsp::Position::new(0, 27),
3557                                        ),
3558                                        "THREE".to_string(),
3559                                    ),
3560                                    lsp::TextEdit::new(
3561                                        lsp::Range::new(
3562                                            lsp::Position::new(0, 35),
3563                                            lsp::Position::new(0, 38),
3564                                        ),
3565                                        "THREE".to_string(),
3566                                    ),
3567                                ],
3568                            ),
3569                        ]
3570                        .into_iter()
3571                        .collect(),
3572                    ),
3573                    ..Default::default()
3574                })
3575            })
3576            .next()
3577            .await
3578            .unwrap();
3579        confirm_rename.await.unwrap();
3580
3581        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3582            workspace
3583                .active_item(cx)
3584                .unwrap()
3585                .downcast::<Editor>()
3586                .unwrap()
3587        });
3588        rename_editor.update(cx_b, |editor, cx| {
3589            assert_eq!(
3590                editor.text(cx),
3591                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3592            );
3593            editor.undo(&Undo, cx);
3594            assert_eq!(
3595                editor.text(cx),
3596                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3597            );
3598            editor.redo(&Redo, cx);
3599            assert_eq!(
3600                editor.text(cx),
3601                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3602            );
3603        });
3604
3605        // Ensure temporary rename edits cannot be undone/redone.
3606        editor_b.update(cx_b, |editor, cx| {
3607            editor.undo(&Undo, cx);
3608            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3609            editor.undo(&Undo, cx);
3610            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3611            editor.redo(&Redo, cx);
3612            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3613        })
3614    }
3615
3616    #[gpui::test(iterations = 10)]
3617    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3618        cx_a.foreground().forbid_parking();
3619
3620        // Connect to a server as 2 clients.
3621        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3622        let client_a = server.create_client(cx_a, "user_a").await;
3623        let client_b = server.create_client(cx_b, "user_b").await;
3624
3625        // Create an org that includes these 2 users.
3626        let db = &server.app_state.db;
3627        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3628        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3629            .await
3630            .unwrap();
3631        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3632            .await
3633            .unwrap();
3634
3635        // Create a channel that includes all the users.
3636        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3637        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3638            .await
3639            .unwrap();
3640        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3641            .await
3642            .unwrap();
3643        db.create_channel_message(
3644            channel_id,
3645            client_b.current_user_id(&cx_b),
3646            "hello A, it's B.",
3647            OffsetDateTime::now_utc(),
3648            1,
3649        )
3650        .await
3651        .unwrap();
3652
3653        let channels_a = cx_a
3654            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3655        channels_a
3656            .condition(cx_a, |list, _| list.available_channels().is_some())
3657            .await;
3658        channels_a.read_with(cx_a, |list, _| {
3659            assert_eq!(
3660                list.available_channels().unwrap(),
3661                &[ChannelDetails {
3662                    id: channel_id.to_proto(),
3663                    name: "test-channel".to_string()
3664                }]
3665            )
3666        });
3667        let channel_a = channels_a.update(cx_a, |this, cx| {
3668            this.get_channel(channel_id.to_proto(), cx).unwrap()
3669        });
3670        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3671        channel_a
3672            .condition(&cx_a, |channel, _| {
3673                channel_messages(channel)
3674                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3675            })
3676            .await;
3677
3678        let channels_b = cx_b
3679            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3680        channels_b
3681            .condition(cx_b, |list, _| list.available_channels().is_some())
3682            .await;
3683        channels_b.read_with(cx_b, |list, _| {
3684            assert_eq!(
3685                list.available_channels().unwrap(),
3686                &[ChannelDetails {
3687                    id: channel_id.to_proto(),
3688                    name: "test-channel".to_string()
3689                }]
3690            )
3691        });
3692
3693        let channel_b = channels_b.update(cx_b, |this, cx| {
3694            this.get_channel(channel_id.to_proto(), cx).unwrap()
3695        });
3696        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3697        channel_b
3698            .condition(&cx_b, |channel, _| {
3699                channel_messages(channel)
3700                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3701            })
3702            .await;
3703
3704        channel_a
3705            .update(cx_a, |channel, cx| {
3706                channel
3707                    .send_message("oh, hi B.".to_string(), cx)
3708                    .unwrap()
3709                    .detach();
3710                let task = channel.send_message("sup".to_string(), cx).unwrap();
3711                assert_eq!(
3712                    channel_messages(channel),
3713                    &[
3714                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3715                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3716                        ("user_a".to_string(), "sup".to_string(), true)
3717                    ]
3718                );
3719                task
3720            })
3721            .await
3722            .unwrap();
3723
3724        channel_b
3725            .condition(&cx_b, |channel, _| {
3726                channel_messages(channel)
3727                    == [
3728                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3729                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3730                        ("user_a".to_string(), "sup".to_string(), false),
3731                    ]
3732            })
3733            .await;
3734
3735        assert_eq!(
3736            server
3737                .state()
3738                .await
3739                .channel(channel_id)
3740                .unwrap()
3741                .connection_ids
3742                .len(),
3743            2
3744        );
3745        cx_b.update(|_| drop(channel_b));
3746        server
3747            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3748            .await;
3749
3750        cx_a.update(|_| drop(channel_a));
3751        server
3752            .condition(|state| state.channel(channel_id).is_none())
3753            .await;
3754    }
3755
3756    #[gpui::test(iterations = 10)]
3757    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3758        cx_a.foreground().forbid_parking();
3759
3760        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3761        let client_a = server.create_client(cx_a, "user_a").await;
3762
3763        let db = &server.app_state.db;
3764        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3765        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3766        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3767            .await
3768            .unwrap();
3769        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3770            .await
3771            .unwrap();
3772
3773        let channels_a = cx_a
3774            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3775        channels_a
3776            .condition(cx_a, |list, _| list.available_channels().is_some())
3777            .await;
3778        let channel_a = channels_a.update(cx_a, |this, cx| {
3779            this.get_channel(channel_id.to_proto(), cx).unwrap()
3780        });
3781
3782        // Messages aren't allowed to be too long.
3783        channel_a
3784            .update(cx_a, |channel, cx| {
3785                let long_body = "this is long.\n".repeat(1024);
3786                channel.send_message(long_body, cx).unwrap()
3787            })
3788            .await
3789            .unwrap_err();
3790
3791        // Messages aren't allowed to be blank.
3792        channel_a.update(cx_a, |channel, cx| {
3793            channel.send_message(String::new(), cx).unwrap_err()
3794        });
3795
3796        // Leading and trailing whitespace are trimmed.
3797        channel_a
3798            .update(cx_a, |channel, cx| {
3799                channel
3800                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3801                    .unwrap()
3802            })
3803            .await
3804            .unwrap();
3805        assert_eq!(
3806            db.get_channel_messages(channel_id, 10, None)
3807                .await
3808                .unwrap()
3809                .iter()
3810                .map(|m| &m.body)
3811                .collect::<Vec<_>>(),
3812            &["surrounded by whitespace"]
3813        );
3814    }
3815
3816    #[gpui::test(iterations = 10)]
3817    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3818        cx_a.foreground().forbid_parking();
3819
3820        // Connect to a server as 2 clients.
3821        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3822        let client_a = server.create_client(cx_a, "user_a").await;
3823        let client_b = server.create_client(cx_b, "user_b").await;
3824        let mut status_b = client_b.status();
3825
3826        // Create an org that includes these 2 users.
3827        let db = &server.app_state.db;
3828        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3829        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3830            .await
3831            .unwrap();
3832        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3833            .await
3834            .unwrap();
3835
3836        // Create a channel that includes all the users.
3837        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3838        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3839            .await
3840            .unwrap();
3841        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3842            .await
3843            .unwrap();
3844        db.create_channel_message(
3845            channel_id,
3846            client_b.current_user_id(&cx_b),
3847            "hello A, it's B.",
3848            OffsetDateTime::now_utc(),
3849            2,
3850        )
3851        .await
3852        .unwrap();
3853
3854        let channels_a = cx_a
3855            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3856        channels_a
3857            .condition(cx_a, |list, _| list.available_channels().is_some())
3858            .await;
3859
3860        channels_a.read_with(cx_a, |list, _| {
3861            assert_eq!(
3862                list.available_channels().unwrap(),
3863                &[ChannelDetails {
3864                    id: channel_id.to_proto(),
3865                    name: "test-channel".to_string()
3866                }]
3867            )
3868        });
3869        let channel_a = channels_a.update(cx_a, |this, cx| {
3870            this.get_channel(channel_id.to_proto(), cx).unwrap()
3871        });
3872        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3873        channel_a
3874            .condition(&cx_a, |channel, _| {
3875                channel_messages(channel)
3876                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3877            })
3878            .await;
3879
3880        let channels_b = cx_b
3881            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3882        channels_b
3883            .condition(cx_b, |list, _| list.available_channels().is_some())
3884            .await;
3885        channels_b.read_with(cx_b, |list, _| {
3886            assert_eq!(
3887                list.available_channels().unwrap(),
3888                &[ChannelDetails {
3889                    id: channel_id.to_proto(),
3890                    name: "test-channel".to_string()
3891                }]
3892            )
3893        });
3894
3895        let channel_b = channels_b.update(cx_b, |this, cx| {
3896            this.get_channel(channel_id.to_proto(), cx).unwrap()
3897        });
3898        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3899        channel_b
3900            .condition(&cx_b, |channel, _| {
3901                channel_messages(channel)
3902                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3903            })
3904            .await;
3905
3906        // Disconnect client B, ensuring we can still access its cached channel data.
3907        server.forbid_connections();
3908        server.disconnect_client(client_b.current_user_id(&cx_b));
3909        cx_b.foreground().advance_clock(Duration::from_secs(3));
3910        while !matches!(
3911            status_b.next().await,
3912            Some(client::Status::ReconnectionError { .. })
3913        ) {}
3914
3915        channels_b.read_with(cx_b, |channels, _| {
3916            assert_eq!(
3917                channels.available_channels().unwrap(),
3918                [ChannelDetails {
3919                    id: channel_id.to_proto(),
3920                    name: "test-channel".to_string()
3921                }]
3922            )
3923        });
3924        channel_b.read_with(cx_b, |channel, _| {
3925            assert_eq!(
3926                channel_messages(channel),
3927                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3928            )
3929        });
3930
3931        // Send a message from client B while it is disconnected.
3932        channel_b
3933            .update(cx_b, |channel, cx| {
3934                let task = channel
3935                    .send_message("can you see this?".to_string(), cx)
3936                    .unwrap();
3937                assert_eq!(
3938                    channel_messages(channel),
3939                    &[
3940                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3941                        ("user_b".to_string(), "can you see this?".to_string(), true)
3942                    ]
3943                );
3944                task
3945            })
3946            .await
3947            .unwrap_err();
3948
3949        // Send a message from client A while B is disconnected.
3950        channel_a
3951            .update(cx_a, |channel, cx| {
3952                channel
3953                    .send_message("oh, hi B.".to_string(), cx)
3954                    .unwrap()
3955                    .detach();
3956                let task = channel.send_message("sup".to_string(), cx).unwrap();
3957                assert_eq!(
3958                    channel_messages(channel),
3959                    &[
3960                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3961                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3962                        ("user_a".to_string(), "sup".to_string(), true)
3963                    ]
3964                );
3965                task
3966            })
3967            .await
3968            .unwrap();
3969
3970        // Give client B a chance to reconnect.
3971        server.allow_connections();
3972        cx_b.foreground().advance_clock(Duration::from_secs(10));
3973
3974        // Verify that B sees the new messages upon reconnection, as well as the message client B
3975        // sent while offline.
3976        channel_b
3977            .condition(&cx_b, |channel, _| {
3978                channel_messages(channel)
3979                    == [
3980                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3981                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3982                        ("user_a".to_string(), "sup".to_string(), false),
3983                        ("user_b".to_string(), "can you see this?".to_string(), false),
3984                    ]
3985            })
3986            .await;
3987
3988        // Ensure client A and B can communicate normally after reconnection.
3989        channel_a
3990            .update(cx_a, |channel, cx| {
3991                channel.send_message("you online?".to_string(), cx).unwrap()
3992            })
3993            .await
3994            .unwrap();
3995        channel_b
3996            .condition(&cx_b, |channel, _| {
3997                channel_messages(channel)
3998                    == [
3999                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4000                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4001                        ("user_a".to_string(), "sup".to_string(), false),
4002                        ("user_b".to_string(), "can you see this?".to_string(), false),
4003                        ("user_a".to_string(), "you online?".to_string(), false),
4004                    ]
4005            })
4006            .await;
4007
4008        channel_b
4009            .update(cx_b, |channel, cx| {
4010                channel.send_message("yep".to_string(), cx).unwrap()
4011            })
4012            .await
4013            .unwrap();
4014        channel_a
4015            .condition(&cx_a, |channel, _| {
4016                channel_messages(channel)
4017                    == [
4018                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4019                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4020                        ("user_a".to_string(), "sup".to_string(), false),
4021                        ("user_b".to_string(), "can you see this?".to_string(), false),
4022                        ("user_a".to_string(), "you online?".to_string(), false),
4023                        ("user_b".to_string(), "yep".to_string(), false),
4024                    ]
4025            })
4026            .await;
4027    }
4028
4029    #[gpui::test(iterations = 10)]
        /// Checks that the contact list exposed by each client's `UserStore` follows the lifecycle
        /// of user A's project: it is listed with no guests once the worktree has been scanned,
        /// gains `user_b` as a guest after client B joins, and disappears for all three clients
        /// once the host drops the project.
4030    async fn test_contacts(
4031        cx_a: &mut TestAppContext,
4032        cx_b: &mut TestAppContext,
4033        cx_c: &mut TestAppContext,
4034    ) {
4035        cx_a.foreground().forbid_parking();
4036        let lang_registry = Arc::new(LanguageRegistry::test());
4037        let fs = FakeFs::new(cx_a.background());
4038
4039        // Connect to a server as 3 clients.
4040        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4041        let client_a = server.create_client(cx_a, "user_a").await;
4042        let client_b = server.create_client(cx_b, "user_b").await;
4043        let client_c = server.create_client(cx_c, "user_c").await;
4044
4045        // Share a worktree as client A.
4046        fs.insert_tree(
4047            "/a",
4048            json!({
4049                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4050            }),
4051        )
4052        .await;
4053
4054        let project_a = cx_a.update(|cx| {
4055            Project::local(
4056                client_a.clone(),
4057                client_a.user_store.clone(),
4058                lang_registry.clone(),
4059                fs.clone(),
4060                cx,
4061            )
4062        });
4063        let (worktree_a, _) = project_a
4064            .update(cx_a, |p, cx| {
4065                p.find_or_create_local_worktree("/a", true, cx)
4066            })
4067            .await
4068            .unwrap();
4069        worktree_a
4070            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4071            .await;
4072
            // Before `share` is called, all three clients (including A itself) already see
            // user_a's project "a" in their contacts, with an empty guest list.
4073        client_a
4074            .user_store
4075            .condition(&cx_a, |user_store, _| {
4076                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4077            })
4078            .await;
4079        client_b
4080            .user_store
4081            .condition(&cx_b, |user_store, _| {
4082                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4083            })
4084            .await;
4085        client_c
4086            .user_store
4087            .condition(&cx_c, |user_store, _| {
4088                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4089            })
4090            .await;
4091
            // Obtain the project's remote id, share it, and have client B join it remotely.
4092        let project_id = project_a
4093            .update(cx_a, |project, _| project.next_remote_id())
4094            .await;
4095        project_a
4096            .update(cx_a, |project, cx| project.share(cx))
4097            .await
4098            .unwrap();
4099
4100        let _project_b = Project::remote(
4101            project_id,
4102            client_b.clone(),
4103            client_b.user_store.clone(),
4104            lang_registry.clone(),
4105            fs.clone(),
4106            &mut cx_b.to_async(),
4107        )
4108        .await
4109        .unwrap();
4110
            // Every client, including C who has not joined, now sees user_b as a guest.
4111        client_a
4112            .user_store
4113            .condition(&cx_a, |user_store, _| {
4114                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4115            })
4116            .await;
4117        client_b
4118            .user_store
4119            .condition(&cx_b, |user_store, _| {
4120                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4121            })
4122            .await;
4123        client_c
4124            .user_store
4125            .condition(&cx_c, |user_store, _| {
4126                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4127            })
4128            .await;
4129
            // Wait until the host project itself lists client B's peer id as a collaborator.
4130        project_a
4131            .condition(&cx_a, |project, _| {
4132                project.collaborators().contains_key(&client_b.peer_id)
4133            })
4134            .await;
4135
            // Drop the host's project handle and wait for every contact list to become empty.
4136        cx_a.update(move |_| drop(project_a));
4137        client_a
4138            .user_store
4139            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4140            .await;
4141        client_b
4142            .user_store
4143            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4144            .await;
4145        client_c
4146            .user_store
4147            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4148            .await;
4149
            // Flattens the store's contacts into
            // `(github login, [(first worktree root name, [guest github logins])])` tuples so
            // they can be compared against literals. Indexing `worktree_root_names[0]` panics
            // if a project reports no worktree root names.
4150        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4151            user_store
4152                .contacts()
4153                .iter()
4154                .map(|contact| {
4155                    let worktrees = contact
4156                        .projects
4157                        .iter()
4158                        .map(|p| {
4159                            (
4160                                p.worktree_root_names[0].as_str(),
4161                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4162                            )
4163                        })
4164                        .collect();
4165                    (contact.user.github_login.as_str(), worktrees)
4166                })
4167                .collect()
4168        }
4169    }
4170
4171    #[gpui::test(iterations = 10)]
        /// Sets up a host (client A) sharing a project and a guest (client B) that joins it, opens
        /// editors on both sides, then has B split its active pane and start following a
        /// collaborator. The test only requires that every step, including `follow`, resolves
        /// successfully.
        ///
        /// NOTE(review): nothing is asserted after `follow` completes, and the editor handles
        /// (`editor_a1`, `editor_a2`, `editor_b1`) are bound but never inspected.
4172    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4173        cx_a.foreground().forbid_parking();
4174        let fs = FakeFs::new(cx_a.background());
4175
4176        // 2 clients connect to a server.
4177        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4178        let mut client_a = server.create_client(cx_a, "user_a").await;
4179        let mut client_b = server.create_client(cx_b, "user_b").await;
4180        cx_a.update(editor::init);
4181        cx_b.update(editor::init);
4182
4183        // Client A shares a project.
4184        fs.insert_tree(
4185            "/a",
4186            json!({
4187                ".zed.toml": r#"collaborators = ["user_b"]"#,
4188                "1.txt": "one",
4189                "2.txt": "two",
4190                "3.txt": "three",
4191            }),
4192        )
4193        .await;
4194        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4195        project_a
4196            .update(cx_a, |project, cx| project.share(cx))
4197            .await
4198            .unwrap();
4199
4200        // Client B joins the project.
4201        let project_b = client_b
4202            .build_remote_project(
4203                project_a
4204                    .read_with(cx_a, |project, _| project.remote_id())
4205                    .unwrap(),
4206                cx_b,
4207            )
4208            .await;
4209
4210        // Client A opens some editors.
4211        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4212        let editor_a1 = workspace_a
4213            .update(cx_a, |workspace, cx| {
4214                workspace.open_path((worktree_id, "1.txt"), cx)
4215            })
4216            .await
4217            .unwrap();
4218        let editor_a2 = workspace_a
4219            .update(cx_a, |workspace, cx| {
4220                workspace.open_path((worktree_id, "2.txt"), cx)
4221            })
4222            .await
4223            .unwrap();
4224
4225        // Client B opens an editor.
4226        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4227        let editor_b1 = workspace_b
4228            .update(cx_b, |workspace, cx| {
4229                workspace.open_path((worktree_id, "1.txt"), cx)
4230            })
4231            .await
4232            .unwrap();
4233
4234        // Client B starts following client A.
4235        workspace_b
4236            .update(cx_b, |workspace, cx| {
                    // B first splits its active pane to the right, then follows whichever peer
                    // comes first in its collaborator map (presumably client A, the only other
                    // participant in this test).
4237                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4238                let leader_id = project_b.read(cx).collaborators().keys().next().unwrap();
4239                workspace.follow(*leader_id, cx)
4240            })
4241            .await
4242            .unwrap();
4243    }
4244
4245    #[gpui::test(iterations = 100)]
        /// Randomized collaboration test. A simulated host shares a project and simulated guests
        /// are added at random while a shared operation counter is below the budget. Once every
        /// simulation has finished, each guest must agree with the host on the set of worktrees,
        /// their root names and entries, and on the text of every buffer the guest holds, with no
        /// deferred operations left over.
        ///
        /// Tunable through environment variables: `MAX_PEERS` (default 5) caps the number of
        /// clients including the host, and `OPERATIONS` (default 10) is the operation budget.
        /// A variable that is set but fails to parse makes the test panic.
4246    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4247        cx.foreground().forbid_parking();
4248        let max_peers = env::var("MAX_PEERS")
4249            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4250            .unwrap_or(5);
4251        let max_operations = env::var("OPERATIONS")
4252            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4253            .unwrap_or(10);
4254
            // The RNG is shared: clones of this handle are passed to the host and to every guest
            // simulation below.
4255        let rng = Arc::new(Mutex::new(rng));
4256
4257        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4258        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4259
4260        let fs = FakeFs::new(cx.background());
4261        fs.insert_tree(
4262            "/_collab",
4263            json!({
4264                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4265            }),
4266        )
4267        .await;
4268
            // Operation counter shared by the loop below and by all simulations.
4269        let operations = Rc::new(Cell::new(0));
4270        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4271        let mut clients = Vec::new();
4272
            // The host and each guest get their own `TestAppContext`; the last constructor
            // argument starts at 100000 and grows by 100000 per guest (presumably so entity ids
            // from different contexts never collide; confirm against `TestAppContext::new`).
4273        let mut next_entity_id = 100000;
4274        let mut host_cx = TestAppContext::new(
4275            cx.foreground_platform(),
4276            cx.platform(),
4277            cx.foreground(),
4278            cx.background(),
4279            cx.font_cache(),
4280            cx.leak_detector(),
4281            next_entity_id,
4282        );
4283        let host = server.create_client(&mut host_cx, "host").await;
4284        let host_project = host_cx.update(|cx| {
4285            Project::local(
4286                host.client.clone(),
4287                host.user_store.clone(),
4288                Arc::new(LanguageRegistry::test()),
4289                fs.clone(),
4290                cx,
4291            )
4292        });
4293        let host_project_id = host_project
4294            .update(&mut host_cx, |p, _| p.next_remote_id())
4295            .await;
4296
4297        let (collab_worktree, _) = host_project
4298            .update(&mut host_cx, |project, cx| {
4299                project.find_or_create_local_worktree("/_collab", true, cx)
4300            })
4301            .await
4302            .unwrap();
4303        collab_worktree
4304            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4305            .await;
4306        host_project
4307            .update(&mut host_cx, |project, cx| project.share(cx))
4308            .await
4309            .unwrap();
4310
            // The host simulation is pushed first, so it always occupies index 0 of `clients`.
4311        clients.push(cx.foreground().spawn(host.simulate_host(
4312            host_project,
4313            language_server_config,
4314            operations.clone(),
4315            max_operations,
4316            rng.clone(),
4317            host_cx,
4318        )));
4319
            // Until the budget is spent or `max_peers` clients exist, each iteration has a 5%
            // chance of adding a guest. Adding a guest itself counts as one operation.
4320        while operations.get() < max_operations {
4321            cx.background().simulate_random_delay().await;
4322            if clients.len() >= max_peers {
4323                break;
4324            } else if rng.lock().gen_bool(0.05) {
4325                operations.set(operations.get() + 1);
4326
                    // Index 0 is the host, so guest ids start at 1 and line up with the
                    // "guest-N" logins listed as collaborators in `.zed.toml` above.
4327                let guest_id = clients.len();
4328                log::info!("Adding guest {}", guest_id);
4329                next_entity_id += 100000;
4330                let mut guest_cx = TestAppContext::new(
4331                    cx.foreground_platform(),
4332                    cx.platform(),
4333                    cx.foreground(),
4334                    cx.background(),
4335                    cx.font_cache(),
4336                    cx.leak_detector(),
4337                    next_entity_id,
4338                );
4339                let guest = server
4340                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4341                    .await;
4342                let guest_project = Project::remote(
4343                    host_project_id,
4344                    guest.client.clone(),
4345                    guest.user_store.clone(),
4346                    guest_lang_registry.clone(),
4347                    FakeFs::new(cx.background()),
4348                    &mut guest_cx.to_async(),
4349                )
4350                .await
4351                .unwrap();
4352                clients.push(cx.foreground().spawn(guest.simulate_guest(
4353                    guest_id,
4354                    guest_project,
4355                    operations.clone(),
4356                    max_operations,
4357                    rng.clone(),
4358                    guest_cx,
4359                )));
4360
4361                log::info!("Guest {} added", guest_id);
4362            }
4363        }
4364
            // Wait for every simulation to return its (client, context) pair, then run the
            // foreground executor until it parks before inspecting the final state.
4365        let mut clients = futures::future::join_all(clients).await;
4366        cx.foreground().run_until_parked();
4367
            // Snapshot the host's worktrees, keyed by worktree id, as the reference state.
4368        let (host_client, mut host_cx) = clients.remove(0);
4369        let host_project = host_client.project.as_ref().unwrap();
4370        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4371            project
4372                .worktrees(cx)
4373                .map(|worktree| {
4374                    let snapshot = worktree.read(cx).snapshot();
4375                    (snapshot.id(), snapshot)
4376                })
4377                .collect::<BTreeMap<_, _>>()
4378        });
4379
4380        host_client
4381            .project
4382            .as_ref()
4383            .unwrap()
4384            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4385
            // Compare every remaining client (the guests) against the host.
4386        for (guest_client, mut guest_cx) in clients.into_iter() {
                // Note: this is the value of `client.id()`, used only in failure messages; it is
                // not the `guest_id` index used when the guest was created.
4387            let guest_id = guest_client.client.id();
4388            let worktree_snapshots =
4389                guest_client
4390                    .project
4391                    .as_ref()
4392                    .unwrap()
4393                    .read_with(&guest_cx, |project, cx| {
4394                        project
4395                            .worktrees(cx)
4396                            .map(|worktree| {
4397                                let worktree = worktree.read(cx);
4398                                (worktree.id(), worktree.snapshot())
4399                            })
4400                            .collect::<BTreeMap<_, _>>()
4401                    });
4402
                // Same worktree ids, in the same (BTreeMap key) order.
4403            assert_eq!(
4404                worktree_snapshots.keys().collect::<Vec<_>>(),
4405                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4406                "guest {} has different worktrees than the host",
4407                guest_id
4408            );
                // Same root name and same entries for each worktree.
4409            for (id, host_snapshot) in &host_worktree_snapshots {
4410                let guest_snapshot = &worktree_snapshots[id];
4411                assert_eq!(
4412                    guest_snapshot.root_name(),
4413                    host_snapshot.root_name(),
4414                    "guest {} has different root name than the host for worktree {}",
4415                    guest_id,
4416                    id
4417                );
4418                assert_eq!(
4419                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4420                    host_snapshot.entries(false).collect::<Vec<_>>(),
4421                    "guest {} has different snapshot than the host for worktree {}",
4422                    guest_id,
4423                    id
4424                );
4425            }
4426
4427            guest_client
4428                .project
4429                .as_ref()
4430                .unwrap()
4431                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4432
                // Every buffer the guest holds must exist on the host under the same remote id,
                // have no deferred operations, and contain identical text.
4433            for guest_buffer in &guest_client.buffers {
4434                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4435                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
                        // NOTE(review): `expect(&format!(..))` builds the message even when the
                        // buffer is found; `unwrap_or_else(|| panic!(..))` would avoid that.
4436                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4437                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4438                        guest_id, guest_client.peer_id, buffer_id
4439                    ))
4440                });
4441                let path = host_buffer
4442                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4443
4444                assert_eq!(
4445                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4446                    0,
4447                    "guest {}, buffer {}, path {:?} has deferred operations",
4448                    guest_id,
4449                    buffer_id,
4450                    path,
4451                );
4452                assert_eq!(
4453                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4454                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4455                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4456                    guest_id,
4457                    buffer_id,
4458                    path
4459                );
4460            }
4461
                // Each client is dropped inside an `update` call on its own app context.
4462            guest_cx.update(|_| drop(guest_client));
4463        }
4464
4465        host_cx.update(|_| drop(host_client));
4466    }
4467
4468    struct TestServer {
            // Test harness that runs a `Server` in-process and hands out clients connected to it
            // over in-memory connections.

            // Peer handed to `Server::new`; reset when the harness is dropped.
4469        peer: Arc<Peer>,
            // Application state built on top of the test database.
4470        app_state: Arc<AppState>,
            // The server under test.
4471        server: Arc<Server>,
            // Foreground executor, used by `condition` to bracket its waits.
4472        foreground: Rc<executor::Foreground>,
            // Receiving half of the channel whose sender is given to `Server::new`.
4473        notifications: mpsc::UnboundedReceiver<()>,
            // Kill handle of each user's most recent in-memory connection. `disconnect_client`
            // removes (and thereby drops) the entry.
4474        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
            // While true, attempts to establish a connection fail with an error.
4475        forbid_connections: Arc<AtomicBool>,
            // Never read; presumably held so the test database outlives the server (confirm).
4476        _test_db: TestDb,
4477    }
4478
4479    impl TestServer {
            /// Builds a harness around a fresh fake `TestDb`: creates the app state, a new `Peer`,
            /// and an unbounded notification channel whose sender is given to `Server::new` while
            /// the receiver is kept for `condition`.
4480        async fn start(
4481            foreground: Rc<executor::Foreground>,
4482            background: Arc<executor::Background>,
4483        ) -> Self {
4484            let test_db = TestDb::fake(background);
4485            let app_state = Self::build_app_state(&test_db).await;
4486            let peer = Peer::new();
4487            let notifications = mpsc::unbounded();
4488            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4489            Self {
4490                peer,
4491                app_state,
4492                server,
4493                foreground,
4494                notifications: notifications.1,
4495                connection_killers: Default::default(),
4496                forbid_connections: Default::default(),
4497                _test_db: test_db,
4498            }
4499        }
4500
            /// Creates a database user called `name` and returns a `TestClient` that is
            /// authenticated as that user and connected to this server.
            ///
            /// The client's authentication and connection steps are overridden: credentials are
            /// fabricated locally, and each connection is an in-memory pipe whose server end is
            /// served by `Server::handle_connection` on the background executor.
            ///
            /// Panics if user creation or the initial connect fails, or if no connection id is
            /// received.
4501        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4502            cx.update(|cx| {
4503                let settings = Settings::test(cx);
4504                cx.set_global(settings);
4505            });
4506
4507            let http = FakeHttpClient::with_404_response();
4508            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4509            let client_name = name.to_string();
4510            let mut client = Client::new(http.clone());
4511            let server = self.server.clone();
4512            let connection_killers = self.connection_killers.clone();
4513            let forbid_connections = self.forbid_connections.clone();
                // The server reports the id of each new connection on this channel; it is read
                // below to learn the client's peer id.
4514            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4515
                // `Arc::get_mut` only succeeds while this is the sole handle to the client, so
                // the overrides are installed before the `Arc` is shared.
4516            Arc::get_mut(&mut client)
4517                .unwrap()
4518                .override_authenticate(move |cx| {
4519                    cx.spawn(|_| async move {
4520                        let access_token = "the-token".to_string();
4521                        Ok(Credentials {
4522                            user_id: user_id.0 as u64,
4523                            access_token,
4524                        })
4525                    })
4526                })
4527                .override_establish_connection(move |credentials, cx| {
                        // The credentials must be the ones fabricated by the override above.
4528                    assert_eq!(credentials.user_id, user_id.0 as u64);
4529                    assert_eq!(credentials.access_token, "the-token");
4530
                        // This closure may run more than once (e.g. on reconnect), so every
                        // captured handle is cloned before moving into the spawned future.
4531                    let server = server.clone();
4532                    let connection_killers = connection_killers.clone();
4533                    let forbid_connections = forbid_connections.clone();
4534                    let client_name = client_name.clone();
4535                    let connection_id_tx = connection_id_tx.clone();
4536                    cx.spawn(move |cx| async move {
4537                        if forbid_connections.load(SeqCst) {
4538                            Err(EstablishConnectionError::other(anyhow!(
4539                                "server is forbidding connections"
4540                            )))
4541                        } else {
4542                            let (client_conn, server_conn, kill_conn) =
4543                                Connection::in_memory(cx.background());
                                // Inserting replaces (and drops) any kill handle stored for an
                                // earlier connection of the same user.
4544                            connection_killers.lock().insert(user_id, kill_conn);
4545                            cx.background()
4546                                .spawn(server.handle_connection(
4547                                    server_conn,
4548                                    client_name,
4549                                    user_id,
4550                                    Some(connection_id_tx),
4551                                    cx.background(),
4552                                ))
4553                                .detach();
4554                            Ok(client_conn)
4555                        }
4556                    })
4557                });
4558
4559            client
4560                .authenticate_and_connect(&cx.to_async())
4561                .await
4562                .unwrap();
4563
4564            Channel::init(&client);
4565            Project::init(&client);
4566            cx.update(|cx| {
4567                workspace::init(&client, cx);
4568            });
4569
                // The first connection id sent by the server becomes this client's peer id.
4570            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4571            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4572
4573            let client = TestClient {
4574                client,
4575                peer_id,
4576                user_store,
4577                language_registry: Arc::new(LanguageRegistry::test()),
4578                project: Default::default(),
4579                buffers: Default::default(),
4580            };
4581            client.wait_for_current_user(cx).await;
4582            client
4583        }
4584
            /// Removes, and thereby drops, the kill handle stored for `user_id`. Presumably
            /// dropping the `barrier::Sender` is what severs the in-memory connection (confirm
            /// against `Connection::in_memory`). Does nothing if no handle is stored.
4585        fn disconnect_client(&self, user_id: UserId) {
4586            self.connection_killers.lock().remove(&user_id);
4587        }
4588
            /// Makes every subsequent connection attempt fail with
            /// "server is forbidding connections".
4589        fn forbid_connections(&self) {
4590            self.forbid_connections.store(true, SeqCst);
4591        }
4592
            /// Lets connection attempts succeed again after `forbid_connections`.
4593        fn allow_connections(&self) {
4594            self.forbid_connections.store(false, SeqCst);
4595        }
4596
            /// Builds the `AppState` for the server: a default `Config` with a 32-character
            /// session secret and the test database URL, the test database handle, and test
            /// GitHub clients.
4597        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4598            let mut config = Config::default();
4599            config.session_secret = "a".repeat(32);
4600            config.database_url = test_db.url.clone();
4601            let github_client = github::AppClient::test();
4602            Arc::new(AppState {
4603                db: test_db.db().clone(),
4604                handlebars: Default::default(),
4605                auth_client: auth::build_client("", ""),
4606                repo_client: github::RepoClient::test(&github_client),
4607                github_client,
4608                config,
4609            })
4610        }
4611
            /// Returns a read guard on the server's `Store`. Declared `async` although the body
            /// contains no `.await`.
4612        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4613            self.server.store.read()
4614        }
4615
            /// Waits until `predicate` holds for the server's `Store`, re-evaluating it each time
            /// a notification arrives on the channel set up in `start`.
            ///
            /// Panics with "condition timed out" if this takes longer than 500 ms.
4616        async fn condition<F>(&mut self, mut predicate: F)
4617        where
4618            F: FnMut(&Store) -> bool,
4619        {
4620            async_std::future::timeout(Duration::from_millis(500), async {
4621                while !(predicate)(&*self.server.store.read()) {
                        // The store read guard above is a temporary released once the predicate
                        // has been evaluated, so it is not held across the await below.
                        // `start_waiting`/`finish_waiting` bracket the wait; presumably they tell
                        // the test executor that blocking here is expected (confirm).
4622                    self.foreground.start_waiting();
4623                    self.notifications.next().await;
4624                    self.foreground.finish_waiting();
4625                }
4626            })
4627            .await
4628            .expect("condition timed out");
4629        }
4630    }
4631
4632    impl Drop for TestServer {
            // Resets the shared `Peer` when the harness goes away (presumably tearing down any
            // connections it still tracks; confirm against `Peer::reset`).
4633        fn drop(&mut self) {
4634            self.peer.reset();
4635        }
4636    }
4637
4638    struct TestClient {
            // A connected client as produced by `TestServer::create_client`, together with the
            // state the tests attach to it.

            // The underlying client; also reachable through `Deref`.
4639        client: Arc<Client>,
            // Built from the connection id the server reported for this client's connection.
4640        pub peer_id: PeerId,
4641        pub user_store: ModelHandle<UserStore>,
            // Language registry passed to projects built by `build_local_project` and
            // `build_remote_project`.
4642        language_registry: Arc<LanguageRegistry>,
            // The project most recently built through one of the `build_*_project` helpers.
4643        project: Option<ModelHandle<Project>>,
            // Buffers held by this client; the randomized test compares each against the host.
4644        buffers: HashSet<ModelHandle<language::Buffer>>,
4645    }
4646
4647    impl Deref for TestClient {
            // Dereferences to the wrapped `Arc<Client>` so a `TestClient` can be used wherever
            // the tests need the client itself (e.g. `client_a.clone()` yields an `Arc<Client>`).
4648        type Target = Arc<Client>;
4649
4650        fn deref(&self) -> &Self::Target {
4651            &self.client
4652        }
4653    }
4654
4655    impl TestClient {
4656        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
                // Reads the id of the user store's current user and converts it from its proto
                // representation into a `UserId`. Panics if the store has no current user.
4657            UserId::from_proto(
4658                self.user_store
4659                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4660            )
4661        }
4662
4663        async fn wait_for_current_user(&self, cx: &TestAppContext) {
                // Polls the user store's current-user watch until it yields a `Some` value.
                // Panics if the watch stream ends before that happens.
4664            let mut authed_user = self
4665                .user_store
4666                .read_with(cx, |user_store, _| user_store.watch_current_user());
4667            while authed_user.next().await.unwrap().is_none() {}
4668        }
4669
4670        async fn build_local_project(
4671            &mut self,
4672            fs: Arc<FakeFs>,
4673            root_path: impl AsRef<Path>,
4674            cx: &mut TestAppContext,
4675        ) -> (ModelHandle<Project>, WorktreeId) {
                // Creates a local project on `fs` for this client, records it in `self.project`,
                // adds a worktree rooted at `root_path`, and returns the project together with
                // that worktree's id. Panics if the worktree cannot be created.
4676            let project = cx.update(|cx| {
4677                Project::local(
4678                    self.client.clone(),
4679                    self.user_store.clone(),
4680                    self.language_registry.clone(),
4681                    fs,
4682                    cx,
4683                )
4684            });
4685            self.project = Some(project.clone());
4686            let (worktree, _) = project
4687                .update(cx, |p, cx| {
4688                    p.find_or_create_local_worktree(root_path, true, cx)
4689                })
4690                .await
4691                .unwrap();
                // Wait for the initial scan of the worktree to complete.
4692            worktree
4693                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
4694                .await;
                // Await the project's remote id but discard it; presumably this just ensures the
                // id has been assigned before callers share the project (confirm).
4695            project
4696                .update(cx, |project, _| project.next_remote_id())
4697                .await;
4698            (project, worktree.read_with(cx, |tree, _| tree.id()))
4699        }
4700
4701        async fn build_remote_project(
4702            &mut self,
4703            project_id: u64,
4704            cx: &mut TestAppContext,
4705        ) -> ModelHandle<Project> {
4706            let project = Project::remote(
4707                project_id,
4708                self.client.clone(),
4709                self.user_store.clone(),
4710                self.language_registry.clone(),
4711                FakeFs::new(cx.background()),
4712                &mut cx.to_async(),
4713            )
4714            .await
4715            .unwrap();
4716            self.project = Some(project.clone());
4717            project
4718        }
4719
4720        fn build_workspace(
4721            &self,
4722            project: &ModelHandle<Project>,
4723            cx: &mut TestAppContext,
4724        ) -> ViewHandle<Workspace> {
4725            let (window_id, _) = cx.add_window(|cx| EmptyView);
4726            cx.add_view(window_id, |cx| {
4727                let fs = project.read(cx).fs().clone();
4728                Workspace::new(
4729                    &WorkspaceParams {
4730                        fs,
4731                        project: project.clone(),
4732                        user_store: self.user_store.clone(),
4733                        languages: self.language_registry.clone(),
4734                        channel_list: cx.add_model(|cx| {
4735                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
4736                        }),
4737                        client: self.client.clone(),
4738                    },
4739                    cx,
4740                )
4741            })
4742        }
4743
        /// Drives a randomized sequence of host-side operations against `project`.
        ///
        /// Before the returned future is created, this installs fake language-server request
        /// handlers on `language_server_config` and registers a "Rust" language (path suffix
        /// "rs") that uses that config with the project's language registry.
        ///
        /// The returned future then loops while the shared `operations` counter is below
        /// `max_operations`, incrementing the counter once per iteration and performing one
        /// randomly chosen action: finding/creating a local worktree, opening/editing/dropping
        /// a buffer, or creating a new file on the project's filesystem. A random delay is
        /// simulated after each action.
        ///
        /// When done, the project is stored in `self.project` and `(self, cx)` is returned.
        fn simulate_host(
            mut self,
            project: ModelHandle<Project>,
            mut language_server_config: LanguageServerConfig,
            operations: Rc<Cell<usize>>,
            max_operations: usize,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> impl Future<Output = (Self, TestAppContext)> {
            // Paths of every file created by this simulation; shared with the fake language
            // server's handlers below.
            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();

            // Set up a fake language server.
            language_server_config.set_fake_initializer({
                let rng = rng.clone();
                let files = files.clone();
                // Only a weak handle is captured; the highlight handler upgrades it on use.
                let project = project.downgrade();
                move |fake_server| {
                    // Completions: always a single item that inserts "the-new-text" at 0:0.
                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
                                range: lsp::Range::new(
                                    lsp::Position::new(0, 0),
                                    lsp::Position::new(0, 0),
                                ),
                                new_text: "the-new-text".to_string(),
                            })),
                            ..Default::default()
                        }]))
                    });

                    // Code actions: always a single action titled "the-code-action".
                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
                            lsp::CodeAction {
                                title: "the-code-action".to_string(),
                                ..Default::default()
                            },
                        )])
                    });

                    // Prepare rename: an empty range at the requested position.
                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
                        |params, _| {
                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
                                params.position,
                                params.position,
                            )))
                        },
                    );

                    // Go to definition: one or two locations in randomly chosen known files,
                    // each with a default range. Panics if no file has been created yet.
                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
                        let files = files.clone();
                        let rng = rng.clone();
                        move |_, _| {
                            let files = files.lock();
                            let mut rng = rng.lock();
                            let count = rng.gen_range::<usize, _>(1..3);
                            let files = (0..count)
                                .map(|_| files.choose(&mut *rng).unwrap())
                                .collect::<Vec<_>>();
                            log::info!("LSP: Returning definitions in files {:?}", &files);
                            Some(lsp::GotoDefinitionResponse::Array(
                                files
                                    .into_iter()
                                    .map(|file| lsp::Location {
                                        uri: lsp::Url::from_file_path(file).unwrap(),
                                        range: Default::default(),
                                    })
                                    .collect(),
                            ))
                        }
                    });

                    // Document highlights: between one and five random READ highlights in the
                    // requested document. Yields `None` when the project can no longer be
                    // upgraded, or when either lookup below returns `None`.
                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
                        let rng = rng.clone();
                        let project = project.clone();
                        move |params, mut cx| {
                            if let Some(project) = project.upgrade(&cx) {
                                project.update(&mut cx, |project, cx| {
                                    let path = params
                                        .text_document_position_params
                                        .text_document
                                        .uri
                                        .to_file_path()
                                        .unwrap();
                                    let (worktree, relative_path) =
                                        project.find_local_worktree(&path, cx)?;
                                    let project_path =
                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
                                    let buffer =
                                        project.get_open_buffer(&project_path, cx)?.read(cx);

                                    let mut highlights = Vec::new();
                                    let highlight_count = rng.lock().gen_range(1..=5);
                                    // Each range is requested relative to the end of the
                                    // previous one.
                                    // NOTE(review): presumably this keeps highlights ordered
                                    // and non-overlapping; confirm against `random_byte_range`.
                                    let mut prev_end = 0;
                                    for _ in 0..highlight_count {
                                        let range =
                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
                                        let start = buffer
                                            .offset_to_point_utf16(range.start)
                                            .to_lsp_position();
                                        let end = buffer
                                            .offset_to_point_utf16(range.end)
                                            .to_lsp_position();
                                        highlights.push(lsp::DocumentHighlight {
                                            range: lsp::Range::new(start, end),
                                            kind: Some(lsp::DocumentHighlightKind::READ),
                                        });
                                        prev_end = range.end;
                                    }
                                    Some(highlights)
                                })
                            } else {
                                None
                            }
                        }
                    });
                }
            });

            // Register a "Rust" language backed by the fake language server configured above.
            project.update(&mut cx, |project, _| {
                project.languages().add(Arc::new(Language::new(
                    LanguageConfig {
                        name: "Rust".into(),
                        path_suffixes: vec!["rs".to_string()],
                        language_server: Some(language_server_config),
                        ..Default::default()
                    },
                    None,
                )));
            });

            async move {
                let fs = project.read_with(&cx, |project, _| project.fs().clone());
                while operations.get() < max_operations {
                    operations.set(operations.get() + 1);

                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
                    // NOTE(review): the first two ranges overlap on 10..=20; since both arms
                    // share the same guard, the first arm wins there and the second arm
                    // effectively covers 21..=80. Confirm this is intended. While no file
                    // exists yet, both guards fail and the fallback arm creates one.
                    match distribution {
                        // Find or create a worktree rooted at some ancestor directory of a
                        // randomly chosen known file.
                        0..=20 if !files.lock().is_empty() => {
                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                            let mut path = path.as_path();
                            // Walk up at least one level, stopping at each ancestor on a
                            // random boolean draw, or once there is no parent left.
                            while let Some(parent_path) = path.parent() {
                                path = parent_path;
                                if rng.lock().gen() {
                                    break;
                                }
                            }

                            log::info!("Host: find/create local worktree {:?}", path);
                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
                                project.find_or_create_local_worktree(path, true, cx)
                            });
                            let find_or_create_worktree = async move {
                                find_or_create_worktree.await.unwrap();
                            };
                            // Randomly either let the task finish in the background or wait
                            // for it here.
                            if rng.lock().gen() {
                                cx.background().spawn(find_or_create_worktree).detach();
                            } else {
                                find_or_create_worktree.await;
                            }
                        }
                        // Open a buffer for a known file (or reuse one that is already open),
                        // then either drop it or edit it.
                        10..=80 if !files.lock().is_empty() => {
                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
                                let (worktree, path) = project
                                    .update(&mut cx, |project, cx| {
                                        project.find_or_create_local_worktree(
                                            file.clone(),
                                            true,
                                            cx,
                                        )
                                    })
                                    .await
                                    .unwrap();
                                let project_path =
                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
                                log::info!(
                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
                                    file,
                                    project_path.0,
                                    project_path.1
                                );
                                let buffer = project
                                    .update(&mut cx, |project, cx| {
                                        project.open_buffer(project_path, cx)
                                    })
                                    .await
                                    .unwrap();
                                self.buffers.insert(buffer.clone());
                                buffer
                            } else {
                                self.buffers
                                    .iter()
                                    .choose(&mut *rng.lock())
                                    .unwrap()
                                    .clone()
                            };

                            // With probability 0.1 release the buffer; otherwise edit it.
                            if rng.lock().gen_bool(0.1) {
                                cx.update(|cx| {
                                    log::info!(
                                        "Host: dropping buffer {:?}",
                                        buffer.read(cx).file().unwrap().full_path(cx)
                                    );
                                    self.buffers.remove(&buffer);
                                    drop(buffer);
                                });
                            } else {
                                buffer.update(&mut cx, |buffer, cx| {
                                    log::info!(
                                        "Host: updating buffer {:?} ({})",
                                        buffer.file().unwrap().full_path(cx),
                                        buffer.remote_id()
                                    );
                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                                });
                            }
                        }
                        // Create a new ".rs" file at a random absolute path made of one to
                        // five single-letter components, retrying with a fresh path until
                        // both the directory and the file can be created.
                        _ => loop {
                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
                            let mut path = PathBuf::new();
                            path.push("/");
                            for _ in 0..path_component_count {
                                let letter = rng.lock().gen_range(b'a'..=b'z');
                                path.push(std::str::from_utf8(&[letter]).unwrap());
                            }
                            path.set_extension("rs");
                            let parent_path = path.parent().unwrap();

                            log::info!("Host: creating file {:?}", path,);

                            if fs.create_dir(&parent_path).await.is_ok()
                                && fs.create_file(&path, Default::default()).await.is_ok()
                            {
                                files.lock().push(path);
                                break;
                            } else {
                                log::info!("Host: cannot create file");
                            }
                        },
                    }

                    cx.background().simulate_random_delay().await;
                }

                log::info!("Host done");

                self.project = Some(project);
                (self, cx)
            }
        }
4994
        /// Drives a randomized sequence of guest-side operations against `project`.
        ///
        /// Loops while the shared `operations` counter is below `max_operations`. Each
        /// iteration first selects a buffer (opening a random file from a visible worktree, or
        /// reusing one from `self.buffers`) and then performs one randomly chosen action on
        /// it: dropping it, issuing a project request (completions, code actions, save,
        /// prepare rename, definitions, highlights, search), or editing it. Each request runs
        /// in a background task that panics if the request fails; with probability 0.3 the
        /// task is detached, otherwise it is awaited. A random delay is simulated after each
        /// action.
        ///
        /// `guest_id` is only used to label log messages. When done, the project is stored in
        /// `self.project` and `(self, cx)` is returned.
        pub async fn simulate_guest(
            mut self,
            guest_id: usize,
            project: ModelHandle<Project>,
            operations: Rc<Cell<usize>>,
            max_operations: usize,
            rng: Arc<Mutex<StdRng>>,
            mut cx: TestAppContext,
        ) -> (Self, TestAppContext) {
            while operations.get() < max_operations {
                // Pick the buffer to operate on: open a new one when none is held (or on a
                // random boolean draw), otherwise reuse one that is already open.
                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
                    // Choose among visible worktrees that contain at least one file.
                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
                        project
                            .worktrees(&cx)
                            .filter(|worktree| {
                                let worktree = worktree.read(cx);
                                worktree.is_visible()
                                    && worktree.entries(false).any(|e| e.is_file())
                            })
                            .choose(&mut *rng.lock())
                    }) {
                        worktree
                    } else {
                        // Nothing to open yet: wait and retry without counting an operation.
                        cx.background().simulate_random_delay().await;
                        continue;
                    };

                    operations.set(operations.get() + 1);
                    let (worktree_root_name, project_path) =
                        worktree.read_with(&cx, |worktree, _| {
                            let entry = worktree
                                .entries(false)
                                .filter(|e| e.is_file())
                                .choose(&mut *rng.lock())
                                .unwrap();
                            (
                                worktree.root_name().to_string(),
                                (worktree.id(), entry.path.clone()),
                            )
                        });
                    log::info!(
                        "Guest {}: opening path {:?} in worktree {} ({})",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                    );
                    let buffer = project
                        .update(&mut cx, |project, cx| {
                            project.open_buffer(project_path.clone(), cx)
                        })
                        .await
                        .unwrap();
                    log::info!(
                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
                        guest_id,
                        project_path.1,
                        project_path.0,
                        worktree_root_name,
                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
                    );
                    self.buffers.insert(buffer.clone());
                    buffer
                } else {
                    operations.set(operations.get() + 1);

                    self.buffers
                        .iter()
                        .choose(&mut *rng.lock())
                        .unwrap()
                        .clone()
                };

                let choice = rng.lock().gen_range(0..100);
                match choice {
                    // Release the selected buffer.
                    0..=9 => {
                        cx.update(|cx| {
                            log::info!(
                                "Guest {}: dropping buffer {:?}",
                                guest_id,
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            self.buffers.remove(&buffer);
                            drop(buffer);
                        });
                    }
                    // Request completions at a random offset.
                    10..=19 => {
                        let completions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting completions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.completions(&buffer, offset, cx)
                        });
                        let completions = cx.background().spawn(async move {
                            completions.await.expect("completions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching completions request", guest_id);
                            completions.detach();
                        } else {
                            completions.await;
                        }
                    }
                    // Request code actions for a random byte range.
                    20..=29 => {
                        let code_actions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting code actions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
                            project.code_actions(&buffer, range, cx)
                        });
                        let code_actions = cx.background().spawn(async move {
                            code_actions.await.expect("code actions request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching code actions request", guest_id);
                            code_actions.detach();
                        } else {
                            code_actions.await;
                        }
                    }
                    // Save the buffer, but only when it is dirty; a clean buffer falls through
                    // to the edit arm at the bottom. The saved version must have observed
                    // everything in the version captured when the save was requested.
                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: saving buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            (buffer.version(), buffer.save(cx))
                        });
                        let save = cx.background().spawn(async move {
                            let (saved_version, _) = save.await.expect("save request failed");
                            assert!(saved_version.observed_all(&requested_version));
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching save request", guest_id);
                            save.detach();
                        } else {
                            save.await;
                        }
                    }
                    // Prepare a rename at a random offset.
                    40..=44 => {
                        let prepare_rename = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: preparing rename for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.prepare_rename(buffer, offset, cx)
                        });
                        let prepare_rename = cx.background().spawn(async move {
                            prepare_rename.await.expect("prepare rename request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching prepare rename request", guest_id);
                            prepare_rename.detach();
                        } else {
                            prepare_rename.await;
                        }
                    }
                    // Request definitions at a random offset; when the request is awaited, the
                    // buffers of the returned locations are retained in `self.buffers`.
                    45..=49 => {
                        let definitions = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting definitions for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.definition(&buffer, offset, cx)
                        });
                        let definitions = cx.background().spawn(async move {
                            definitions.await.expect("definitions request failed")
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching definitions request", guest_id);
                            definitions.detach();
                        } else {
                            self.buffers
                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
                        }
                    }
                    // Request document highlights at a random offset.
                    50..=54 => {
                        let highlights = project.update(&mut cx, |project, cx| {
                            log::info!(
                                "Guest {}: requesting highlights for buffer {} ({:?})",
                                guest_id,
                                buffer.read(cx).remote_id(),
                                buffer.read(cx).file().unwrap().full_path(cx)
                            );
                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
                            project.document_highlights(&buffer, offset, cx)
                        });
                        let highlights = cx.background().spawn(async move {
                            highlights.await.expect("highlights request failed");
                        });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching highlights request", guest_id);
                            highlights.detach();
                        } else {
                            highlights.await;
                        }
                    }
                    // Search the project for a random lowercase letter; when the search is
                    // awaited, the buffers keyed in the result are retained in `self.buffers`.
                    55..=59 => {
                        let search = project.update(&mut cx, |project, cx| {
                            let query = rng.lock().gen_range('a'..='z');
                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
                            project.search(SearchQuery::text(query, false, false), cx)
                        });
                        let search = cx
                            .background()
                            .spawn(async move { search.await.expect("search request failed") });
                        if rng.lock().gen_bool(0.3) {
                            log::info!("Guest {}: detaching search request", guest_id);
                            search.detach();
                        } else {
                            self.buffers.extend(search.await.into_keys());
                        }
                    }
                    // Everything else (including a "save" draw on a clean buffer): apply
                    // random edits to the buffer.
                    _ => {
                        buffer.update(&mut cx, |buffer, cx| {
                            log::info!(
                                "Guest {}: updating buffer {} ({:?})",
                                guest_id,
                                buffer.remote_id(),
                                buffer.file().unwrap().full_path(cx)
                            );
                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
                        });
                    }
                }
                cx.background().simulate_random_delay().await;
            }

            log::info!("Guest {} done", guest_id);

            self.project = Some(project);
            (self, cx)
        }
5244    }
5245
5246    impl Drop for TestClient {
5247        fn drop(&mut self) {
5248            self.client.tear_down();
5249        }
5250    }
5251
5252    impl Executor for Arc<gpui::executor::Background> {
5253        type Timer = gpui::executor::Timer;
5254
5255        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5256            self.spawn(future).detach();
5257        }
5258
5259        fn timer(&self, duration: Duration) -> Self::Timer {
5260            self.as_ref().timer(duration)
5261        }
5262    }
5263
5264    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5265        channel
5266            .messages()
5267            .cursor::<()>()
5268            .map(|m| {
5269                (
5270                    m.sender.github_login.clone(),
5271                    m.body.clone(),
5272                    m.is_pending(),
5273                )
5274            })
5275            .collect()
5276    }
5277
    /// A stateless view that renders an empty element; used as the root view of the window
    /// created in `build_workspace`.
    struct EmptyView;
5279
    impl gpui::Entity for EmptyView {
        // The event type is the unit type: this view has no event payload.
        type Event = ();
    }
5283
5284    impl gpui::View for EmptyView {
5285        fn ui_name() -> &'static str {
5286            "empty view"
5287        }
5288
5289        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5290            gpui::Element::boxed(gpui::elements::Empty)
5291        }
5292    }
5293}