rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_io::Timer;
  10use async_std::task;
  11use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  12use collections::{HashMap, HashSet};
  13use futures::{channel::mpsc, future::BoxFuture, FutureExt, SinkExt, StreamExt};
  14use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{
  21    any::TypeId,
  22    future::Future,
  23    sync::Arc,
  24    time::{Duration, Instant},
  25};
  26use store::{Store, Worktree};
  27use surf::StatusCode;
  28use tide::log;
  29use tide::{
  30    http::headers::{HeaderName, CONNECTION, UPGRADE},
  31    Request, Response,
  32};
  33use time::OffsetDateTime;
  34
  35type MessageHandler = Box<
  36    dyn Send
  37        + Sync
  38        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  39>;
  40
  41pub struct Server {
  42    peer: Arc<Peer>,
  43    store: RwLock<Store>,
  44    app_state: Arc<AppState>,
  45    handlers: HashMap<TypeId, MessageHandler>,
  46    notifications: Option<mpsc::UnboundedSender<()>>,
  47}
  48
  49pub trait Executor: Send + Clone {
  50    type Timer: Send + Future;
  51    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  52    fn timer(&self, duration: Duration) -> Self::Timer;
  53}
  54
  55#[derive(Clone)]
  56pub struct RealExecutor;
  57
  58const MESSAGE_COUNT_PER_PAGE: usize = 100;
  59const MAX_MESSAGE_LEN: usize = 1024;
  60
  61impl Server {
  62    pub fn new(
  63        app_state: Arc<AppState>,
  64        peer: Arc<Peer>,
  65        notifications: Option<mpsc::UnboundedSender<()>>,
  66    ) -> Arc<Self> {
  67        let mut server = Self {
  68            peer,
  69            app_state,
  70            store: Default::default(),
  71            handlers: Default::default(),
  72            notifications,
  73        };
  74
  75        server
  76            .add_request_handler(Server::ping)
  77            .add_request_handler(Server::register_project)
  78            .add_message_handler(Server::unregister_project)
  79            .add_request_handler(Server::share_project)
  80            .add_message_handler(Server::unshare_project)
  81            .add_request_handler(Server::join_project)
  82            .add_message_handler(Server::leave_project)
  83            .add_request_handler(Server::register_worktree)
  84            .add_message_handler(Server::unregister_worktree)
  85            .add_request_handler(Server::update_worktree)
  86            .add_message_handler(Server::start_language_server)
  87            .add_message_handler(Server::update_language_server)
  88            .add_message_handler(Server::update_diagnostic_summary)
  89            .add_request_handler(Server::forward_project_request::<proto::GetDefinition>)
  90            .add_request_handler(Server::forward_project_request::<proto::GetReferences>)
  91            .add_request_handler(Server::forward_project_request::<proto::SearchProject>)
  92            .add_request_handler(Server::forward_project_request::<proto::GetDocumentHighlights>)
  93            .add_request_handler(Server::forward_project_request::<proto::GetProjectSymbols>)
  94            .add_request_handler(Server::forward_project_request::<proto::OpenBufferForSymbol>)
  95            .add_request_handler(Server::forward_project_request::<proto::OpenBufferById>)
  96            .add_request_handler(Server::forward_project_request::<proto::OpenBufferByPath>)
  97            .add_request_handler(Server::forward_project_request::<proto::GetCompletions>)
  98            .add_request_handler(
  99                Server::forward_project_request::<proto::ApplyCompletionAdditionalEdits>,
 100            )
 101            .add_request_handler(Server::forward_project_request::<proto::GetCodeActions>)
 102            .add_request_handler(Server::forward_project_request::<proto::ApplyCodeAction>)
 103            .add_request_handler(Server::forward_project_request::<proto::PrepareRename>)
 104            .add_request_handler(Server::forward_project_request::<proto::PerformRename>)
 105            .add_request_handler(Server::forward_project_request::<proto::FormatBuffers>)
 106            .add_request_handler(Server::update_buffer)
 107            .add_message_handler(Server::update_buffer_file)
 108            .add_message_handler(Server::buffer_reloaded)
 109            .add_message_handler(Server::buffer_saved)
 110            .add_request_handler(Server::save_buffer)
 111            .add_request_handler(Server::get_channels)
 112            .add_request_handler(Server::get_users)
 113            .add_request_handler(Server::join_channel)
 114            .add_message_handler(Server::leave_channel)
 115            .add_request_handler(Server::send_channel_message)
 116            .add_request_handler(Server::follow)
 117            .add_request_handler(Server::get_channel_messages);
 118
 119        Arc::new(server)
 120    }
 121
 122    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 123    where
 124        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 125        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 126        M: EnvelopedMessage,
 127    {
 128        let prev_handler = self.handlers.insert(
 129            TypeId::of::<M>(),
 130            Box::new(move |server, envelope| {
 131                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 132                (handler)(server, *envelope).boxed()
 133            }),
 134        );
 135        if prev_handler.is_some() {
 136            panic!("registered a handler for the same message twice");
 137        }
 138        self
 139    }
 140
 141    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 142    where
 143        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 144        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 145        M: RequestMessage,
 146    {
 147        self.add_message_handler(move |server, envelope| {
 148            let receipt = envelope.receipt();
 149            let response = (handler)(server.clone(), envelope);
 150            async move {
 151                match response.await {
 152                    Ok(response) => {
 153                        server.peer.respond(receipt, response)?;
 154                        Ok(())
 155                    }
 156                    Err(error) => {
 157                        server.peer.respond_with_error(
 158                            receipt,
 159                            proto::Error {
 160                                message: error.to_string(),
 161                            },
 162                        )?;
 163                        Err(error)
 164                    }
 165                }
 166            }
 167        })
 168    }
 169
 170    pub fn handle_connection<E: Executor>(
 171        self: &Arc<Self>,
 172        connection: Connection,
 173        addr: String,
 174        user_id: UserId,
 175        mut send_connection_id: Option<mpsc::Sender<ConnectionId>>,
 176        executor: E,
 177    ) -> impl Future<Output = ()> {
 178        let mut this = self.clone();
 179        async move {
 180            let (connection_id, handle_io, mut incoming_rx) = this
 181                .peer
 182                .add_connection(connection, {
 183                    let executor = executor.clone();
 184                    move |duration| {
 185                        let timer = executor.timer(duration);
 186                        async move {
 187                            timer.await;
 188                        }
 189                    }
 190                })
 191                .await;
 192
 193            if let Some(send_connection_id) = send_connection_id.as_mut() {
 194                let _ = send_connection_id.send(connection_id).await;
 195            }
 196
 197            this.state_mut().add_connection(connection_id, user_id);
 198            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 199                log::error!("error updating contacts for {:?}: {}", user_id, err);
 200            }
 201
 202            let handle_io = handle_io.fuse();
 203            futures::pin_mut!(handle_io);
 204            loop {
 205                let next_message = incoming_rx.next().fuse();
 206                futures::pin_mut!(next_message);
 207                futures::select_biased! {
 208                    result = handle_io => {
 209                        if let Err(err) = result {
 210                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 211                        }
 212                        break;
 213                    }
 214                    message = next_message => {
 215                        if let Some(message) = message {
 216                            let start_time = Instant::now();
 217                            let type_name = message.payload_type_name();
 218                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 219                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 220                                let notifications = this.notifications.clone();
 221                                let is_background = message.is_background();
 222                                let handle_message = (handler)(this.clone(), message);
 223                                let handle_message = async move {
 224                                    if let Err(err) = handle_message.await {
 225                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 226                                    } else {
 227                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 228                                    }
 229                                    if let Some(mut notifications) = notifications {
 230                                        let _ = notifications.send(()).await;
 231                                    }
 232                                };
 233                                if is_background {
 234                                    executor.spawn_detached(handle_message);
 235                                } else {
 236                                    handle_message.await;
 237                                }
 238                            } else {
 239                                log::warn!("unhandled message: {}", type_name);
 240                            }
 241                        } else {
 242                            log::info!("rpc connection closed {:?}", addr);
 243                            break;
 244                        }
 245                    }
 246                }
 247            }
 248
 249            if let Err(err) = this.sign_out(connection_id).await {
 250                log::error!("error signing out connection {:?} - {:?}", addr, err);
 251            }
 252        }
 253    }
 254
 255    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 256        self.peer.disconnect(connection_id);
 257        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 258
 259        for (project_id, project) in removed_connection.hosted_projects {
 260            if let Some(share) = project.share {
 261                broadcast(
 262                    connection_id,
 263                    share.guests.keys().copied().collect(),
 264                    |conn_id| {
 265                        self.peer
 266                            .send(conn_id, proto::UnshareProject { project_id })
 267                    },
 268                )?;
 269            }
 270        }
 271
 272        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 273            broadcast(connection_id, peer_ids, |conn_id| {
 274                self.peer.send(
 275                    conn_id,
 276                    proto::RemoveProjectCollaborator {
 277                        project_id,
 278                        peer_id: connection_id.0,
 279                    },
 280                )
 281            })?;
 282        }
 283
 284        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 285        Ok(())
 286    }
 287
 288    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 289        Ok(proto::Ack {})
 290    }
 291
 292    async fn register_project(
 293        mut self: Arc<Server>,
 294        request: TypedEnvelope<proto::RegisterProject>,
 295    ) -> tide::Result<proto::RegisterProjectResponse> {
 296        let project_id = {
 297            let mut state = self.state_mut();
 298            let user_id = state.user_id_for_connection(request.sender_id)?;
 299            state.register_project(request.sender_id, user_id)
 300        };
 301        Ok(proto::RegisterProjectResponse { project_id })
 302    }
 303
 304    async fn unregister_project(
 305        mut self: Arc<Server>,
 306        request: TypedEnvelope<proto::UnregisterProject>,
 307    ) -> tide::Result<()> {
 308        let project = self
 309            .state_mut()
 310            .unregister_project(request.payload.project_id, request.sender_id)?;
 311        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 312        Ok(())
 313    }
 314
 315    async fn share_project(
 316        mut self: Arc<Server>,
 317        request: TypedEnvelope<proto::ShareProject>,
 318    ) -> tide::Result<proto::Ack> {
 319        self.state_mut()
 320            .share_project(request.payload.project_id, request.sender_id);
 321        Ok(proto::Ack {})
 322    }
 323
 324    async fn unshare_project(
 325        mut self: Arc<Server>,
 326        request: TypedEnvelope<proto::UnshareProject>,
 327    ) -> tide::Result<()> {
 328        let project_id = request.payload.project_id;
 329        let project = self
 330            .state_mut()
 331            .unshare_project(project_id, request.sender_id)?;
 332
 333        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 334            self.peer
 335                .send(conn_id, proto::UnshareProject { project_id })
 336        })?;
 337        self.update_contacts_for_users(&project.authorized_user_ids)?;
 338        Ok(())
 339    }
 340
 341    async fn join_project(
 342        mut self: Arc<Server>,
 343        request: TypedEnvelope<proto::JoinProject>,
 344    ) -> tide::Result<proto::JoinProjectResponse> {
 345        let project_id = request.payload.project_id;
 346
 347        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 348        let (response, connection_ids, contact_user_ids) = self
 349            .state_mut()
 350            .join_project(request.sender_id, user_id, project_id)
 351            .and_then(|joined| {
 352                let share = joined.project.share()?;
 353                let peer_count = share.guests.len();
 354                let mut collaborators = Vec::with_capacity(peer_count);
 355                collaborators.push(proto::Collaborator {
 356                    peer_id: joined.project.host_connection_id.0,
 357                    replica_id: 0,
 358                    user_id: joined.project.host_user_id.to_proto(),
 359                });
 360                let worktrees = share
 361                    .worktrees
 362                    .iter()
 363                    .filter_map(|(id, shared_worktree)| {
 364                        let worktree = joined.project.worktrees.get(&id)?;
 365                        Some(proto::Worktree {
 366                            id: *id,
 367                            root_name: worktree.root_name.clone(),
 368                            entries: shared_worktree.entries.values().cloned().collect(),
 369                            diagnostic_summaries: shared_worktree
 370                                .diagnostic_summaries
 371                                .values()
 372                                .cloned()
 373                                .collect(),
 374                            visible: worktree.visible,
 375                        })
 376                    })
 377                    .collect();
 378                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 379                    if *peer_conn_id != request.sender_id {
 380                        collaborators.push(proto::Collaborator {
 381                            peer_id: peer_conn_id.0,
 382                            replica_id: *peer_replica_id as u32,
 383                            user_id: peer_user_id.to_proto(),
 384                        });
 385                    }
 386                }
 387                let response = proto::JoinProjectResponse {
 388                    worktrees,
 389                    replica_id: joined.replica_id as u32,
 390                    collaborators,
 391                    language_servers: joined.project.language_servers.clone(),
 392                };
 393                let connection_ids = joined.project.connection_ids();
 394                let contact_user_ids = joined.project.authorized_user_ids();
 395                Ok((response, connection_ids, contact_user_ids))
 396            })?;
 397
 398        broadcast(request.sender_id, connection_ids, |conn_id| {
 399            self.peer.send(
 400                conn_id,
 401                proto::AddProjectCollaborator {
 402                    project_id,
 403                    collaborator: Some(proto::Collaborator {
 404                        peer_id: request.sender_id.0,
 405                        replica_id: response.replica_id,
 406                        user_id: user_id.to_proto(),
 407                    }),
 408                },
 409            )
 410        })?;
 411        self.update_contacts_for_users(&contact_user_ids)?;
 412        Ok(response)
 413    }
 414
 415    async fn leave_project(
 416        mut self: Arc<Server>,
 417        request: TypedEnvelope<proto::LeaveProject>,
 418    ) -> tide::Result<()> {
 419        let sender_id = request.sender_id;
 420        let project_id = request.payload.project_id;
 421        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 422
 423        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 424            self.peer.send(
 425                conn_id,
 426                proto::RemoveProjectCollaborator {
 427                    project_id,
 428                    peer_id: sender_id.0,
 429                },
 430            )
 431        })?;
 432        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 433
 434        Ok(())
 435    }
 436
 437    async fn register_worktree(
 438        mut self: Arc<Server>,
 439        request: TypedEnvelope<proto::RegisterWorktree>,
 440    ) -> tide::Result<proto::Ack> {
 441        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 442
 443        let mut contact_user_ids = HashSet::default();
 444        contact_user_ids.insert(host_user_id);
 445        for github_login in &request.payload.authorized_logins {
 446            let contact_user_id = self.app_state.db.create_user(github_login, false).await?;
 447            contact_user_ids.insert(contact_user_id);
 448        }
 449
 450        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 451        let guest_connection_ids;
 452        {
 453            let mut state = self.state_mut();
 454            guest_connection_ids = state
 455                .read_project(request.payload.project_id, request.sender_id)?
 456                .guest_connection_ids();
 457            state.register_worktree(
 458                request.payload.project_id,
 459                request.payload.worktree_id,
 460                request.sender_id,
 461                Worktree {
 462                    authorized_user_ids: contact_user_ids.clone(),
 463                    root_name: request.payload.root_name.clone(),
 464                    visible: request.payload.visible,
 465                },
 466            )?;
 467        }
 468        broadcast(request.sender_id, guest_connection_ids, |connection_id| {
 469            self.peer
 470                .forward_send(request.sender_id, connection_id, request.payload.clone())
 471        })?;
 472        self.update_contacts_for_users(&contact_user_ids)?;
 473        Ok(proto::Ack {})
 474    }
 475
 476    async fn unregister_worktree(
 477        mut self: Arc<Server>,
 478        request: TypedEnvelope<proto::UnregisterWorktree>,
 479    ) -> tide::Result<()> {
 480        let project_id = request.payload.project_id;
 481        let worktree_id = request.payload.worktree_id;
 482        let (worktree, guest_connection_ids) =
 483            self.state_mut()
 484                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 485        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 486            self.peer.send(
 487                conn_id,
 488                proto::UnregisterWorktree {
 489                    project_id,
 490                    worktree_id,
 491                },
 492            )
 493        })?;
 494        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 495        Ok(())
 496    }
 497
 498    async fn update_worktree(
 499        mut self: Arc<Server>,
 500        request: TypedEnvelope<proto::UpdateWorktree>,
 501    ) -> tide::Result<proto::Ack> {
 502        let connection_ids = self.state_mut().update_worktree(
 503            request.sender_id,
 504            request.payload.project_id,
 505            request.payload.worktree_id,
 506            &request.payload.removed_entries,
 507            &request.payload.updated_entries,
 508        )?;
 509
 510        broadcast(request.sender_id, connection_ids, |connection_id| {
 511            self.peer
 512                .forward_send(request.sender_id, connection_id, request.payload.clone())
 513        })?;
 514
 515        Ok(proto::Ack {})
 516    }
 517
 518    async fn update_diagnostic_summary(
 519        mut self: Arc<Server>,
 520        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 521    ) -> tide::Result<()> {
 522        let summary = request
 523            .payload
 524            .summary
 525            .clone()
 526            .ok_or_else(|| anyhow!("invalid summary"))?;
 527        let receiver_ids = self.state_mut().update_diagnostic_summary(
 528            request.payload.project_id,
 529            request.payload.worktree_id,
 530            request.sender_id,
 531            summary,
 532        )?;
 533
 534        broadcast(request.sender_id, receiver_ids, |connection_id| {
 535            self.peer
 536                .forward_send(request.sender_id, connection_id, request.payload.clone())
 537        })?;
 538        Ok(())
 539    }
 540
 541    async fn start_language_server(
 542        mut self: Arc<Server>,
 543        request: TypedEnvelope<proto::StartLanguageServer>,
 544    ) -> tide::Result<()> {
 545        let receiver_ids = self.state_mut().start_language_server(
 546            request.payload.project_id,
 547            request.sender_id,
 548            request
 549                .payload
 550                .server
 551                .clone()
 552                .ok_or_else(|| anyhow!("invalid language server"))?,
 553        )?;
 554        broadcast(request.sender_id, receiver_ids, |connection_id| {
 555            self.peer
 556                .forward_send(request.sender_id, connection_id, request.payload.clone())
 557        })?;
 558        Ok(())
 559    }
 560
 561    async fn update_language_server(
 562        self: Arc<Server>,
 563        request: TypedEnvelope<proto::UpdateLanguageServer>,
 564    ) -> tide::Result<()> {
 565        let receiver_ids = self
 566            .state()
 567            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 568        broadcast(request.sender_id, receiver_ids, |connection_id| {
 569            self.peer
 570                .forward_send(request.sender_id, connection_id, request.payload.clone())
 571        })?;
 572        Ok(())
 573    }
 574
 575    async fn forward_project_request<T>(
 576        self: Arc<Server>,
 577        request: TypedEnvelope<T>,
 578    ) -> tide::Result<T::Response>
 579    where
 580        T: EntityMessage + RequestMessage,
 581    {
 582        let host_connection_id = self
 583            .state()
 584            .read_project(request.payload.remote_entity_id(), request.sender_id)?
 585            .host_connection_id;
 586        Ok(self
 587            .peer
 588            .forward_request(request.sender_id, host_connection_id, request.payload)
 589            .await?)
 590    }
 591
 592    async fn save_buffer(
 593        self: Arc<Server>,
 594        request: TypedEnvelope<proto::SaveBuffer>,
 595    ) -> tide::Result<proto::BufferSaved> {
 596        let host;
 597        let mut guests;
 598        {
 599            let state = self.state();
 600            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 601            host = project.host_connection_id;
 602            guests = project.guest_connection_ids()
 603        }
 604
 605        let response = self
 606            .peer
 607            .forward_request(request.sender_id, host, request.payload.clone())
 608            .await?;
 609
 610        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 611        broadcast(host, guests, |conn_id| {
 612            self.peer.forward_send(host, conn_id, response.clone())
 613        })?;
 614
 615        Ok(response)
 616    }
 617
 618    async fn update_buffer(
 619        self: Arc<Server>,
 620        request: TypedEnvelope<proto::UpdateBuffer>,
 621    ) -> tide::Result<proto::Ack> {
 622        let receiver_ids = self
 623            .state()
 624            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 625        broadcast(request.sender_id, receiver_ids, |connection_id| {
 626            self.peer
 627                .forward_send(request.sender_id, connection_id, request.payload.clone())
 628        })?;
 629        Ok(proto::Ack {})
 630    }
 631
 632    async fn update_buffer_file(
 633        self: Arc<Server>,
 634        request: TypedEnvelope<proto::UpdateBufferFile>,
 635    ) -> tide::Result<()> {
 636        let receiver_ids = self
 637            .state()
 638            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 639        broadcast(request.sender_id, receiver_ids, |connection_id| {
 640            self.peer
 641                .forward_send(request.sender_id, connection_id, request.payload.clone())
 642        })?;
 643        Ok(())
 644    }
 645
 646    async fn buffer_reloaded(
 647        self: Arc<Server>,
 648        request: TypedEnvelope<proto::BufferReloaded>,
 649    ) -> tide::Result<()> {
 650        let receiver_ids = self
 651            .state()
 652            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 653        broadcast(request.sender_id, receiver_ids, |connection_id| {
 654            self.peer
 655                .forward_send(request.sender_id, connection_id, request.payload.clone())
 656        })?;
 657        Ok(())
 658    }
 659
 660    async fn buffer_saved(
 661        self: Arc<Server>,
 662        request: TypedEnvelope<proto::BufferSaved>,
 663    ) -> tide::Result<()> {
 664        let receiver_ids = self
 665            .state()
 666            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 667        broadcast(request.sender_id, receiver_ids, |connection_id| {
 668            self.peer
 669                .forward_send(request.sender_id, connection_id, request.payload.clone())
 670        })?;
 671        Ok(())
 672    }
 673
 674    async fn follow(
 675        self: Arc<Self>,
 676        request: TypedEnvelope<proto::Follow>,
 677    ) -> tide::Result<proto::FollowResponse> {
 678        let leader_id = ConnectionId(request.payload.leader_id);
 679        if !self
 680            .state()
 681            .project_connection_ids(request.payload.project_id, request.sender_id)?
 682            .contains(&leader_id)
 683        {
 684            Err(anyhow!("no such peer"))?;
 685        }
 686        let response = self
 687            .peer
 688            .forward_request(request.sender_id, leader_id, request.payload)
 689            .await?;
 690        Ok(response)
 691    }
 692
 693    async fn get_channels(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::GetChannels>,
 696    ) -> tide::Result<proto::GetChannelsResponse> {
 697        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 698        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 699        Ok(proto::GetChannelsResponse {
 700            channels: channels
 701                .into_iter()
 702                .map(|chan| proto::Channel {
 703                    id: chan.id.to_proto(),
 704                    name: chan.name,
 705                })
 706                .collect(),
 707        })
 708    }
 709
 710    async fn get_users(
 711        self: Arc<Server>,
 712        request: TypedEnvelope<proto::GetUsers>,
 713    ) -> tide::Result<proto::GetUsersResponse> {
 714        let user_ids = request
 715            .payload
 716            .user_ids
 717            .into_iter()
 718            .map(UserId::from_proto)
 719            .collect();
 720        let users = self
 721            .app_state
 722            .db
 723            .get_users_by_ids(user_ids)
 724            .await?
 725            .into_iter()
 726            .map(|user| proto::User {
 727                id: user.id.to_proto(),
 728                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 729                github_login: user.github_login,
 730            })
 731            .collect();
 732        Ok(proto::GetUsersResponse { users })
 733    }
 734
 735    fn update_contacts_for_users<'a>(
 736        self: &Arc<Server>,
 737        user_ids: impl IntoIterator<Item = &'a UserId>,
 738    ) -> anyhow::Result<()> {
 739        let mut result = Ok(());
 740        let state = self.state();
 741        for user_id in user_ids {
 742            let contacts = state.contacts_for_user(*user_id);
 743            for connection_id in state.connection_ids_for_user(*user_id) {
 744                if let Err(error) = self.peer.send(
 745                    connection_id,
 746                    proto::UpdateContacts {
 747                        contacts: contacts.clone(),
 748                    },
 749                ) {
 750                    result = Err(error);
 751                }
 752            }
 753        }
 754        result
 755    }
 756
 757    async fn join_channel(
 758        mut self: Arc<Self>,
 759        request: TypedEnvelope<proto::JoinChannel>,
 760    ) -> tide::Result<proto::JoinChannelResponse> {
 761        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 762        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 763        if !self
 764            .app_state
 765            .db
 766            .can_user_access_channel(user_id, channel_id)
 767            .await?
 768        {
 769            Err(anyhow!("access denied"))?;
 770        }
 771
 772        self.state_mut().join_channel(request.sender_id, channel_id);
 773        let messages = self
 774            .app_state
 775            .db
 776            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 777            .await?
 778            .into_iter()
 779            .map(|msg| proto::ChannelMessage {
 780                id: msg.id.to_proto(),
 781                body: msg.body,
 782                timestamp: msg.sent_at.unix_timestamp() as u64,
 783                sender_id: msg.sender_id.to_proto(),
 784                nonce: Some(msg.nonce.as_u128().into()),
 785            })
 786            .collect::<Vec<_>>();
 787        Ok(proto::JoinChannelResponse {
 788            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 789            messages,
 790        })
 791    }
 792
 793    async fn leave_channel(
 794        mut self: Arc<Self>,
 795        request: TypedEnvelope<proto::LeaveChannel>,
 796    ) -> tide::Result<()> {
 797        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 798        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 799        if !self
 800            .app_state
 801            .db
 802            .can_user_access_channel(user_id, channel_id)
 803            .await?
 804        {
 805            Err(anyhow!("access denied"))?;
 806        }
 807
 808        self.state_mut()
 809            .leave_channel(request.sender_id, channel_id);
 810
 811        Ok(())
 812    }
 813
 814    async fn send_channel_message(
 815        self: Arc<Self>,
 816        request: TypedEnvelope<proto::SendChannelMessage>,
 817    ) -> tide::Result<proto::SendChannelMessageResponse> {
 818        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 819        let user_id;
 820        let connection_ids;
 821        {
 822            let state = self.state();
 823            user_id = state.user_id_for_connection(request.sender_id)?;
 824            connection_ids = state.channel_connection_ids(channel_id)?;
 825        }
 826
 827        // Validate the message body.
 828        let body = request.payload.body.trim().to_string();
 829        if body.len() > MAX_MESSAGE_LEN {
 830            return Err(anyhow!("message is too long"))?;
 831        }
 832        if body.is_empty() {
 833            return Err(anyhow!("message can't be blank"))?;
 834        }
 835
 836        let timestamp = OffsetDateTime::now_utc();
 837        let nonce = request
 838            .payload
 839            .nonce
 840            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 841
 842        let message_id = self
 843            .app_state
 844            .db
 845            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 846            .await?
 847            .to_proto();
 848        let message = proto::ChannelMessage {
 849            sender_id: user_id.to_proto(),
 850            id: message_id,
 851            body,
 852            timestamp: timestamp.unix_timestamp() as u64,
 853            nonce: Some(nonce),
 854        };
 855        broadcast(request.sender_id, connection_ids, |conn_id| {
 856            self.peer.send(
 857                conn_id,
 858                proto::ChannelMessageSent {
 859                    channel_id: channel_id.to_proto(),
 860                    message: Some(message.clone()),
 861                },
 862            )
 863        })?;
 864        Ok(proto::SendChannelMessageResponse {
 865            message: Some(message),
 866        })
 867    }
 868
 869    async fn get_channel_messages(
 870        self: Arc<Self>,
 871        request: TypedEnvelope<proto::GetChannelMessages>,
 872    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 873        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 874        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 875        if !self
 876            .app_state
 877            .db
 878            .can_user_access_channel(user_id, channel_id)
 879            .await?
 880        {
 881            Err(anyhow!("access denied"))?;
 882        }
 883
 884        let messages = self
 885            .app_state
 886            .db
 887            .get_channel_messages(
 888                channel_id,
 889                MESSAGE_COUNT_PER_PAGE,
 890                Some(MessageId::from_proto(request.payload.before_message_id)),
 891            )
 892            .await?
 893            .into_iter()
 894            .map(|msg| proto::ChannelMessage {
 895                id: msg.id.to_proto(),
 896                body: msg.body,
 897                timestamp: msg.sent_at.unix_timestamp() as u64,
 898                sender_id: msg.sender_id.to_proto(),
 899                nonce: Some(msg.nonce.as_u128().into()),
 900            })
 901            .collect::<Vec<_>>();
 902
 903        Ok(proto::GetChannelMessagesResponse {
 904            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 905            messages,
 906        })
 907    }
 908
 909    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 910        self.store.read()
 911    }
 912
 913    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 914        self.store.write()
 915    }
 916}
 917
 918impl Executor for RealExecutor {
 919    type Timer = Timer;
 920
 921    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 922        task::spawn(future);
 923    }
 924
 925    fn timer(&self, duration: Duration) -> Self::Timer {
 926        Timer::after(duration)
 927    }
 928}
 929
 930fn broadcast<F>(
 931    sender_id: ConnectionId,
 932    receiver_ids: Vec<ConnectionId>,
 933    mut f: F,
 934) -> anyhow::Result<()>
 935where
 936    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 937{
 938    let mut result = Ok(());
 939    for receiver_id in receiver_ids {
 940        if receiver_id != sender_id {
 941            if let Err(error) = f(receiver_id) {
 942                if result.is_ok() {
 943                    result = Err(error);
 944                }
 945            }
 946        }
 947    }
 948    result
 949}
 950
 951pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 952    let server = Server::new(app.state().clone(), rpc.clone(), None);
 953    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 954        let server = server.clone();
 955        async move {
 956            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 957
 958            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 959            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 960            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
 961            let client_protocol_version: Option<u32> = request
 962                .header("X-Zed-Protocol-Version")
 963                .and_then(|v| v.as_str().parse().ok());
 964
 965            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
 966                return Ok(Response::new(StatusCode::UpgradeRequired));
 967            }
 968
 969            let header = match request.header("Sec-Websocket-Key") {
 970                Some(h) => h.as_str(),
 971                None => return Err(anyhow!("expected sec-websocket-key"))?,
 972            };
 973
 974            let user_id = process_auth_header(&request).await?;
 975
 976            let mut response = Response::new(StatusCode::SwitchingProtocols);
 977            response.insert_header(UPGRADE, "websocket");
 978            response.insert_header(CONNECTION, "Upgrade");
 979            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
 980            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
 981            response.insert_header("Sec-Websocket-Version", "13");
 982
 983            let http_res: &mut tide::http::Response = response.as_mut();
 984            let upgrade_receiver = http_res.recv_upgrade().await;
 985            let addr = request.remote().unwrap_or("unknown").to_string();
 986            task::spawn(async move {
 987                if let Some(stream) = upgrade_receiver.await {
 988                    server
 989                        .handle_connection(
 990                            Connection::new(
 991                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
 992                            ),
 993                            addr,
 994                            user_id,
 995                            None,
 996                            RealExecutor,
 997                        )
 998                        .await;
 999                }
1000            });
1001
1002            Ok(response)
1003        }
1004    });
1005}
1006
1007fn header_contains_ignore_case<T>(
1008    request: &tide::Request<T>,
1009    header_name: HeaderName,
1010    value: &str,
1011) -> bool {
1012    request
1013        .header(header_name)
1014        .map(|h| {
1015            h.as_str()
1016                .split(',')
1017                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1018        })
1019        .unwrap_or(false)
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024    use super::*;
1025    use crate::{
1026        auth,
1027        db::{tests::TestDb, UserId},
1028        github, AppState, Config,
1029    };
1030    use ::rpc::Peer;
1031    use client::{
1032        self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1033        EstablishConnectionError, UserStore,
1034    };
1035    use collections::BTreeMap;
1036    use editor::{
1037        self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, Input, Redo, Rename,
1038        ToOffset, ToggleCodeActions, Undo,
1039    };
1040    use gpui::{executor, ModelHandle, TestAppContext, ViewHandle};
1041    use language::{
1042        tree_sitter_rust, Diagnostic, DiagnosticEntry, Language, LanguageConfig, LanguageRegistry,
1043        LanguageServerConfig, OffsetRangeExt, Point, ToLspPosition,
1044    };
1045    use lsp;
1046    use parking_lot::Mutex;
1047    use postage::barrier;
1048    use project::{
1049        fs::{FakeFs, Fs as _},
1050        search::SearchQuery,
1051        worktree::WorktreeHandle,
1052        DiagnosticSummary, Project, ProjectPath, WorktreeId,
1053    };
1054    use rand::prelude::*;
1055    use rpc::PeerId;
1056    use serde_json::json;
1057    use sqlx::types::time::OffsetDateTime;
1058    use std::{
1059        cell::Cell,
1060        env,
1061        ops::Deref,
1062        path::{Path, PathBuf},
1063        rc::Rc,
1064        sync::{
1065            atomic::{AtomicBool, Ordering::SeqCst},
1066            Arc,
1067        },
1068        time::Duration,
1069    };
1070    use workspace::{Settings, SplitDirection, Workspace, WorkspaceParams};
1071
1072    #[cfg(test)]
1073    #[ctor::ctor]
1074    fn init_logger() {
1075        if std::env::var("RUST_LOG").is_ok() {
1076            env_logger::init();
1077        }
1078    }
1079
1080    #[gpui::test(iterations = 10)]
1081    async fn test_share_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1082        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1083        let lang_registry = Arc::new(LanguageRegistry::test());
1084        let fs = FakeFs::new(cx_a.background());
1085        cx_a.foreground().forbid_parking();
1086
1087        // Connect to a server as 2 clients.
1088        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1089        let client_a = server.create_client(cx_a, "user_a").await;
1090        let client_b = server.create_client(cx_b, "user_b").await;
1091
1092        // Share a project as client A
1093        fs.insert_tree(
1094            "/a",
1095            json!({
1096                ".zed.toml": r#"collaborators = ["user_b"]"#,
1097                "a.txt": "a-contents",
1098                "b.txt": "b-contents",
1099            }),
1100        )
1101        .await;
1102        let project_a = cx_a.update(|cx| {
1103            Project::local(
1104                client_a.clone(),
1105                client_a.user_store.clone(),
1106                lang_registry.clone(),
1107                fs.clone(),
1108                cx,
1109            )
1110        });
1111        let (worktree_a, _) = project_a
1112            .update(cx_a, |p, cx| {
1113                p.find_or_create_local_worktree("/a", true, cx)
1114            })
1115            .await
1116            .unwrap();
1117        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1118        worktree_a
1119            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1120            .await;
1121        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1122        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1123
1124        // Join that project as client B
1125        let project_b = Project::remote(
1126            project_id,
1127            client_b.clone(),
1128            client_b.user_store.clone(),
1129            lang_registry.clone(),
1130            fs.clone(),
1131            &mut cx_b.to_async(),
1132        )
1133        .await
1134        .unwrap();
1135
1136        let replica_id_b = project_b.read_with(cx_b, |project, _| {
1137            assert_eq!(
1138                project
1139                    .collaborators()
1140                    .get(&client_a.peer_id)
1141                    .unwrap()
1142                    .user
1143                    .github_login,
1144                "user_a"
1145            );
1146            project.replica_id()
1147        });
1148        project_a
1149            .condition(&cx_a, |tree, _| {
1150                tree.collaborators()
1151                    .get(&client_b.peer_id)
1152                    .map_or(false, |collaborator| {
1153                        collaborator.replica_id == replica_id_b
1154                            && collaborator.user.github_login == "user_b"
1155                    })
1156            })
1157            .await;
1158
1159        // Open the same file as client B and client A.
1160        let buffer_b = project_b
1161            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1162            .await
1163            .unwrap();
1164        buffer_b.read_with(cx_b, |buf, _| assert_eq!(buf.text(), "b-contents"));
1165        project_a.read_with(cx_a, |project, cx| {
1166            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1167        });
1168        let buffer_a = project_a
1169            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1170            .await
1171            .unwrap();
1172
1173        let editor_b = cx_b.add_view(window_b, |cx| Editor::for_buffer(buffer_b, None, cx));
1174
1175        // TODO
1176        // // Create a selection set as client B and see that selection set as client A.
1177        // buffer_a
1178        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1179        //     .await;
1180
1181        // Edit the buffer as client B and see that edit as client A.
1182        editor_b.update(cx_b, |editor, cx| {
1183            editor.handle_input(&Input("ok, ".into()), cx)
1184        });
1185        buffer_a
1186            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1187            .await;
1188
1189        // TODO
1190        // // Remove the selection set as client B, see those selections disappear as client A.
1191        cx_b.update(move |_| drop(editor_b));
1192        // buffer_a
1193        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1194        //     .await;
1195
1196        // Dropping the client B's project removes client B from client A's collaborators.
1197        cx_b.update(move |_| drop(project_b));
1198        project_a
1199            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1200            .await;
1201    }
1202
1203    #[gpui::test(iterations = 10)]
1204    async fn test_unshare_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1205        let lang_registry = Arc::new(LanguageRegistry::test());
1206        let fs = FakeFs::new(cx_a.background());
1207        cx_a.foreground().forbid_parking();
1208
1209        // Connect to a server as 2 clients.
1210        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1211        let client_a = server.create_client(cx_a, "user_a").await;
1212        let client_b = server.create_client(cx_b, "user_b").await;
1213
1214        // Share a project as client A
1215        fs.insert_tree(
1216            "/a",
1217            json!({
1218                ".zed.toml": r#"collaborators = ["user_b"]"#,
1219                "a.txt": "a-contents",
1220                "b.txt": "b-contents",
1221            }),
1222        )
1223        .await;
1224        let project_a = cx_a.update(|cx| {
1225            Project::local(
1226                client_a.clone(),
1227                client_a.user_store.clone(),
1228                lang_registry.clone(),
1229                fs.clone(),
1230                cx,
1231            )
1232        });
1233        let (worktree_a, _) = project_a
1234            .update(cx_a, |p, cx| {
1235                p.find_or_create_local_worktree("/a", true, cx)
1236            })
1237            .await
1238            .unwrap();
1239        worktree_a
1240            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1241            .await;
1242        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1243        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1244        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1245        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1246
1247        // Join that project as client B
1248        let project_b = Project::remote(
1249            project_id,
1250            client_b.clone(),
1251            client_b.user_store.clone(),
1252            lang_registry.clone(),
1253            fs.clone(),
1254            &mut cx_b.to_async(),
1255        )
1256        .await
1257        .unwrap();
1258        project_b
1259            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1260            .await
1261            .unwrap();
1262
1263        // Unshare the project as client A
1264        project_a
1265            .update(cx_a, |project, cx| project.unshare(cx))
1266            .await
1267            .unwrap();
1268        project_b
1269            .condition(cx_b, |project, _| project.is_read_only())
1270            .await;
1271        assert!(worktree_a.read_with(cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1272        cx_b.update(|_| {
1273            drop(project_b);
1274        });
1275
1276        // Share the project again and ensure guests can still join.
1277        project_a
1278            .update(cx_a, |project, cx| project.share(cx))
1279            .await
1280            .unwrap();
1281        assert!(worktree_a.read_with(cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1282
1283        let project_b2 = Project::remote(
1284            project_id,
1285            client_b.clone(),
1286            client_b.user_store.clone(),
1287            lang_registry.clone(),
1288            fs.clone(),
1289            &mut cx_b.to_async(),
1290        )
1291        .await
1292        .unwrap();
1293        project_b2
1294            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1295            .await
1296            .unwrap();
1297    }
1298
1299    #[gpui::test(iterations = 10)]
1300    async fn test_propagate_saves_and_fs_changes(
1301        cx_a: &mut TestAppContext,
1302        cx_b: &mut TestAppContext,
1303        cx_c: &mut TestAppContext,
1304    ) {
1305        let lang_registry = Arc::new(LanguageRegistry::test());
1306        let fs = FakeFs::new(cx_a.background());
1307        cx_a.foreground().forbid_parking();
1308
1309        // Connect to a server as 3 clients.
1310        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1311        let client_a = server.create_client(cx_a, "user_a").await;
1312        let client_b = server.create_client(cx_b, "user_b").await;
1313        let client_c = server.create_client(cx_c, "user_c").await;
1314
1315        // Share a worktree as client A.
1316        fs.insert_tree(
1317            "/a",
1318            json!({
1319                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1320                "file1": "",
1321                "file2": ""
1322            }),
1323        )
1324        .await;
1325        let project_a = cx_a.update(|cx| {
1326            Project::local(
1327                client_a.clone(),
1328                client_a.user_store.clone(),
1329                lang_registry.clone(),
1330                fs.clone(),
1331                cx,
1332            )
1333        });
1334        let (worktree_a, _) = project_a
1335            .update(cx_a, |p, cx| {
1336                p.find_or_create_local_worktree("/a", true, cx)
1337            })
1338            .await
1339            .unwrap();
1340        worktree_a
1341            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1342            .await;
1343        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1344        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1345        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1346
1347        // Join that worktree as clients B and C.
1348        let project_b = Project::remote(
1349            project_id,
1350            client_b.clone(),
1351            client_b.user_store.clone(),
1352            lang_registry.clone(),
1353            fs.clone(),
1354            &mut cx_b.to_async(),
1355        )
1356        .await
1357        .unwrap();
1358        let project_c = Project::remote(
1359            project_id,
1360            client_c.clone(),
1361            client_c.user_store.clone(),
1362            lang_registry.clone(),
1363            fs.clone(),
1364            &mut cx_c.to_async(),
1365        )
1366        .await
1367        .unwrap();
1368        let worktree_b = project_b.read_with(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1369        let worktree_c = project_c.read_with(cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1370
1371        // Open and edit a buffer as both guests B and C.
1372        let buffer_b = project_b
1373            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1374            .await
1375            .unwrap();
1376        let buffer_c = project_c
1377            .update(cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1378            .await
1379            .unwrap();
1380        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1381        buffer_c.update(cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1382
1383        // Open and edit that buffer as the host.
1384        let buffer_a = project_a
1385            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1386            .await
1387            .unwrap();
1388
1389        buffer_a
1390            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1391            .await;
1392        buffer_a.update(cx_a, |buf, cx| {
1393            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1394        });
1395
1396        // Wait for edits to propagate
1397        buffer_a
1398            .condition(cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1399            .await;
1400        buffer_b
1401            .condition(cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1402            .await;
1403        buffer_c
1404            .condition(cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1405            .await;
1406
1407        // Edit the buffer as the host and concurrently save as guest B.
1408        let save_b = buffer_b.update(cx_b, |buf, cx| buf.save(cx));
1409        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1410        save_b.await.unwrap();
1411        assert_eq!(
1412            fs.load("/a/file1".as_ref()).await.unwrap(),
1413            "hi-a, i-am-c, i-am-b, i-am-a"
1414        );
1415        buffer_a.read_with(cx_a, |buf, _| assert!(!buf.is_dirty()));
1416        buffer_b.read_with(cx_b, |buf, _| assert!(!buf.is_dirty()));
1417        buffer_c.condition(cx_c, |buf, _| !buf.is_dirty()).await;
1418
1419        worktree_a.flush_fs_events(cx_a).await;
1420
1421        // Make changes on host's file system, see those changes on guest worktrees.
1422        fs.rename(
1423            "/a/file1".as_ref(),
1424            "/a/file1-renamed".as_ref(),
1425            Default::default(),
1426        )
1427        .await
1428        .unwrap();
1429
1430        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1431            .await
1432            .unwrap();
1433        fs.insert_file(Path::new("/a/file4"), "4".into()).await;
1434
1435        worktree_a
1436            .condition(&cx_a, |tree, _| {
1437                tree.paths()
1438                    .map(|p| p.to_string_lossy())
1439                    .collect::<Vec<_>>()
1440                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1441            })
1442            .await;
1443        worktree_b
1444            .condition(&cx_b, |tree, _| {
1445                tree.paths()
1446                    .map(|p| p.to_string_lossy())
1447                    .collect::<Vec<_>>()
1448                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1449            })
1450            .await;
1451        worktree_c
1452            .condition(&cx_c, |tree, _| {
1453                tree.paths()
1454                    .map(|p| p.to_string_lossy())
1455                    .collect::<Vec<_>>()
1456                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1457            })
1458            .await;
1459
1460        // Ensure buffer files are updated as well.
1461        buffer_a
1462            .condition(&cx_a, |buf, _| {
1463                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1464            })
1465            .await;
1466        buffer_b
1467            .condition(&cx_b, |buf, _| {
1468                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1469            })
1470            .await;
1471        buffer_c
1472            .condition(&cx_c, |buf, _| {
1473                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1474            })
1475            .await;
1476    }
1477
1478    #[gpui::test(iterations = 10)]
1479    async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1480        cx_a.foreground().forbid_parking();
1481        let lang_registry = Arc::new(LanguageRegistry::test());
1482        let fs = FakeFs::new(cx_a.background());
1483
1484        // Connect to a server as 2 clients.
1485        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1486        let client_a = server.create_client(cx_a, "user_a").await;
1487        let client_b = server.create_client(cx_b, "user_b").await;
1488
1489        // Share a project as client A
1490        fs.insert_tree(
1491            "/dir",
1492            json!({
1493                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1494                "a.txt": "a-contents",
1495            }),
1496        )
1497        .await;
1498
1499        let project_a = cx_a.update(|cx| {
1500            Project::local(
1501                client_a.clone(),
1502                client_a.user_store.clone(),
1503                lang_registry.clone(),
1504                fs.clone(),
1505                cx,
1506            )
1507        });
1508        let (worktree_a, _) = project_a
1509            .update(cx_a, |p, cx| {
1510                p.find_or_create_local_worktree("/dir", true, cx)
1511            })
1512            .await
1513            .unwrap();
1514        worktree_a
1515            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1516            .await;
1517        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1518        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1519        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1520
1521        // Join that project as client B
1522        let project_b = Project::remote(
1523            project_id,
1524            client_b.clone(),
1525            client_b.user_store.clone(),
1526            lang_registry.clone(),
1527            fs.clone(),
1528            &mut cx_b.to_async(),
1529        )
1530        .await
1531        .unwrap();
1532
1533        // Open a buffer as client B
1534        let buffer_b = project_b
1535            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1536            .await
1537            .unwrap();
1538
1539        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1540        buffer_b.read_with(cx_b, |buf, _| {
1541            assert!(buf.is_dirty());
1542            assert!(!buf.has_conflict());
1543        });
1544
1545        buffer_b.update(cx_b, |buf, cx| buf.save(cx)).await.unwrap();
1546        buffer_b
1547            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1548            .await;
1549        buffer_b.read_with(cx_b, |buf, _| {
1550            assert!(!buf.has_conflict());
1551        });
1552
1553        buffer_b.update(cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1554        buffer_b.read_with(cx_b, |buf, _| {
1555            assert!(buf.is_dirty());
1556            assert!(!buf.has_conflict());
1557        });
1558    }
1559
1560    #[gpui::test(iterations = 10)]
1561    async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1562        cx_a.foreground().forbid_parking();
1563        let lang_registry = Arc::new(LanguageRegistry::test());
1564        let fs = FakeFs::new(cx_a.background());
1565
1566        // Connect to a server as 2 clients.
1567        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1568        let client_a = server.create_client(cx_a, "user_a").await;
1569        let client_b = server.create_client(cx_b, "user_b").await;
1570
1571        // Share a project as client A
1572        fs.insert_tree(
1573            "/dir",
1574            json!({
1575                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1576                "a.txt": "a-contents",
1577            }),
1578        )
1579        .await;
1580
1581        let project_a = cx_a.update(|cx| {
1582            Project::local(
1583                client_a.clone(),
1584                client_a.user_store.clone(),
1585                lang_registry.clone(),
1586                fs.clone(),
1587                cx,
1588            )
1589        });
1590        let (worktree_a, _) = project_a
1591            .update(cx_a, |p, cx| {
1592                p.find_or_create_local_worktree("/dir", true, cx)
1593            })
1594            .await
1595            .unwrap();
1596        worktree_a
1597            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1598            .await;
1599        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1600        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1601        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1602
1603        // Join that project as client B
1604        let project_b = Project::remote(
1605            project_id,
1606            client_b.clone(),
1607            client_b.user_store.clone(),
1608            lang_registry.clone(),
1609            fs.clone(),
1610            &mut cx_b.to_async(),
1611        )
1612        .await
1613        .unwrap();
1614        let _worktree_b = project_b.update(cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1615
1616        // Open a buffer as client B
1617        let buffer_b = project_b
1618            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1619            .await
1620            .unwrap();
1621        buffer_b.read_with(cx_b, |buf, _| {
1622            assert!(!buf.is_dirty());
1623            assert!(!buf.has_conflict());
1624        });
1625
1626        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1627            .await
1628            .unwrap();
1629        buffer_b
1630            .condition(&cx_b, |buf, _| {
1631                buf.text() == "new contents" && !buf.is_dirty()
1632            })
1633            .await;
1634        buffer_b.read_with(cx_b, |buf, _| {
1635            assert!(!buf.has_conflict());
1636        });
1637    }
1638
1639    #[gpui::test(iterations = 10)]
1640    async fn test_editing_while_guest_opens_buffer(
1641        cx_a: &mut TestAppContext,
1642        cx_b: &mut TestAppContext,
1643    ) {
1644        cx_a.foreground().forbid_parking();
1645        let lang_registry = Arc::new(LanguageRegistry::test());
1646        let fs = FakeFs::new(cx_a.background());
1647
1648        // Connect to a server as 2 clients.
1649        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1650        let client_a = server.create_client(cx_a, "user_a").await;
1651        let client_b = server.create_client(cx_b, "user_b").await;
1652
1653        // Share a project as client A
1654        fs.insert_tree(
1655            "/dir",
1656            json!({
1657                ".zed.toml": r#"collaborators = ["user_b"]"#,
1658                "a.txt": "a-contents",
1659            }),
1660        )
1661        .await;
1662        let project_a = cx_a.update(|cx| {
1663            Project::local(
1664                client_a.clone(),
1665                client_a.user_store.clone(),
1666                lang_registry.clone(),
1667                fs.clone(),
1668                cx,
1669            )
1670        });
1671        let (worktree_a, _) = project_a
1672            .update(cx_a, |p, cx| {
1673                p.find_or_create_local_worktree("/dir", true, cx)
1674            })
1675            .await
1676            .unwrap();
1677        worktree_a
1678            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1679            .await;
1680        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1681        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1682        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1683
1684        // Join that project as client B
1685        let project_b = Project::remote(
1686            project_id,
1687            client_b.clone(),
1688            client_b.user_store.clone(),
1689            lang_registry.clone(),
1690            fs.clone(),
1691            &mut cx_b.to_async(),
1692        )
1693        .await
1694        .unwrap();
1695
1696        // Open a buffer as client A
1697        let buffer_a = project_a
1698            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1699            .await
1700            .unwrap();
1701
1702        // Start opening the same buffer as client B
1703        let buffer_b = cx_b
1704            .background()
1705            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1706
1707        // Edit the buffer as client A while client B is still opening it.
1708        cx_b.background().simulate_random_delay().await;
1709        buffer_a.update(cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1710        cx_b.background().simulate_random_delay().await;
1711        buffer_a.update(cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1712
1713        let text = buffer_a.read_with(cx_a, |buf, _| buf.text());
1714        let buffer_b = buffer_b.await.unwrap();
1715        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1716    }
1717
1718    #[gpui::test(iterations = 10)]
1719    async fn test_leaving_worktree_while_opening_buffer(
1720        cx_a: &mut TestAppContext,
1721        cx_b: &mut TestAppContext,
1722    ) {
1723        cx_a.foreground().forbid_parking();
1724        let lang_registry = Arc::new(LanguageRegistry::test());
1725        let fs = FakeFs::new(cx_a.background());
1726
1727        // Connect to a server as 2 clients.
1728        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1729        let client_a = server.create_client(cx_a, "user_a").await;
1730        let client_b = server.create_client(cx_b, "user_b").await;
1731
1732        // Share a project as client A
1733        fs.insert_tree(
1734            "/dir",
1735            json!({
1736                ".zed.toml": r#"collaborators = ["user_b"]"#,
1737                "a.txt": "a-contents",
1738            }),
1739        )
1740        .await;
1741        let project_a = cx_a.update(|cx| {
1742            Project::local(
1743                client_a.clone(),
1744                client_a.user_store.clone(),
1745                lang_registry.clone(),
1746                fs.clone(),
1747                cx,
1748            )
1749        });
1750        let (worktree_a, _) = project_a
1751            .update(cx_a, |p, cx| {
1752                p.find_or_create_local_worktree("/dir", true, cx)
1753            })
1754            .await
1755            .unwrap();
1756        worktree_a
1757            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1758            .await;
1759        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1760        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1761        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1762
1763        // Join that project as client B
1764        let project_b = Project::remote(
1765            project_id,
1766            client_b.clone(),
1767            client_b.user_store.clone(),
1768            lang_registry.clone(),
1769            fs.clone(),
1770            &mut cx_b.to_async(),
1771        )
1772        .await
1773        .unwrap();
1774
1775        // See that a guest has joined as client A.
1776        project_a
1777            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1778            .await;
1779
1780        // Begin opening a buffer as client B, but leave the project before the open completes.
1781        let buffer_b = cx_b
1782            .background()
1783            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1784        cx_b.update(|_| drop(project_b));
1785        drop(buffer_b);
1786
1787        // See that the guest has left.
1788        project_a
1789            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1790            .await;
1791    }
1792
1793    #[gpui::test(iterations = 10)]
1794    async fn test_leaving_project(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
1795        cx_a.foreground().forbid_parking();
1796        let lang_registry = Arc::new(LanguageRegistry::test());
1797        let fs = FakeFs::new(cx_a.background());
1798
1799        // Connect to a server as 2 clients.
1800        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1801        let client_a = server.create_client(cx_a, "user_a").await;
1802        let client_b = server.create_client(cx_b, "user_b").await;
1803
1804        // Share a project as client A
1805        fs.insert_tree(
1806            "/a",
1807            json!({
1808                ".zed.toml": r#"collaborators = ["user_b"]"#,
1809                "a.txt": "a-contents",
1810                "b.txt": "b-contents",
1811            }),
1812        )
1813        .await;
1814        let project_a = cx_a.update(|cx| {
1815            Project::local(
1816                client_a.clone(),
1817                client_a.user_store.clone(),
1818                lang_registry.clone(),
1819                fs.clone(),
1820                cx,
1821            )
1822        });
1823        let (worktree_a, _) = project_a
1824            .update(cx_a, |p, cx| {
1825                p.find_or_create_local_worktree("/a", true, cx)
1826            })
1827            .await
1828            .unwrap();
1829        worktree_a
1830            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1831            .await;
1832        let project_id = project_a
1833            .update(cx_a, |project, _| project.next_remote_id())
1834            .await;
1835        project_a
1836            .update(cx_a, |project, cx| project.share(cx))
1837            .await
1838            .unwrap();
1839
1840        // Join that project as client B
1841        let _project_b = Project::remote(
1842            project_id,
1843            client_b.clone(),
1844            client_b.user_store.clone(),
1845            lang_registry.clone(),
1846            fs.clone(),
1847            &mut cx_b.to_async(),
1848        )
1849        .await
1850        .unwrap();
1851
1852        // Client A sees that a guest has joined.
1853        project_a
1854            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1855            .await;
1856
1857        // Drop client B's connection and ensure client A observes client B leaving the project.
1858        client_b.disconnect(&cx_b.to_async()).unwrap();
1859        project_a
1860            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1861            .await;
1862
1863        // Rejoin the project as client B
1864        let _project_b = Project::remote(
1865            project_id,
1866            client_b.clone(),
1867            client_b.user_store.clone(),
1868            lang_registry.clone(),
1869            fs.clone(),
1870            &mut cx_b.to_async(),
1871        )
1872        .await
1873        .unwrap();
1874
1875        // Client A sees that a guest has re-joined.
1876        project_a
1877            .condition(cx_a, |p, _| p.collaborators().len() == 1)
1878            .await;
1879
1880        // Simulate connection loss for client B and ensure client A observes client B leaving the project.
1881        client_b.wait_for_current_user(cx_b).await;
1882        server.disconnect_client(client_b.current_user_id(cx_b));
1883        cx_a.foreground().advance_clock(Duration::from_secs(3));
1884        project_a
1885            .condition(cx_a, |p, _| p.collaborators().len() == 0)
1886            .await;
1887    }
1888
1889    #[gpui::test(iterations = 10)]
1890    async fn test_collaborating_with_diagnostics(
1891        cx_a: &mut TestAppContext,
1892        cx_b: &mut TestAppContext,
1893    ) {
1894        cx_a.foreground().forbid_parking();
1895        let mut lang_registry = Arc::new(LanguageRegistry::test());
1896        let fs = FakeFs::new(cx_a.background());
1897
1898        // Set up a fake language server.
1899        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
1900        Arc::get_mut(&mut lang_registry)
1901            .unwrap()
1902            .add(Arc::new(Language::new(
1903                LanguageConfig {
1904                    name: "Rust".into(),
1905                    path_suffixes: vec!["rs".to_string()],
1906                    language_server: Some(language_server_config),
1907                    ..Default::default()
1908                },
1909                Some(tree_sitter_rust::language()),
1910            )));
1911
1912        // Connect to a server as 2 clients.
1913        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
1914        let client_a = server.create_client(cx_a, "user_a").await;
1915        let client_b = server.create_client(cx_b, "user_b").await;
1916
1917        // Share a project as client A
1918        fs.insert_tree(
1919            "/a",
1920            json!({
1921                ".zed.toml": r#"collaborators = ["user_b"]"#,
1922                "a.rs": "let one = two",
1923                "other.rs": "",
1924            }),
1925        )
1926        .await;
1927        let project_a = cx_a.update(|cx| {
1928            Project::local(
1929                client_a.clone(),
1930                client_a.user_store.clone(),
1931                lang_registry.clone(),
1932                fs.clone(),
1933                cx,
1934            )
1935        });
1936        let (worktree_a, _) = project_a
1937            .update(cx_a, |p, cx| {
1938                p.find_or_create_local_worktree("/a", true, cx)
1939            })
1940            .await
1941            .unwrap();
1942        worktree_a
1943            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1944            .await;
1945        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
1946        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
1947        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
1948
1949        // Cause the language server to start.
1950        let _ = cx_a
1951            .background()
1952            .spawn(project_a.update(cx_a, |project, cx| {
1953                project.open_buffer(
1954                    ProjectPath {
1955                        worktree_id,
1956                        path: Path::new("other.rs").into(),
1957                    },
1958                    cx,
1959                )
1960            }))
1961            .await
1962            .unwrap();
1963
1964        // Simulate a language server reporting errors for a file.
1965        let mut fake_language_server = fake_language_servers.next().await.unwrap();
1966        fake_language_server
1967            .receive_notification::<lsp::notification::DidOpenTextDocument>()
1968            .await;
1969        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
1970            lsp::PublishDiagnosticsParams {
1971                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
1972                version: None,
1973                diagnostics: vec![lsp::Diagnostic {
1974                    severity: Some(lsp::DiagnosticSeverity::ERROR),
1975                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
1976                    message: "message 1".to_string(),
1977                    ..Default::default()
1978                }],
1979            },
1980        );
1981
1982        // Wait for server to see the diagnostics update.
1983        server
1984            .condition(|store| {
1985                let worktree = store
1986                    .project(project_id)
1987                    .unwrap()
1988                    .share
1989                    .as_ref()
1990                    .unwrap()
1991                    .worktrees
1992                    .get(&worktree_id.to_proto())
1993                    .unwrap();
1994
1995                !worktree.diagnostic_summaries.is_empty()
1996            })
1997            .await;
1998
1999        // Join the worktree as client B.
2000        let project_b = Project::remote(
2001            project_id,
2002            client_b.clone(),
2003            client_b.user_store.clone(),
2004            lang_registry.clone(),
2005            fs.clone(),
2006            &mut cx_b.to_async(),
2007        )
2008        .await
2009        .unwrap();
2010
2011        project_b.read_with(cx_b, |project, cx| {
2012            assert_eq!(
2013                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2014                &[(
2015                    ProjectPath {
2016                        worktree_id,
2017                        path: Arc::from(Path::new("a.rs")),
2018                    },
2019                    DiagnosticSummary {
2020                        error_count: 1,
2021                        warning_count: 0,
2022                        ..Default::default()
2023                    },
2024                )]
2025            )
2026        });
2027
2028        // Simulate a language server reporting more errors for a file.
2029        fake_language_server.notify::<lsp::notification::PublishDiagnostics>(
2030            lsp::PublishDiagnosticsParams {
2031                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2032                version: None,
2033                diagnostics: vec![
2034                    lsp::Diagnostic {
2035                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2036                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2037                        message: "message 1".to_string(),
2038                        ..Default::default()
2039                    },
2040                    lsp::Diagnostic {
2041                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2042                        range: lsp::Range::new(
2043                            lsp::Position::new(0, 10),
2044                            lsp::Position::new(0, 13),
2045                        ),
2046                        message: "message 2".to_string(),
2047                        ..Default::default()
2048                    },
2049                ],
2050            },
2051        );
2052
2053        // Client b gets the updated summaries
2054        project_b
2055            .condition(&cx_b, |project, cx| {
2056                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2057                    == &[(
2058                        ProjectPath {
2059                            worktree_id,
2060                            path: Arc::from(Path::new("a.rs")),
2061                        },
2062                        DiagnosticSummary {
2063                            error_count: 1,
2064                            warning_count: 1,
2065                            ..Default::default()
2066                        },
2067                    )]
2068            })
2069            .await;
2070
2071        // Open the file with the errors on client B. They should be present.
2072        let buffer_b = cx_b
2073            .background()
2074            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2075            .await
2076            .unwrap();
2077
2078        buffer_b.read_with(cx_b, |buffer, _| {
2079            assert_eq!(
2080                buffer
2081                    .snapshot()
2082                    .diagnostics_in_range::<_, Point>(0..buffer.len(), false)
2083                    .map(|entry| entry)
2084                    .collect::<Vec<_>>(),
2085                &[
2086                    DiagnosticEntry {
2087                        range: Point::new(0, 4)..Point::new(0, 7),
2088                        diagnostic: Diagnostic {
2089                            group_id: 0,
2090                            message: "message 1".to_string(),
2091                            severity: lsp::DiagnosticSeverity::ERROR,
2092                            is_primary: true,
2093                            ..Default::default()
2094                        }
2095                    },
2096                    DiagnosticEntry {
2097                        range: Point::new(0, 10)..Point::new(0, 13),
2098                        diagnostic: Diagnostic {
2099                            group_id: 1,
2100                            severity: lsp::DiagnosticSeverity::WARNING,
2101                            message: "message 2".to_string(),
2102                            is_primary: true,
2103                            ..Default::default()
2104                        }
2105                    }
2106                ]
2107            );
2108        });
2109    }
2110
2111    #[gpui::test(iterations = 10)]
2112    async fn test_collaborating_with_completion(
2113        cx_a: &mut TestAppContext,
2114        cx_b: &mut TestAppContext,
2115    ) {
2116        cx_a.foreground().forbid_parking();
2117        let mut lang_registry = Arc::new(LanguageRegistry::test());
2118        let fs = FakeFs::new(cx_a.background());
2119
2120        // Set up a fake language server.
2121        let (mut language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2122        language_server_config.set_fake_capabilities(lsp::ServerCapabilities {
2123            completion_provider: Some(lsp::CompletionOptions {
2124                trigger_characters: Some(vec![".".to_string()]),
2125                ..Default::default()
2126            }),
2127            ..Default::default()
2128        });
2129        Arc::get_mut(&mut lang_registry)
2130            .unwrap()
2131            .add(Arc::new(Language::new(
2132                LanguageConfig {
2133                    name: "Rust".into(),
2134                    path_suffixes: vec!["rs".to_string()],
2135                    language_server: Some(language_server_config),
2136                    ..Default::default()
2137                },
2138                Some(tree_sitter_rust::language()),
2139            )));
2140
2141        // Connect to a server as 2 clients.
2142        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2143        let client_a = server.create_client(cx_a, "user_a").await;
2144        let client_b = server.create_client(cx_b, "user_b").await;
2145
2146        // Share a project as client A
2147        fs.insert_tree(
2148            "/a",
2149            json!({
2150                ".zed.toml": r#"collaborators = ["user_b"]"#,
2151                "main.rs": "fn main() { a }",
2152                "other.rs": "",
2153            }),
2154        )
2155        .await;
2156        let project_a = cx_a.update(|cx| {
2157            Project::local(
2158                client_a.clone(),
2159                client_a.user_store.clone(),
2160                lang_registry.clone(),
2161                fs.clone(),
2162                cx,
2163            )
2164        });
2165        let (worktree_a, _) = project_a
2166            .update(cx_a, |p, cx| {
2167                p.find_or_create_local_worktree("/a", true, cx)
2168            })
2169            .await
2170            .unwrap();
2171        worktree_a
2172            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2173            .await;
2174        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2175        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2176        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2177
2178        // Join the worktree as client B.
2179        let project_b = Project::remote(
2180            project_id,
2181            client_b.clone(),
2182            client_b.user_store.clone(),
2183            lang_registry.clone(),
2184            fs.clone(),
2185            &mut cx_b.to_async(),
2186        )
2187        .await
2188        .unwrap();
2189
2190        // Open a file in an editor as the guest.
2191        let buffer_b = project_b
2192            .update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2193            .await
2194            .unwrap();
2195        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2196        let editor_b = cx_b.add_view(window_b, |cx| {
2197            Editor::for_buffer(buffer_b.clone(), Some(project_b.clone()), cx)
2198        });
2199
2200        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2201        buffer_b
2202            .condition(&cx_b, |buffer, _| !buffer.completion_triggers().is_empty())
2203            .await;
2204
2205        // Type a completion trigger character as the guest.
2206        editor_b.update(cx_b, |editor, cx| {
2207            editor.select_ranges([13..13], None, cx);
2208            editor.handle_input(&Input(".".into()), cx);
2209            cx.focus(&editor_b);
2210        });
2211
2212        // Receive a completion request as the host's language server.
2213        // Return some completions from the host's language server.
2214        cx_a.foreground().start_waiting();
2215        fake_language_server
2216            .handle_request::<lsp::request::Completion, _>(|params, _| {
2217                assert_eq!(
2218                    params.text_document_position.text_document.uri,
2219                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2220                );
2221                assert_eq!(
2222                    params.text_document_position.position,
2223                    lsp::Position::new(0, 14),
2224                );
2225
2226                Some(lsp::CompletionResponse::Array(vec![
2227                    lsp::CompletionItem {
2228                        label: "first_method(…)".into(),
2229                        detail: Some("fn(&mut self, B) -> C".into()),
2230                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2231                            new_text: "first_method($1)".to_string(),
2232                            range: lsp::Range::new(
2233                                lsp::Position::new(0, 14),
2234                                lsp::Position::new(0, 14),
2235                            ),
2236                        })),
2237                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2238                        ..Default::default()
2239                    },
2240                    lsp::CompletionItem {
2241                        label: "second_method(…)".into(),
2242                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2243                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2244                            new_text: "second_method()".to_string(),
2245                            range: lsp::Range::new(
2246                                lsp::Position::new(0, 14),
2247                                lsp::Position::new(0, 14),
2248                            ),
2249                        })),
2250                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2251                        ..Default::default()
2252                    },
2253                ]))
2254            })
2255            .next()
2256            .await
2257            .unwrap();
2258        cx_a.foreground().finish_waiting();
2259
2260        // Open the buffer on the host.
2261        let buffer_a = project_a
2262            .update(cx_a, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx))
2263            .await
2264            .unwrap();
2265        buffer_a
2266            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2267            .await;
2268
2269        // Confirm a completion on the guest.
2270        editor_b
2271            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2272            .await;
2273        editor_b.update(cx_b, |editor, cx| {
2274            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2275            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2276        });
2277
2278        // Return a resolved completion from the host's language server.
2279        // The resolved completion has an additional text edit.
2280        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(
2281            |params, _| {
2282                assert_eq!(params.label, "first_method(…)");
2283                lsp::CompletionItem {
2284                    label: "first_method(…)".into(),
2285                    detail: Some("fn(&mut self, B) -> C".into()),
2286                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2287                        new_text: "first_method($1)".to_string(),
2288                        range: lsp::Range::new(
2289                            lsp::Position::new(0, 14),
2290                            lsp::Position::new(0, 14),
2291                        ),
2292                    })),
2293                    additional_text_edits: Some(vec![lsp::TextEdit {
2294                        new_text: "use d::SomeTrait;\n".to_string(),
2295                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2296                    }]),
2297                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2298                    ..Default::default()
2299                }
2300            },
2301        );
2302
2303        // The additional edit is applied.
2304        buffer_a
2305            .condition(&cx_a, |buffer, _| {
2306                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2307            })
2308            .await;
2309        buffer_b
2310            .condition(&cx_b, |buffer, _| {
2311                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2312            })
2313            .await;
2314    }
2315
2316    #[gpui::test(iterations = 10)]
2317    async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2318        cx_a.foreground().forbid_parking();
2319        let mut lang_registry = Arc::new(LanguageRegistry::test());
2320        let fs = FakeFs::new(cx_a.background());
2321
2322        // Set up a fake language server.
2323        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2324        Arc::get_mut(&mut lang_registry)
2325            .unwrap()
2326            .add(Arc::new(Language::new(
2327                LanguageConfig {
2328                    name: "Rust".into(),
2329                    path_suffixes: vec!["rs".to_string()],
2330                    language_server: Some(language_server_config),
2331                    ..Default::default()
2332                },
2333                Some(tree_sitter_rust::language()),
2334            )));
2335
2336        // Connect to a server as 2 clients.
2337        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2338        let client_a = server.create_client(cx_a, "user_a").await;
2339        let client_b = server.create_client(cx_b, "user_b").await;
2340
2341        // Share a project as client A
2342        fs.insert_tree(
2343            "/a",
2344            json!({
2345                ".zed.toml": r#"collaborators = ["user_b"]"#,
2346                "a.rs": "let one = two",
2347            }),
2348        )
2349        .await;
2350        let project_a = cx_a.update(|cx| {
2351            Project::local(
2352                client_a.clone(),
2353                client_a.user_store.clone(),
2354                lang_registry.clone(),
2355                fs.clone(),
2356                cx,
2357            )
2358        });
2359        let (worktree_a, _) = project_a
2360            .update(cx_a, |p, cx| {
2361                p.find_or_create_local_worktree("/a", true, cx)
2362            })
2363            .await
2364            .unwrap();
2365        worktree_a
2366            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2367            .await;
2368        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2369        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2370        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2371
2372        // Join the worktree as client B.
2373        let project_b = Project::remote(
2374            project_id,
2375            client_b.clone(),
2376            client_b.user_store.clone(),
2377            lang_registry.clone(),
2378            fs.clone(),
2379            &mut cx_b.to_async(),
2380        )
2381        .await
2382        .unwrap();
2383
2384        let buffer_b = cx_b
2385            .background()
2386            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2387            .await
2388            .unwrap();
2389
2390        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2391        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_, _| {
2392            Some(vec![
2393                lsp::TextEdit {
2394                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2395                    new_text: "h".to_string(),
2396                },
2397                lsp::TextEdit {
2398                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2399                    new_text: "y".to_string(),
2400                },
2401            ])
2402        });
2403
2404        project_b
2405            .update(cx_b, |project, cx| {
2406                project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2407            })
2408            .await
2409            .unwrap();
2410        assert_eq!(
2411            buffer_b.read_with(cx_b, |buffer, _| buffer.text()),
2412            "let honey = two"
2413        );
2414    }
2415
2416    #[gpui::test(iterations = 10)]
2417    async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2418        cx_a.foreground().forbid_parking();
2419        let mut lang_registry = Arc::new(LanguageRegistry::test());
2420        let fs = FakeFs::new(cx_a.background());
2421        fs.insert_tree(
2422            "/root-1",
2423            json!({
2424                ".zed.toml": r#"collaborators = ["user_b"]"#,
2425                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2426            }),
2427        )
2428        .await;
2429        fs.insert_tree(
2430            "/root-2",
2431            json!({
2432                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2433            }),
2434        )
2435        .await;
2436
2437        // Set up a fake language server.
2438        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2439        Arc::get_mut(&mut lang_registry)
2440            .unwrap()
2441            .add(Arc::new(Language::new(
2442                LanguageConfig {
2443                    name: "Rust".into(),
2444                    path_suffixes: vec!["rs".to_string()],
2445                    language_server: Some(language_server_config),
2446                    ..Default::default()
2447                },
2448                Some(tree_sitter_rust::language()),
2449            )));
2450
2451        // Connect to a server as 2 clients.
2452        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2453        let client_a = server.create_client(cx_a, "user_a").await;
2454        let client_b = server.create_client(cx_b, "user_b").await;
2455
2456        // Share a project as client A
2457        let project_a = cx_a.update(|cx| {
2458            Project::local(
2459                client_a.clone(),
2460                client_a.user_store.clone(),
2461                lang_registry.clone(),
2462                fs.clone(),
2463                cx,
2464            )
2465        });
2466        let (worktree_a, _) = project_a
2467            .update(cx_a, |p, cx| {
2468                p.find_or_create_local_worktree("/root-1", true, cx)
2469            })
2470            .await
2471            .unwrap();
2472        worktree_a
2473            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2474            .await;
2475        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2476        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2477        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2478
2479        // Join the worktree as client B.
2480        let project_b = Project::remote(
2481            project_id,
2482            client_b.clone(),
2483            client_b.user_store.clone(),
2484            lang_registry.clone(),
2485            fs.clone(),
2486            &mut cx_b.to_async(),
2487        )
2488        .await
2489        .unwrap();
2490
2491        // Open the file on client B.
2492        let buffer_b = cx_b
2493            .background()
2494            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2495            .await
2496            .unwrap();
2497
2498        // Request the definition of a symbol as the guest.
2499        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2500        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2501            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2502                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2503                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2504            )))
2505        });
2506
2507        let definitions_1 = project_b
2508            .update(cx_b, |p, cx| p.definition(&buffer_b, 23, cx))
2509            .await
2510            .unwrap();
2511        cx_b.read(|cx| {
2512            assert_eq!(definitions_1.len(), 1);
2513            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2514            let target_buffer = definitions_1[0].buffer.read(cx);
2515            assert_eq!(
2516                target_buffer.text(),
2517                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2518            );
2519            assert_eq!(
2520                definitions_1[0].range.to_point(target_buffer),
2521                Point::new(0, 6)..Point::new(0, 9)
2522            );
2523        });
2524
2525        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2526        // the previous call to `definition`.
2527        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
2528            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2529                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2530                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2531            )))
2532        });
2533
2534        let definitions_2 = project_b
2535            .update(cx_b, |p, cx| p.definition(&buffer_b, 33, cx))
2536            .await
2537            .unwrap();
2538        cx_b.read(|cx| {
2539            assert_eq!(definitions_2.len(), 1);
2540            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2541            let target_buffer = definitions_2[0].buffer.read(cx);
2542            assert_eq!(
2543                target_buffer.text(),
2544                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2545            );
2546            assert_eq!(
2547                definitions_2[0].range.to_point(target_buffer),
2548                Point::new(1, 6)..Point::new(1, 11)
2549            );
2550        });
2551        assert_eq!(definitions_1[0].buffer, definitions_2[0].buffer);
2552    }
2553
2554    #[gpui::test(iterations = 10)]
2555    async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2556        cx_a.foreground().forbid_parking();
2557        let mut lang_registry = Arc::new(LanguageRegistry::test());
2558        let fs = FakeFs::new(cx_a.background());
2559        fs.insert_tree(
2560            "/root-1",
2561            json!({
2562                ".zed.toml": r#"collaborators = ["user_b"]"#,
2563                "one.rs": "const ONE: usize = 1;",
2564                "two.rs": "const TWO: usize = one::ONE + one::ONE;",
2565            }),
2566        )
2567        .await;
2568        fs.insert_tree(
2569            "/root-2",
2570            json!({
2571                "three.rs": "const THREE: usize = two::TWO + one::ONE;",
2572            }),
2573        )
2574        .await;
2575
2576        // Set up a fake language server.
2577        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2578        Arc::get_mut(&mut lang_registry)
2579            .unwrap()
2580            .add(Arc::new(Language::new(
2581                LanguageConfig {
2582                    name: "Rust".into(),
2583                    path_suffixes: vec!["rs".to_string()],
2584                    language_server: Some(language_server_config),
2585                    ..Default::default()
2586                },
2587                Some(tree_sitter_rust::language()),
2588            )));
2589
2590        // Connect to a server as 2 clients.
2591        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2592        let client_a = server.create_client(cx_a, "user_a").await;
2593        let client_b = server.create_client(cx_b, "user_b").await;
2594
2595        // Share a project as client A
2596        let project_a = cx_a.update(|cx| {
2597            Project::local(
2598                client_a.clone(),
2599                client_a.user_store.clone(),
2600                lang_registry.clone(),
2601                fs.clone(),
2602                cx,
2603            )
2604        });
2605        let (worktree_a, _) = project_a
2606            .update(cx_a, |p, cx| {
2607                p.find_or_create_local_worktree("/root-1", true, cx)
2608            })
2609            .await
2610            .unwrap();
2611        worktree_a
2612            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2613            .await;
2614        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2615        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2616        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2617
2618        // Join the worktree as client B.
2619        let project_b = Project::remote(
2620            project_id,
2621            client_b.clone(),
2622            client_b.user_store.clone(),
2623            lang_registry.clone(),
2624            fs.clone(),
2625            &mut cx_b.to_async(),
2626        )
2627        .await
2628        .unwrap();
2629
2630        // Open the file on client B.
2631        let buffer_b = cx_b
2632            .background()
2633            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
2634            .await
2635            .unwrap();
2636
2637        // Request references to a symbol as the guest.
2638        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2639        fake_language_server.handle_request::<lsp::request::References, _>(|params, _| {
2640            assert_eq!(
2641                params.text_document_position.text_document.uri.as_str(),
2642                "file:///root-1/one.rs"
2643            );
2644            Some(vec![
2645                lsp::Location {
2646                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2647                    range: lsp::Range::new(lsp::Position::new(0, 24), lsp::Position::new(0, 27)),
2648                },
2649                lsp::Location {
2650                    uri: lsp::Url::from_file_path("/root-1/two.rs").unwrap(),
2651                    range: lsp::Range::new(lsp::Position::new(0, 35), lsp::Position::new(0, 38)),
2652                },
2653                lsp::Location {
2654                    uri: lsp::Url::from_file_path("/root-2/three.rs").unwrap(),
2655                    range: lsp::Range::new(lsp::Position::new(0, 37), lsp::Position::new(0, 40)),
2656                },
2657            ])
2658        });
2659
2660        let references = project_b
2661            .update(cx_b, |p, cx| p.references(&buffer_b, 7, cx))
2662            .await
2663            .unwrap();
2664        cx_b.read(|cx| {
2665            assert_eq!(references.len(), 3);
2666            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2667
2668            let two_buffer = references[0].buffer.read(cx);
2669            let three_buffer = references[2].buffer.read(cx);
2670            assert_eq!(
2671                two_buffer.file().unwrap().path().as_ref(),
2672                Path::new("two.rs")
2673            );
2674            assert_eq!(references[1].buffer, references[0].buffer);
2675            assert_eq!(
2676                three_buffer.file().unwrap().full_path(cx),
2677                Path::new("three.rs")
2678            );
2679
2680            assert_eq!(references[0].range.to_offset(&two_buffer), 24..27);
2681            assert_eq!(references[1].range.to_offset(&two_buffer), 35..38);
2682            assert_eq!(references[2].range.to_offset(&three_buffer), 37..40);
2683        });
2684    }
2685
2686    #[gpui::test(iterations = 10)]
2687    async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2688        cx_a.foreground().forbid_parking();
2689        let lang_registry = Arc::new(LanguageRegistry::test());
2690        let fs = FakeFs::new(cx_a.background());
2691        fs.insert_tree(
2692            "/root-1",
2693            json!({
2694                ".zed.toml": r#"collaborators = ["user_b"]"#,
2695                "a": "hello world",
2696                "b": "goodnight moon",
2697                "c": "a world of goo",
2698                "d": "world champion of clown world",
2699            }),
2700        )
2701        .await;
2702        fs.insert_tree(
2703            "/root-2",
2704            json!({
2705                "e": "disney world is fun",
2706            }),
2707        )
2708        .await;
2709
2710        // Connect to a server as 2 clients.
2711        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2712        let client_a = server.create_client(cx_a, "user_a").await;
2713        let client_b = server.create_client(cx_b, "user_b").await;
2714
2715        // Share a project as client A
2716        let project_a = cx_a.update(|cx| {
2717            Project::local(
2718                client_a.clone(),
2719                client_a.user_store.clone(),
2720                lang_registry.clone(),
2721                fs.clone(),
2722                cx,
2723            )
2724        });
2725        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2726
2727        let (worktree_1, _) = project_a
2728            .update(cx_a, |p, cx| {
2729                p.find_or_create_local_worktree("/root-1", true, cx)
2730            })
2731            .await
2732            .unwrap();
2733        worktree_1
2734            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2735            .await;
2736        let (worktree_2, _) = project_a
2737            .update(cx_a, |p, cx| {
2738                p.find_or_create_local_worktree("/root-2", true, cx)
2739            })
2740            .await
2741            .unwrap();
2742        worktree_2
2743            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2744            .await;
2745
2746        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2747
2748        // Join the worktree as client B.
2749        let project_b = Project::remote(
2750            project_id,
2751            client_b.clone(),
2752            client_b.user_store.clone(),
2753            lang_registry.clone(),
2754            fs.clone(),
2755            &mut cx_b.to_async(),
2756        )
2757        .await
2758        .unwrap();
2759
2760        let results = project_b
2761            .update(cx_b, |project, cx| {
2762                project.search(SearchQuery::text("world", false, false), cx)
2763            })
2764            .await
2765            .unwrap();
2766
2767        let mut ranges_by_path = results
2768            .into_iter()
2769            .map(|(buffer, ranges)| {
2770                buffer.read_with(cx_b, |buffer, cx| {
2771                    let path = buffer.file().unwrap().full_path(cx);
2772                    let offset_ranges = ranges
2773                        .into_iter()
2774                        .map(|range| range.to_offset(buffer))
2775                        .collect::<Vec<_>>();
2776                    (path, offset_ranges)
2777                })
2778            })
2779            .collect::<Vec<_>>();
2780        ranges_by_path.sort_by_key(|(path, _)| path.clone());
2781
2782        assert_eq!(
2783            ranges_by_path,
2784            &[
2785                (PathBuf::from("root-1/a"), vec![6..11]),
2786                (PathBuf::from("root-1/c"), vec![2..7]),
2787                (PathBuf::from("root-1/d"), vec![0..5, 24..29]),
2788                (PathBuf::from("root-2/e"), vec![7..12]),
2789            ]
2790        );
2791    }
2792
2793    #[gpui::test(iterations = 10)]
2794    async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2795        cx_a.foreground().forbid_parking();
2796        let lang_registry = Arc::new(LanguageRegistry::test());
2797        let fs = FakeFs::new(cx_a.background());
2798        fs.insert_tree(
2799            "/root-1",
2800            json!({
2801                ".zed.toml": r#"collaborators = ["user_b"]"#,
2802                "main.rs": "fn double(number: i32) -> i32 { number + number }",
2803            }),
2804        )
2805        .await;
2806
2807        // Set up a fake language server.
2808        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2809        lang_registry.add(Arc::new(Language::new(
2810            LanguageConfig {
2811                name: "Rust".into(),
2812                path_suffixes: vec!["rs".to_string()],
2813                language_server: Some(language_server_config),
2814                ..Default::default()
2815            },
2816            Some(tree_sitter_rust::language()),
2817        )));
2818
2819        // Connect to a server as 2 clients.
2820        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2821        let client_a = server.create_client(cx_a, "user_a").await;
2822        let client_b = server.create_client(cx_b, "user_b").await;
2823
2824        // Share a project as client A
2825        let project_a = cx_a.update(|cx| {
2826            Project::local(
2827                client_a.clone(),
2828                client_a.user_store.clone(),
2829                lang_registry.clone(),
2830                fs.clone(),
2831                cx,
2832            )
2833        });
2834        let (worktree_a, _) = project_a
2835            .update(cx_a, |p, cx| {
2836                p.find_or_create_local_worktree("/root-1", true, cx)
2837            })
2838            .await
2839            .unwrap();
2840        worktree_a
2841            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2842            .await;
2843        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2844        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2845        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2846
2847        // Join the worktree as client B.
2848        let project_b = Project::remote(
2849            project_id,
2850            client_b.clone(),
2851            client_b.user_store.clone(),
2852            lang_registry.clone(),
2853            fs.clone(),
2854            &mut cx_b.to_async(),
2855        )
2856        .await
2857        .unwrap();
2858
2859        // Open the file on client B.
2860        let buffer_b = cx_b
2861            .background()
2862            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "main.rs"), cx)))
2863            .await
2864            .unwrap();
2865
2866        // Request document highlights as the guest.
2867        let mut fake_language_server = fake_language_servers.next().await.unwrap();
2868        fake_language_server.handle_request::<lsp::request::DocumentHighlightRequest, _>(
2869            |params, _| {
2870                assert_eq!(
2871                    params
2872                        .text_document_position_params
2873                        .text_document
2874                        .uri
2875                        .as_str(),
2876                    "file:///root-1/main.rs"
2877                );
2878                assert_eq!(
2879                    params.text_document_position_params.position,
2880                    lsp::Position::new(0, 34)
2881                );
2882                Some(vec![
2883                    lsp::DocumentHighlight {
2884                        kind: Some(lsp::DocumentHighlightKind::WRITE),
2885                        range: lsp::Range::new(
2886                            lsp::Position::new(0, 10),
2887                            lsp::Position::new(0, 16),
2888                        ),
2889                    },
2890                    lsp::DocumentHighlight {
2891                        kind: Some(lsp::DocumentHighlightKind::READ),
2892                        range: lsp::Range::new(
2893                            lsp::Position::new(0, 32),
2894                            lsp::Position::new(0, 38),
2895                        ),
2896                    },
2897                    lsp::DocumentHighlight {
2898                        kind: Some(lsp::DocumentHighlightKind::READ),
2899                        range: lsp::Range::new(
2900                            lsp::Position::new(0, 41),
2901                            lsp::Position::new(0, 47),
2902                        ),
2903                    },
2904                ])
2905            },
2906        );
2907
2908        let highlights = project_b
2909            .update(cx_b, |p, cx| p.document_highlights(&buffer_b, 34, cx))
2910            .await
2911            .unwrap();
2912        buffer_b.read_with(cx_b, |buffer, _| {
2913            let snapshot = buffer.snapshot();
2914
2915            let highlights = highlights
2916                .into_iter()
2917                .map(|highlight| (highlight.kind, highlight.range.to_offset(&snapshot)))
2918                .collect::<Vec<_>>();
2919            assert_eq!(
2920                highlights,
2921                &[
2922                    (lsp::DocumentHighlightKind::WRITE, 10..16),
2923                    (lsp::DocumentHighlightKind::READ, 32..38),
2924                    (lsp::DocumentHighlightKind::READ, 41..47)
2925                ]
2926            )
2927        });
2928    }
2929
2930    #[gpui::test(iterations = 10)]
2931    async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
2932        cx_a.foreground().forbid_parking();
2933        let mut lang_registry = Arc::new(LanguageRegistry::test());
2934        let fs = FakeFs::new(cx_a.background());
2935        fs.insert_tree(
2936            "/code",
2937            json!({
2938                "crate-1": {
2939                    ".zed.toml": r#"collaborators = ["user_b"]"#,
2940                    "one.rs": "const ONE: usize = 1;",
2941                },
2942                "crate-2": {
2943                    "two.rs": "const TWO: usize = 2; const THREE: usize = 3;",
2944                },
2945                "private": {
2946                    "passwords.txt": "the-password",
2947                }
2948            }),
2949        )
2950        .await;
2951
2952        // Set up a fake language server.
2953        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
2954        Arc::get_mut(&mut lang_registry)
2955            .unwrap()
2956            .add(Arc::new(Language::new(
2957                LanguageConfig {
2958                    name: "Rust".into(),
2959                    path_suffixes: vec!["rs".to_string()],
2960                    language_server: Some(language_server_config),
2961                    ..Default::default()
2962                },
2963                Some(tree_sitter_rust::language()),
2964            )));
2965
2966        // Connect to a server as 2 clients.
2967        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
2968        let client_a = server.create_client(cx_a, "user_a").await;
2969        let client_b = server.create_client(cx_b, "user_b").await;
2970
2971        // Share a project as client A
2972        let project_a = cx_a.update(|cx| {
2973            Project::local(
2974                client_a.clone(),
2975                client_a.user_store.clone(),
2976                lang_registry.clone(),
2977                fs.clone(),
2978                cx,
2979            )
2980        });
2981        let (worktree_a, _) = project_a
2982            .update(cx_a, |p, cx| {
2983                p.find_or_create_local_worktree("/code/crate-1", true, cx)
2984            })
2985            .await
2986            .unwrap();
2987        worktree_a
2988            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2989            .await;
2990        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
2991        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
2992        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
2993
2994        // Join the worktree as client B.
2995        let project_b = Project::remote(
2996            project_id,
2997            client_b.clone(),
2998            client_b.user_store.clone(),
2999            lang_registry.clone(),
3000            fs.clone(),
3001            &mut cx_b.to_async(),
3002        )
3003        .await
3004        .unwrap();
3005
3006        // Cause the language server to start.
3007        let _buffer = cx_b
3008            .background()
3009            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "one.rs"), cx)))
3010            .await
3011            .unwrap();
3012
3013        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3014        fake_language_server.handle_request::<lsp::request::WorkspaceSymbol, _>(|_, _| {
3015            #[allow(deprecated)]
3016            Some(vec![lsp::SymbolInformation {
3017                name: "TWO".into(),
3018                location: lsp::Location {
3019                    uri: lsp::Url::from_file_path("/code/crate-2/two.rs").unwrap(),
3020                    range: lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3021                },
3022                kind: lsp::SymbolKind::CONSTANT,
3023                tags: None,
3024                container_name: None,
3025                deprecated: None,
3026            }])
3027        });
3028
3029        // Request the definition of a symbol as the guest.
3030        let symbols = project_b
3031            .update(cx_b, |p, cx| p.symbols("two", cx))
3032            .await
3033            .unwrap();
3034        assert_eq!(symbols.len(), 1);
3035        assert_eq!(symbols[0].name, "TWO");
3036
3037        // Open one of the returned symbols.
3038        let buffer_b_2 = project_b
3039            .update(cx_b, |project, cx| {
3040                project.open_buffer_for_symbol(&symbols[0], cx)
3041            })
3042            .await
3043            .unwrap();
3044        buffer_b_2.read_with(cx_b, |buffer, _| {
3045            assert_eq!(
3046                buffer.file().unwrap().path().as_ref(),
3047                Path::new("../crate-2/two.rs")
3048            );
3049        });
3050
3051        // Attempt to craft a symbol and violate host's privacy by opening an arbitrary file.
3052        let mut fake_symbol = symbols[0].clone();
3053        fake_symbol.path = Path::new("/code/secrets").into();
3054        let error = project_b
3055            .update(cx_b, |project, cx| {
3056                project.open_buffer_for_symbol(&fake_symbol, cx)
3057            })
3058            .await
3059            .unwrap_err();
3060        assert!(error.to_string().contains("invalid symbol signature"));
3061    }
3062
3063    #[gpui::test(iterations = 10)]
3064    async fn test_open_buffer_while_getting_definition_pointing_to_it(
3065        cx_a: &mut TestAppContext,
3066        cx_b: &mut TestAppContext,
3067        mut rng: StdRng,
3068    ) {
3069        cx_a.foreground().forbid_parking();
3070        let mut lang_registry = Arc::new(LanguageRegistry::test());
3071        let fs = FakeFs::new(cx_a.background());
3072        fs.insert_tree(
3073            "/root",
3074            json!({
3075                ".zed.toml": r#"collaborators = ["user_b"]"#,
3076                "a.rs": "const ONE: usize = b::TWO;",
3077                "b.rs": "const TWO: usize = 2",
3078            }),
3079        )
3080        .await;
3081
3082        // Set up a fake language server.
3083        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3084
3085        Arc::get_mut(&mut lang_registry)
3086            .unwrap()
3087            .add(Arc::new(Language::new(
3088                LanguageConfig {
3089                    name: "Rust".into(),
3090                    path_suffixes: vec!["rs".to_string()],
3091                    language_server: Some(language_server_config),
3092                    ..Default::default()
3093                },
3094                Some(tree_sitter_rust::language()),
3095            )));
3096
3097        // Connect to a server as 2 clients.
3098        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3099        let client_a = server.create_client(cx_a, "user_a").await;
3100        let client_b = server.create_client(cx_b, "user_b").await;
3101
3102        // Share a project as client A
3103        let project_a = cx_a.update(|cx| {
3104            Project::local(
3105                client_a.clone(),
3106                client_a.user_store.clone(),
3107                lang_registry.clone(),
3108                fs.clone(),
3109                cx,
3110            )
3111        });
3112
3113        let (worktree_a, _) = project_a
3114            .update(cx_a, |p, cx| {
3115                p.find_or_create_local_worktree("/root", true, cx)
3116            })
3117            .await
3118            .unwrap();
3119        worktree_a
3120            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3121            .await;
3122        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3123        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3124        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3125
3126        // Join the worktree as client B.
3127        let project_b = Project::remote(
3128            project_id,
3129            client_b.clone(),
3130            client_b.user_store.clone(),
3131            lang_registry.clone(),
3132            fs.clone(),
3133            &mut cx_b.to_async(),
3134        )
3135        .await
3136        .unwrap();
3137
3138        let buffer_b1 = cx_b
3139            .background()
3140            .spawn(project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
3141            .await
3142            .unwrap();
3143
3144        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3145        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_, _| {
3146            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
3147                lsp::Url::from_file_path("/root/b.rs").unwrap(),
3148                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
3149            )))
3150        });
3151
3152        let definitions;
3153        let buffer_b2;
3154        if rng.gen() {
3155            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3156            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3157        } else {
3158            buffer_b2 = project_b.update(cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
3159            definitions = project_b.update(cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
3160        }
3161
3162        let buffer_b2 = buffer_b2.await.unwrap();
3163        let definitions = definitions.await.unwrap();
3164        assert_eq!(definitions.len(), 1);
3165        assert_eq!(definitions[0].buffer, buffer_b2);
3166    }
3167
3168    #[gpui::test(iterations = 10)]
3169    async fn test_collaborating_with_code_actions(
3170        cx_a: &mut TestAppContext,
3171        cx_b: &mut TestAppContext,
3172    ) {
3173        cx_a.foreground().forbid_parking();
3174        let mut lang_registry = Arc::new(LanguageRegistry::test());
3175        let fs = FakeFs::new(cx_a.background());
3176        cx_b.update(|cx| editor::init(cx));
3177
3178        // Set up a fake language server.
3179        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3180        Arc::get_mut(&mut lang_registry)
3181            .unwrap()
3182            .add(Arc::new(Language::new(
3183                LanguageConfig {
3184                    name: "Rust".into(),
3185                    path_suffixes: vec!["rs".to_string()],
3186                    language_server: Some(language_server_config),
3187                    ..Default::default()
3188                },
3189                Some(tree_sitter_rust::language()),
3190            )));
3191
3192        // Connect to a server as 2 clients.
3193        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3194        let client_a = server.create_client(cx_a, "user_a").await;
3195        let client_b = server.create_client(cx_b, "user_b").await;
3196
3197        // Share a project as client A
3198        fs.insert_tree(
3199            "/a",
3200            json!({
3201                ".zed.toml": r#"collaborators = ["user_b"]"#,
3202                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
3203                "other.rs": "pub fn foo() -> usize { 4 }",
3204            }),
3205        )
3206        .await;
3207        let project_a = cx_a.update(|cx| {
3208            Project::local(
3209                client_a.clone(),
3210                client_a.user_store.clone(),
3211                lang_registry.clone(),
3212                fs.clone(),
3213                cx,
3214            )
3215        });
3216        let (worktree_a, _) = project_a
3217            .update(cx_a, |p, cx| {
3218                p.find_or_create_local_worktree("/a", true, cx)
3219            })
3220            .await
3221            .unwrap();
3222        worktree_a
3223            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3224            .await;
3225        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3226        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3227        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3228
3229        // Join the worktree as client B.
3230        let project_b = Project::remote(
3231            project_id,
3232            client_b.clone(),
3233            client_b.user_store.clone(),
3234            lang_registry.clone(),
3235            fs.clone(),
3236            &mut cx_b.to_async(),
3237        )
3238        .await
3239        .unwrap();
3240        let mut params = cx_b.update(WorkspaceParams::test);
3241        params.languages = lang_registry.clone();
3242        params.client = client_b.client.clone();
3243        params.user_store = client_b.user_store.clone();
3244        params.project = project_b;
3245
3246        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3247        let editor_b = workspace_b
3248            .update(cx_b, |workspace, cx| {
3249                workspace.open_path((worktree_id, "main.rs"), cx)
3250            })
3251            .await
3252            .unwrap()
3253            .downcast::<Editor>()
3254            .unwrap();
3255
3256        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3257        fake_language_server
3258            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3259                assert_eq!(
3260                    params.text_document.uri,
3261                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3262                );
3263                assert_eq!(params.range.start, lsp::Position::new(0, 0));
3264                assert_eq!(params.range.end, lsp::Position::new(0, 0));
3265                None
3266            })
3267            .next()
3268            .await;
3269
3270        // Move cursor to a location that contains code actions.
3271        editor_b.update(cx_b, |editor, cx| {
3272            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
3273            cx.focus(&editor_b);
3274        });
3275
3276        fake_language_server
3277            .handle_request::<lsp::request::CodeActionRequest, _>(|params, _| {
3278                assert_eq!(
3279                    params.text_document.uri,
3280                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
3281                );
3282                assert_eq!(params.range.start, lsp::Position::new(1, 31));
3283                assert_eq!(params.range.end, lsp::Position::new(1, 31));
3284
3285                Some(vec![lsp::CodeActionOrCommand::CodeAction(
3286                    lsp::CodeAction {
3287                        title: "Inline into all callers".to_string(),
3288                        edit: Some(lsp::WorkspaceEdit {
3289                            changes: Some(
3290                                [
3291                                    (
3292                                        lsp::Url::from_file_path("/a/main.rs").unwrap(),
3293                                        vec![lsp::TextEdit::new(
3294                                            lsp::Range::new(
3295                                                lsp::Position::new(1, 22),
3296                                                lsp::Position::new(1, 34),
3297                                            ),
3298                                            "4".to_string(),
3299                                        )],
3300                                    ),
3301                                    (
3302                                        lsp::Url::from_file_path("/a/other.rs").unwrap(),
3303                                        vec![lsp::TextEdit::new(
3304                                            lsp::Range::new(
3305                                                lsp::Position::new(0, 0),
3306                                                lsp::Position::new(0, 27),
3307                                            ),
3308                                            "".to_string(),
3309                                        )],
3310                                    ),
3311                                ]
3312                                .into_iter()
3313                                .collect(),
3314                            ),
3315                            ..Default::default()
3316                        }),
3317                        data: Some(json!({
3318                            "codeActionParams": {
3319                                "range": {
3320                                    "start": {"line": 1, "column": 31},
3321                                    "end": {"line": 1, "column": 31},
3322                                }
3323                            }
3324                        })),
3325                        ..Default::default()
3326                    },
3327                )])
3328            })
3329            .next()
3330            .await;
3331
3332        // Toggle code actions and wait for them to display.
3333        editor_b.update(cx_b, |editor, cx| {
3334            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
3335        });
3336        editor_b
3337            .condition(&cx_b, |editor, _| editor.context_menu_visible())
3338            .await;
3339
3340        fake_language_server.remove_request_handler::<lsp::request::CodeActionRequest>();
3341
3342        // Confirming the code action will trigger a resolve request.
3343        let confirm_action = workspace_b
3344            .update(cx_b, |workspace, cx| {
3345                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
3346            })
3347            .unwrap();
3348        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_, _| {
3349            lsp::CodeAction {
3350                title: "Inline into all callers".to_string(),
3351                edit: Some(lsp::WorkspaceEdit {
3352                    changes: Some(
3353                        [
3354                            (
3355                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
3356                                vec![lsp::TextEdit::new(
3357                                    lsp::Range::new(
3358                                        lsp::Position::new(1, 22),
3359                                        lsp::Position::new(1, 34),
3360                                    ),
3361                                    "4".to_string(),
3362                                )],
3363                            ),
3364                            (
3365                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
3366                                vec![lsp::TextEdit::new(
3367                                    lsp::Range::new(
3368                                        lsp::Position::new(0, 0),
3369                                        lsp::Position::new(0, 27),
3370                                    ),
3371                                    "".to_string(),
3372                                )],
3373                            ),
3374                        ]
3375                        .into_iter()
3376                        .collect(),
3377                    ),
3378                    ..Default::default()
3379                }),
3380                ..Default::default()
3381            }
3382        });
3383
3384        // After the action is confirmed, an editor containing both modified files is opened.
3385        confirm_action.await.unwrap();
3386        let code_action_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3387            workspace
3388                .active_item(cx)
3389                .unwrap()
3390                .downcast::<Editor>()
3391                .unwrap()
3392        });
3393        code_action_editor.update(cx_b, |editor, cx| {
3394            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3395            editor.undo(&Undo, cx);
3396            assert_eq!(
3397                editor.text(cx),
3398                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
3399            );
3400            editor.redo(&Redo, cx);
3401            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
3402        });
3403    }
3404
3405    #[gpui::test(iterations = 10)]
3406    async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3407        cx_a.foreground().forbid_parking();
3408        let mut lang_registry = Arc::new(LanguageRegistry::test());
3409        let fs = FakeFs::new(cx_a.background());
3410        cx_b.update(|cx| editor::init(cx));
3411
3412        // Set up a fake language server.
3413        let (language_server_config, mut fake_language_servers) = LanguageServerConfig::fake();
3414        Arc::get_mut(&mut lang_registry)
3415            .unwrap()
3416            .add(Arc::new(Language::new(
3417                LanguageConfig {
3418                    name: "Rust".into(),
3419                    path_suffixes: vec!["rs".to_string()],
3420                    language_server: Some(language_server_config),
3421                    ..Default::default()
3422                },
3423                Some(tree_sitter_rust::language()),
3424            )));
3425
3426        // Connect to a server as 2 clients.
3427        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3428        let client_a = server.create_client(cx_a, "user_a").await;
3429        let client_b = server.create_client(cx_b, "user_b").await;
3430
3431        // Share a project as client A
3432        fs.insert_tree(
3433            "/dir",
3434            json!({
3435                ".zed.toml": r#"collaborators = ["user_b"]"#,
3436                "one.rs": "const ONE: usize = 1;",
3437                "two.rs": "const TWO: usize = one::ONE + one::ONE;"
3438            }),
3439        )
3440        .await;
3441        let project_a = cx_a.update(|cx| {
3442            Project::local(
3443                client_a.clone(),
3444                client_a.user_store.clone(),
3445                lang_registry.clone(),
3446                fs.clone(),
3447                cx,
3448            )
3449        });
3450        let (worktree_a, _) = project_a
3451            .update(cx_a, |p, cx| {
3452                p.find_or_create_local_worktree("/dir", true, cx)
3453            })
3454            .await
3455            .unwrap();
3456        worktree_a
3457            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3458            .await;
3459        let project_id = project_a.update(cx_a, |p, _| p.next_remote_id()).await;
3460        let worktree_id = worktree_a.read_with(cx_a, |tree, _| tree.id());
3461        project_a.update(cx_a, |p, cx| p.share(cx)).await.unwrap();
3462
3463        // Join the worktree as client B.
3464        let project_b = Project::remote(
3465            project_id,
3466            client_b.clone(),
3467            client_b.user_store.clone(),
3468            lang_registry.clone(),
3469            fs.clone(),
3470            &mut cx_b.to_async(),
3471        )
3472        .await
3473        .unwrap();
3474        let mut params = cx_b.update(WorkspaceParams::test);
3475        params.languages = lang_registry.clone();
3476        params.client = client_b.client.clone();
3477        params.user_store = client_b.user_store.clone();
3478        params.project = project_b;
3479
3480        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
3481        let editor_b = workspace_b
3482            .update(cx_b, |workspace, cx| {
3483                workspace.open_path((worktree_id, "one.rs"), cx)
3484            })
3485            .await
3486            .unwrap()
3487            .downcast::<Editor>()
3488            .unwrap();
3489        let mut fake_language_server = fake_language_servers.next().await.unwrap();
3490
3491        // Move cursor to a location that can be renamed.
3492        let prepare_rename = editor_b.update(cx_b, |editor, cx| {
3493            editor.select_ranges([7..7], None, cx);
3494            editor.rename(&Rename, cx).unwrap()
3495        });
3496
3497        fake_language_server
3498            .handle_request::<lsp::request::PrepareRenameRequest, _>(|params, _| {
3499                assert_eq!(params.text_document.uri.as_str(), "file:///dir/one.rs");
3500                assert_eq!(params.position, lsp::Position::new(0, 7));
3501                Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
3502                    lsp::Position::new(0, 6),
3503                    lsp::Position::new(0, 9),
3504                )))
3505            })
3506            .next()
3507            .await
3508            .unwrap();
3509        prepare_rename.await.unwrap();
3510        editor_b.update(cx_b, |editor, cx| {
3511            let rename = editor.pending_rename().unwrap();
3512            let buffer = editor.buffer().read(cx).snapshot(cx);
3513            assert_eq!(
3514                rename.range.start.to_offset(&buffer)..rename.range.end.to_offset(&buffer),
3515                6..9
3516            );
3517            rename.editor.update(cx, |rename_editor, cx| {
3518                rename_editor.buffer().update(cx, |rename_buffer, cx| {
3519                    rename_buffer.edit([0..3], "THREE", cx);
3520                });
3521            });
3522        });
3523
3524        let confirm_rename = workspace_b.update(cx_b, |workspace, cx| {
3525            Editor::confirm_rename(workspace, &ConfirmRename, cx).unwrap()
3526        });
3527        fake_language_server
3528            .handle_request::<lsp::request::Rename, _>(|params, _| {
3529                assert_eq!(
3530                    params.text_document_position.text_document.uri.as_str(),
3531                    "file:///dir/one.rs"
3532                );
3533                assert_eq!(
3534                    params.text_document_position.position,
3535                    lsp::Position::new(0, 6)
3536                );
3537                assert_eq!(params.new_name, "THREE");
3538                Some(lsp::WorkspaceEdit {
3539                    changes: Some(
3540                        [
3541                            (
3542                                lsp::Url::from_file_path("/dir/one.rs").unwrap(),
3543                                vec![lsp::TextEdit::new(
3544                                    lsp::Range::new(
3545                                        lsp::Position::new(0, 6),
3546                                        lsp::Position::new(0, 9),
3547                                    ),
3548                                    "THREE".to_string(),
3549                                )],
3550                            ),
3551                            (
3552                                lsp::Url::from_file_path("/dir/two.rs").unwrap(),
3553                                vec![
3554                                    lsp::TextEdit::new(
3555                                        lsp::Range::new(
3556                                            lsp::Position::new(0, 24),
3557                                            lsp::Position::new(0, 27),
3558                                        ),
3559                                        "THREE".to_string(),
3560                                    ),
3561                                    lsp::TextEdit::new(
3562                                        lsp::Range::new(
3563                                            lsp::Position::new(0, 35),
3564                                            lsp::Position::new(0, 38),
3565                                        ),
3566                                        "THREE".to_string(),
3567                                    ),
3568                                ],
3569                            ),
3570                        ]
3571                        .into_iter()
3572                        .collect(),
3573                    ),
3574                    ..Default::default()
3575                })
3576            })
3577            .next()
3578            .await
3579            .unwrap();
3580        confirm_rename.await.unwrap();
3581
3582        let rename_editor = workspace_b.read_with(cx_b, |workspace, cx| {
3583            workspace
3584                .active_item(cx)
3585                .unwrap()
3586                .downcast::<Editor>()
3587                .unwrap()
3588        });
3589        rename_editor.update(cx_b, |editor, cx| {
3590            assert_eq!(
3591                editor.text(cx),
3592                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3593            );
3594            editor.undo(&Undo, cx);
3595            assert_eq!(
3596                editor.text(cx),
3597                "const TWO: usize = one::ONE + one::ONE;\nconst ONE: usize = 1;"
3598            );
3599            editor.redo(&Redo, cx);
3600            assert_eq!(
3601                editor.text(cx),
3602                "const TWO: usize = one::THREE + one::THREE;\nconst THREE: usize = 1;"
3603            );
3604        });
3605
3606        // Ensure temporary rename edits cannot be undone/redone.
3607        editor_b.update(cx_b, |editor, cx| {
3608            editor.undo(&Undo, cx);
3609            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3610            editor.undo(&Undo, cx);
3611            assert_eq!(editor.text(cx), "const ONE: usize = 1;");
3612            editor.redo(&Redo, cx);
3613            assert_eq!(editor.text(cx), "const THREE: usize = 1;");
3614        })
3615    }
3616
3617    #[gpui::test(iterations = 10)]
3618    async fn test_basic_chat(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3619        cx_a.foreground().forbid_parking();
3620
3621        // Connect to a server as 2 clients.
3622        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3623        let client_a = server.create_client(cx_a, "user_a").await;
3624        let client_b = server.create_client(cx_b, "user_b").await;
3625
3626        // Create an org that includes these 2 users.
3627        let db = &server.app_state.db;
3628        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3629        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3630            .await
3631            .unwrap();
3632        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3633            .await
3634            .unwrap();
3635
3636        // Create a channel that includes all the users.
3637        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3638        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3639            .await
3640            .unwrap();
3641        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3642            .await
3643            .unwrap();
3644        db.create_channel_message(
3645            channel_id,
3646            client_b.current_user_id(&cx_b),
3647            "hello A, it's B.",
3648            OffsetDateTime::now_utc(),
3649            1,
3650        )
3651        .await
3652        .unwrap();
3653
3654        let channels_a = cx_a
3655            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3656        channels_a
3657            .condition(cx_a, |list, _| list.available_channels().is_some())
3658            .await;
3659        channels_a.read_with(cx_a, |list, _| {
3660            assert_eq!(
3661                list.available_channels().unwrap(),
3662                &[ChannelDetails {
3663                    id: channel_id.to_proto(),
3664                    name: "test-channel".to_string()
3665                }]
3666            )
3667        });
3668        let channel_a = channels_a.update(cx_a, |this, cx| {
3669            this.get_channel(channel_id.to_proto(), cx).unwrap()
3670        });
3671        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3672        channel_a
3673            .condition(&cx_a, |channel, _| {
3674                channel_messages(channel)
3675                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3676            })
3677            .await;
3678
3679        let channels_b = cx_b
3680            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3681        channels_b
3682            .condition(cx_b, |list, _| list.available_channels().is_some())
3683            .await;
3684        channels_b.read_with(cx_b, |list, _| {
3685            assert_eq!(
3686                list.available_channels().unwrap(),
3687                &[ChannelDetails {
3688                    id: channel_id.to_proto(),
3689                    name: "test-channel".to_string()
3690                }]
3691            )
3692        });
3693
3694        let channel_b = channels_b.update(cx_b, |this, cx| {
3695            this.get_channel(channel_id.to_proto(), cx).unwrap()
3696        });
3697        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3698        channel_b
3699            .condition(&cx_b, |channel, _| {
3700                channel_messages(channel)
3701                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3702            })
3703            .await;
3704
3705        channel_a
3706            .update(cx_a, |channel, cx| {
3707                channel
3708                    .send_message("oh, hi B.".to_string(), cx)
3709                    .unwrap()
3710                    .detach();
3711                let task = channel.send_message("sup".to_string(), cx).unwrap();
3712                assert_eq!(
3713                    channel_messages(channel),
3714                    &[
3715                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3716                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3717                        ("user_a".to_string(), "sup".to_string(), true)
3718                    ]
3719                );
3720                task
3721            })
3722            .await
3723            .unwrap();
3724
3725        channel_b
3726            .condition(&cx_b, |channel, _| {
3727                channel_messages(channel)
3728                    == [
3729                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3730                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3731                        ("user_a".to_string(), "sup".to_string(), false),
3732                    ]
3733            })
3734            .await;
3735
3736        assert_eq!(
3737            server
3738                .state()
3739                .await
3740                .channel(channel_id)
3741                .unwrap()
3742                .connection_ids
3743                .len(),
3744            2
3745        );
3746        cx_b.update(|_| drop(channel_b));
3747        server
3748            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3749            .await;
3750
3751        cx_a.update(|_| drop(channel_a));
3752        server
3753            .condition(|state| state.channel(channel_id).is_none())
3754            .await;
3755    }
3756
3757    #[gpui::test(iterations = 10)]
3758    async fn test_chat_message_validation(cx_a: &mut TestAppContext) {
3759        cx_a.foreground().forbid_parking();
3760
3761        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3762        let client_a = server.create_client(cx_a, "user_a").await;
3763
3764        let db = &server.app_state.db;
3765        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3766        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3767        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3768            .await
3769            .unwrap();
3770        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3771            .await
3772            .unwrap();
3773
3774        let channels_a = cx_a
3775            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3776        channels_a
3777            .condition(cx_a, |list, _| list.available_channels().is_some())
3778            .await;
3779        let channel_a = channels_a.update(cx_a, |this, cx| {
3780            this.get_channel(channel_id.to_proto(), cx).unwrap()
3781        });
3782
3783        // Messages aren't allowed to be too long.
3784        channel_a
3785            .update(cx_a, |channel, cx| {
3786                let long_body = "this is long.\n".repeat(1024);
3787                channel.send_message(long_body, cx).unwrap()
3788            })
3789            .await
3790            .unwrap_err();
3791
3792        // Messages aren't allowed to be blank.
3793        channel_a.update(cx_a, |channel, cx| {
3794            channel.send_message(String::new(), cx).unwrap_err()
3795        });
3796
3797        // Leading and trailing whitespace are trimmed.
3798        channel_a
3799            .update(cx_a, |channel, cx| {
3800                channel
3801                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3802                    .unwrap()
3803            })
3804            .await
3805            .unwrap();
3806        assert_eq!(
3807            db.get_channel_messages(channel_id, 10, None)
3808                .await
3809                .unwrap()
3810                .iter()
3811                .map(|m| &m.body)
3812                .collect::<Vec<_>>(),
3813            &["surrounded by whitespace"]
3814        );
3815    }
3816
3817    #[gpui::test(iterations = 10)]
3818    async fn test_chat_reconnection(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
3819        cx_a.foreground().forbid_parking();
3820
3821        // Connect to a server as 2 clients.
3822        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
3823        let client_a = server.create_client(cx_a, "user_a").await;
3824        let client_b = server.create_client(cx_b, "user_b").await;
3825        let mut status_b = client_b.status();
3826
3827        // Create an org that includes these 2 users.
3828        let db = &server.app_state.db;
3829        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3830        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3831            .await
3832            .unwrap();
3833        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3834            .await
3835            .unwrap();
3836
3837        // Create a channel that includes all the users.
3838        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3839        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3840            .await
3841            .unwrap();
3842        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3843            .await
3844            .unwrap();
3845        db.create_channel_message(
3846            channel_id,
3847            client_b.current_user_id(&cx_b),
3848            "hello A, it's B.",
3849            OffsetDateTime::now_utc(),
3850            2,
3851        )
3852        .await
3853        .unwrap();
3854
3855        let channels_a = cx_a
3856            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3857        channels_a
3858            .condition(cx_a, |list, _| list.available_channels().is_some())
3859            .await;
3860
3861        channels_a.read_with(cx_a, |list, _| {
3862            assert_eq!(
3863                list.available_channels().unwrap(),
3864                &[ChannelDetails {
3865                    id: channel_id.to_proto(),
3866                    name: "test-channel".to_string()
3867                }]
3868            )
3869        });
3870        let channel_a = channels_a.update(cx_a, |this, cx| {
3871            this.get_channel(channel_id.to_proto(), cx).unwrap()
3872        });
3873        channel_a.read_with(cx_a, |channel, _| assert!(channel.messages().is_empty()));
3874        channel_a
3875            .condition(&cx_a, |channel, _| {
3876                channel_messages(channel)
3877                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3878            })
3879            .await;
3880
3881        let channels_b = cx_b
3882            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3883        channels_b
3884            .condition(cx_b, |list, _| list.available_channels().is_some())
3885            .await;
3886        channels_b.read_with(cx_b, |list, _| {
3887            assert_eq!(
3888                list.available_channels().unwrap(),
3889                &[ChannelDetails {
3890                    id: channel_id.to_proto(),
3891                    name: "test-channel".to_string()
3892                }]
3893            )
3894        });
3895
3896        let channel_b = channels_b.update(cx_b, |this, cx| {
3897            this.get_channel(channel_id.to_proto(), cx).unwrap()
3898        });
3899        channel_b.read_with(cx_b, |channel, _| assert!(channel.messages().is_empty()));
3900        channel_b
3901            .condition(&cx_b, |channel, _| {
3902                channel_messages(channel)
3903                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3904            })
3905            .await;
3906
3907        // Disconnect client B, ensuring we can still access its cached channel data.
3908        server.forbid_connections();
3909        server.disconnect_client(client_b.current_user_id(&cx_b));
3910        cx_b.foreground().advance_clock(Duration::from_secs(3));
3911        while !matches!(
3912            status_b.next().await,
3913            Some(client::Status::ReconnectionError { .. })
3914        ) {}
3915
3916        channels_b.read_with(cx_b, |channels, _| {
3917            assert_eq!(
3918                channels.available_channels().unwrap(),
3919                [ChannelDetails {
3920                    id: channel_id.to_proto(),
3921                    name: "test-channel".to_string()
3922                }]
3923            )
3924        });
3925        channel_b.read_with(cx_b, |channel, _| {
3926            assert_eq!(
3927                channel_messages(channel),
3928                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3929            )
3930        });
3931
3932        // Send a message from client B while it is disconnected.
3933        channel_b
3934            .update(cx_b, |channel, cx| {
3935                let task = channel
3936                    .send_message("can you see this?".to_string(), cx)
3937                    .unwrap();
3938                assert_eq!(
3939                    channel_messages(channel),
3940                    &[
3941                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3942                        ("user_b".to_string(), "can you see this?".to_string(), true)
3943                    ]
3944                );
3945                task
3946            })
3947            .await
3948            .unwrap_err();
3949
3950        // Send a message from client A while B is disconnected.
3951        channel_a
3952            .update(cx_a, |channel, cx| {
3953                channel
3954                    .send_message("oh, hi B.".to_string(), cx)
3955                    .unwrap()
3956                    .detach();
3957                let task = channel.send_message("sup".to_string(), cx).unwrap();
3958                assert_eq!(
3959                    channel_messages(channel),
3960                    &[
3961                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3962                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3963                        ("user_a".to_string(), "sup".to_string(), true)
3964                    ]
3965                );
3966                task
3967            })
3968            .await
3969            .unwrap();
3970
3971        // Give client B a chance to reconnect.
3972        server.allow_connections();
3973        cx_b.foreground().advance_clock(Duration::from_secs(10));
3974
3975        // Verify that B sees the new messages upon reconnection, as well as the message client B
3976        // sent while offline.
3977        channel_b
3978            .condition(&cx_b, |channel, _| {
3979                channel_messages(channel)
3980                    == [
3981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3983                        ("user_a".to_string(), "sup".to_string(), false),
3984                        ("user_b".to_string(), "can you see this?".to_string(), false),
3985                    ]
3986            })
3987            .await;
3988
3989        // Ensure client A and B can communicate normally after reconnection.
3990        channel_a
3991            .update(cx_a, |channel, cx| {
3992                channel.send_message("you online?".to_string(), cx).unwrap()
3993            })
3994            .await
3995            .unwrap();
3996        channel_b
3997            .condition(&cx_b, |channel, _| {
3998                channel_messages(channel)
3999                    == [
4000                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4001                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4002                        ("user_a".to_string(), "sup".to_string(), false),
4003                        ("user_b".to_string(), "can you see this?".to_string(), false),
4004                        ("user_a".to_string(), "you online?".to_string(), false),
4005                    ]
4006            })
4007            .await;
4008
4009        channel_b
4010            .update(cx_b, |channel, cx| {
4011                channel.send_message("yep".to_string(), cx).unwrap()
4012            })
4013            .await
4014            .unwrap();
4015        channel_a
4016            .condition(&cx_a, |channel, _| {
4017                channel_messages(channel)
4018                    == [
4019                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
4020                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
4021                        ("user_a".to_string(), "sup".to_string(), false),
4022                        ("user_b".to_string(), "can you see this?".to_string(), false),
4023                        ("user_a".to_string(), "you online?".to_string(), false),
4024                        ("user_b".to_string(), "yep".to_string(), false),
4025                    ]
4026            })
4027            .await;
4028    }
4029
4030    #[gpui::test(iterations = 10)]
4031    async fn test_contacts(
4032        cx_a: &mut TestAppContext,
4033        cx_b: &mut TestAppContext,
4034        cx_c: &mut TestAppContext,
4035    ) {
4036        cx_a.foreground().forbid_parking();
4037        let lang_registry = Arc::new(LanguageRegistry::test());
4038        let fs = FakeFs::new(cx_a.background());
4039
4040        // Connect to a server as 3 clients.
4041        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4042        let client_a = server.create_client(cx_a, "user_a").await;
4043        let client_b = server.create_client(cx_b, "user_b").await;
4044        let client_c = server.create_client(cx_c, "user_c").await;
4045
4046        // Share a worktree as client A.
4047        fs.insert_tree(
4048            "/a",
4049            json!({
4050                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
4051            }),
4052        )
4053        .await;
4054
4055        let project_a = cx_a.update(|cx| {
4056            Project::local(
4057                client_a.clone(),
4058                client_a.user_store.clone(),
4059                lang_registry.clone(),
4060                fs.clone(),
4061                cx,
4062            )
4063        });
4064        let (worktree_a, _) = project_a
4065            .update(cx_a, |p, cx| {
4066                p.find_or_create_local_worktree("/a", true, cx)
4067            })
4068            .await
4069            .unwrap();
4070        worktree_a
4071            .read_with(cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
4072            .await;
4073
4074        client_a
4075            .user_store
4076            .condition(&cx_a, |user_store, _| {
4077                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4078            })
4079            .await;
4080        client_b
4081            .user_store
4082            .condition(&cx_b, |user_store, _| {
4083                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4084            })
4085            .await;
4086        client_c
4087            .user_store
4088            .condition(&cx_c, |user_store, _| {
4089                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
4090            })
4091            .await;
4092
4093        let project_id = project_a
4094            .update(cx_a, |project, _| project.next_remote_id())
4095            .await;
4096        project_a
4097            .update(cx_a, |project, cx| project.share(cx))
4098            .await
4099            .unwrap();
4100
4101        let _project_b = Project::remote(
4102            project_id,
4103            client_b.clone(),
4104            client_b.user_store.clone(),
4105            lang_registry.clone(),
4106            fs.clone(),
4107            &mut cx_b.to_async(),
4108        )
4109        .await
4110        .unwrap();
4111
4112        client_a
4113            .user_store
4114            .condition(&cx_a, |user_store, _| {
4115                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4116            })
4117            .await;
4118        client_b
4119            .user_store
4120            .condition(&cx_b, |user_store, _| {
4121                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4122            })
4123            .await;
4124        client_c
4125            .user_store
4126            .condition(&cx_c, |user_store, _| {
4127                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
4128            })
4129            .await;
4130
4131        project_a
4132            .condition(&cx_a, |project, _| {
4133                project.collaborators().contains_key(&client_b.peer_id)
4134            })
4135            .await;
4136
4137        cx_a.update(move |_| drop(project_a));
4138        client_a
4139            .user_store
4140            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
4141            .await;
4142        client_b
4143            .user_store
4144            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
4145            .await;
4146        client_c
4147            .user_store
4148            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
4149            .await;
4150
4151        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
4152            user_store
4153                .contacts()
4154                .iter()
4155                .map(|contact| {
4156                    let worktrees = contact
4157                        .projects
4158                        .iter()
4159                        .map(|p| {
4160                            (
4161                                p.worktree_root_names[0].as_str(),
4162                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
4163                            )
4164                        })
4165                        .collect();
4166                    (contact.user.github_login.as_str(), worktrees)
4167                })
4168                .collect()
4169        }
4170    }
4171
4172    #[gpui::test(iterations = 10)]
4173    async fn test_following(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) {
4174        cx_a.foreground().forbid_parking();
4175        let fs = FakeFs::new(cx_a.background());
4176
4177        // 2 clients connect to a server.
4178        let mut server = TestServer::start(cx_a.foreground(), cx_a.background()).await;
4179        let mut client_a = server.create_client(cx_a, "user_a").await;
4180        let mut client_b = server.create_client(cx_b, "user_b").await;
4181        cx_a.update(editor::init);
4182        cx_b.update(editor::init);
4183
4184        // Client A shares a project.
4185        fs.insert_tree(
4186            "/a",
4187            json!({
4188                ".zed.toml": r#"collaborators = ["user_b"]"#,
4189                "1.txt": "one",
4190                "2.txt": "two",
4191                "3.txt": "three",
4192            }),
4193        )
4194        .await;
4195        let (project_a, worktree_id) = client_a.build_local_project(fs.clone(), "/a", cx_a).await;
4196        project_a
4197            .update(cx_a, |project, cx| project.share(cx))
4198            .await
4199            .unwrap();
4200
4201        // Client B joins the project.
4202        let project_b = client_b
4203            .build_remote_project(
4204                project_a
4205                    .read_with(cx_a, |project, _| project.remote_id())
4206                    .unwrap(),
4207                cx_b,
4208            )
4209            .await;
4210
4211        // Client A opens some editors.
4212        let workspace_a = client_a.build_workspace(&project_a, cx_a);
4213        let editor_a1 = workspace_a
4214            .update(cx_a, |workspace, cx| {
4215                workspace.open_path((worktree_id, "1.txt"), cx)
4216            })
4217            .await
4218            .unwrap();
4219        let editor_a2 = workspace_a
4220            .update(cx_a, |workspace, cx| {
4221                workspace.open_path((worktree_id, "2.txt"), cx)
4222            })
4223            .await
4224            .unwrap();
4225
4226        // Client B opens an editor.
4227        let workspace_b = client_b.build_workspace(&project_b, cx_b);
4228        let editor_b1 = workspace_b
4229            .update(cx_b, |workspace, cx| {
4230                workspace.open_path((worktree_id, "1.txt"), cx)
4231            })
4232            .await
4233            .unwrap();
4234
4235        // Client B starts following client A.
4236        workspace_b
4237            .update(cx_b, |workspace, cx| {
4238                workspace.split_pane(workspace.active_pane().clone(), SplitDirection::Right, cx);
4239                let leader_id = project_b.read(cx).collaborators().keys().next().unwrap();
4240                workspace.follow(*leader_id, cx)
4241            })
4242            .await
4243            .unwrap();
4244        assert_eq!(
4245            workspace_b.read_with(cx_b, |workspace, cx| workspace
4246                .active_item(cx)
4247                .unwrap()
4248                .project_path(cx)),
4249            Some((worktree_id, "2.txt").into())
4250        );
4251
4252        // When client A activates a different editor, client B does so as well.
4253        workspace_a.update(cx_a, |workspace, cx| {
4254            workspace.activate_item(editor_a1.as_ref(), cx)
4255        });
4256        workspace_b
4257            .condition(cx_b, |workspace, cx| {
4258                let active_item = workspace.active_item(cx).unwrap();
4259                active_item.project_path(cx) == Some((worktree_id, "1.txt").into())
4260            })
4261            .await;
4262    }
4263
4264    #[gpui::test(iterations = 100)]
4265    async fn test_random_collaboration(cx: &mut TestAppContext, rng: StdRng) {
4266        cx.foreground().forbid_parking();
4267        let max_peers = env::var("MAX_PEERS")
4268            .map(|i| i.parse().expect("invalid `MAX_PEERS` variable"))
4269            .unwrap_or(5);
4270        let max_operations = env::var("OPERATIONS")
4271            .map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
4272            .unwrap_or(10);
4273
4274        let rng = Arc::new(Mutex::new(rng));
4275
4276        let guest_lang_registry = Arc::new(LanguageRegistry::test());
4277        let (language_server_config, _fake_language_servers) = LanguageServerConfig::fake();
4278
4279        let fs = FakeFs::new(cx.background());
4280        fs.insert_tree(
4281            "/_collab",
4282            json!({
4283                ".zed.toml": r#"collaborators = ["guest-1", "guest-2", "guest-3", "guest-4", "guest-5"]"#
4284            }),
4285        )
4286        .await;
4287
4288        let operations = Rc::new(Cell::new(0));
4289        let mut server = TestServer::start(cx.foreground(), cx.background()).await;
4290        let mut clients = Vec::new();
4291
4292        let mut next_entity_id = 100000;
4293        let mut host_cx = TestAppContext::new(
4294            cx.foreground_platform(),
4295            cx.platform(),
4296            cx.foreground(),
4297            cx.background(),
4298            cx.font_cache(),
4299            cx.leak_detector(),
4300            next_entity_id,
4301        );
4302        let host = server.create_client(&mut host_cx, "host").await;
4303        let host_project = host_cx.update(|cx| {
4304            Project::local(
4305                host.client.clone(),
4306                host.user_store.clone(),
4307                Arc::new(LanguageRegistry::test()),
4308                fs.clone(),
4309                cx,
4310            )
4311        });
4312        let host_project_id = host_project
4313            .update(&mut host_cx, |p, _| p.next_remote_id())
4314            .await;
4315
4316        let (collab_worktree, _) = host_project
4317            .update(&mut host_cx, |project, cx| {
4318                project.find_or_create_local_worktree("/_collab", true, cx)
4319            })
4320            .await
4321            .unwrap();
4322        collab_worktree
4323            .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete())
4324            .await;
4325        host_project
4326            .update(&mut host_cx, |project, cx| project.share(cx))
4327            .await
4328            .unwrap();
4329
4330        clients.push(cx.foreground().spawn(host.simulate_host(
4331            host_project,
4332            language_server_config,
4333            operations.clone(),
4334            max_operations,
4335            rng.clone(),
4336            host_cx,
4337        )));
4338
4339        while operations.get() < max_operations {
4340            cx.background().simulate_random_delay().await;
4341            if clients.len() >= max_peers {
4342                break;
4343            } else if rng.lock().gen_bool(0.05) {
4344                operations.set(operations.get() + 1);
4345
4346                let guest_id = clients.len();
4347                log::info!("Adding guest {}", guest_id);
4348                next_entity_id += 100000;
4349                let mut guest_cx = TestAppContext::new(
4350                    cx.foreground_platform(),
4351                    cx.platform(),
4352                    cx.foreground(),
4353                    cx.background(),
4354                    cx.font_cache(),
4355                    cx.leak_detector(),
4356                    next_entity_id,
4357                );
4358                let guest = server
4359                    .create_client(&mut guest_cx, &format!("guest-{}", guest_id))
4360                    .await;
4361                let guest_project = Project::remote(
4362                    host_project_id,
4363                    guest.client.clone(),
4364                    guest.user_store.clone(),
4365                    guest_lang_registry.clone(),
4366                    FakeFs::new(cx.background()),
4367                    &mut guest_cx.to_async(),
4368                )
4369                .await
4370                .unwrap();
4371                clients.push(cx.foreground().spawn(guest.simulate_guest(
4372                    guest_id,
4373                    guest_project,
4374                    operations.clone(),
4375                    max_operations,
4376                    rng.clone(),
4377                    guest_cx,
4378                )));
4379
4380                log::info!("Guest {} added", guest_id);
4381            }
4382        }
4383
4384        let mut clients = futures::future::join_all(clients).await;
4385        cx.foreground().run_until_parked();
4386
4387        let (host_client, mut host_cx) = clients.remove(0);
4388        let host_project = host_client.project.as_ref().unwrap();
4389        let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| {
4390            project
4391                .worktrees(cx)
4392                .map(|worktree| {
4393                    let snapshot = worktree.read(cx).snapshot();
4394                    (snapshot.id(), snapshot)
4395                })
4396                .collect::<BTreeMap<_, _>>()
4397        });
4398
4399        host_client
4400            .project
4401            .as_ref()
4402            .unwrap()
4403            .read_with(&host_cx, |project, cx| project.check_invariants(cx));
4404
4405        for (guest_client, mut guest_cx) in clients.into_iter() {
4406            let guest_id = guest_client.client.id();
4407            let worktree_snapshots =
4408                guest_client
4409                    .project
4410                    .as_ref()
4411                    .unwrap()
4412                    .read_with(&guest_cx, |project, cx| {
4413                        project
4414                            .worktrees(cx)
4415                            .map(|worktree| {
4416                                let worktree = worktree.read(cx);
4417                                (worktree.id(), worktree.snapshot())
4418                            })
4419                            .collect::<BTreeMap<_, _>>()
4420                    });
4421
4422            assert_eq!(
4423                worktree_snapshots.keys().collect::<Vec<_>>(),
4424                host_worktree_snapshots.keys().collect::<Vec<_>>(),
4425                "guest {} has different worktrees than the host",
4426                guest_id
4427            );
4428            for (id, host_snapshot) in &host_worktree_snapshots {
4429                let guest_snapshot = &worktree_snapshots[id];
4430                assert_eq!(
4431                    guest_snapshot.root_name(),
4432                    host_snapshot.root_name(),
4433                    "guest {} has different root name than the host for worktree {}",
4434                    guest_id,
4435                    id
4436                );
4437                assert_eq!(
4438                    guest_snapshot.entries(false).collect::<Vec<_>>(),
4439                    host_snapshot.entries(false).collect::<Vec<_>>(),
4440                    "guest {} has different snapshot than the host for worktree {}",
4441                    guest_id,
4442                    id
4443                );
4444            }
4445
4446            guest_client
4447                .project
4448                .as_ref()
4449                .unwrap()
4450                .read_with(&guest_cx, |project, cx| project.check_invariants(cx));
4451
4452            for guest_buffer in &guest_client.buffers {
4453                let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id());
4454                let host_buffer = host_project.read_with(&host_cx, |project, cx| {
4455                    project.buffer_for_id(buffer_id, cx).expect(&format!(
4456                        "host does not have buffer for guest:{}, peer:{}, id:{}",
4457                        guest_id, guest_client.peer_id, buffer_id
4458                    ))
4459                });
4460                let path = host_buffer
4461                    .read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx));
4462
4463                assert_eq!(
4464                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()),
4465                    0,
4466                    "guest {}, buffer {}, path {:?} has deferred operations",
4467                    guest_id,
4468                    buffer_id,
4469                    path,
4470                );
4471                assert_eq!(
4472                    guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()),
4473                    host_buffer.read_with(&host_cx, |buffer, _| buffer.text()),
4474                    "guest {}, buffer {}, path {:?}, differs from the host's buffer",
4475                    guest_id,
4476                    buffer_id,
4477                    path
4478                );
4479            }
4480
4481            guest_cx.update(|_| drop(guest_client));
4482        }
4483
4484        host_cx.update(|_| drop(host_client));
4485    }
4486
4487    struct TestServer {
4488        peer: Arc<Peer>,
4489        app_state: Arc<AppState>,
4490        server: Arc<Server>,
4491        foreground: Rc<executor::Foreground>,
4492        notifications: mpsc::UnboundedReceiver<()>,
4493        connection_killers: Arc<Mutex<HashMap<UserId, barrier::Sender>>>,
4494        forbid_connections: Arc<AtomicBool>,
4495        _test_db: TestDb,
4496    }
4497
4498    impl TestServer {
4499        async fn start(
4500            foreground: Rc<executor::Foreground>,
4501            background: Arc<executor::Background>,
4502        ) -> Self {
4503            let test_db = TestDb::fake(background);
4504            let app_state = Self::build_app_state(&test_db).await;
4505            let peer = Peer::new();
4506            let notifications = mpsc::unbounded();
4507            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
4508            Self {
4509                peer,
4510                app_state,
4511                server,
4512                foreground,
4513                notifications: notifications.1,
4514                connection_killers: Default::default(),
4515                forbid_connections: Default::default(),
4516                _test_db: test_db,
4517            }
4518        }
4519
4520        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
4521            cx.update(|cx| {
4522                let settings = Settings::test(cx);
4523                cx.set_global(settings);
4524            });
4525
4526            let http = FakeHttpClient::with_404_response();
4527            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
4528            let client_name = name.to_string();
4529            let mut client = Client::new(http.clone());
4530            let server = self.server.clone();
4531            let connection_killers = self.connection_killers.clone();
4532            let forbid_connections = self.forbid_connections.clone();
4533            let (connection_id_tx, mut connection_id_rx) = mpsc::channel(16);
4534
4535            Arc::get_mut(&mut client)
4536                .unwrap()
4537                .override_authenticate(move |cx| {
4538                    cx.spawn(|_| async move {
4539                        let access_token = "the-token".to_string();
4540                        Ok(Credentials {
4541                            user_id: user_id.0 as u64,
4542                            access_token,
4543                        })
4544                    })
4545                })
4546                .override_establish_connection(move |credentials, cx| {
4547                    assert_eq!(credentials.user_id, user_id.0 as u64);
4548                    assert_eq!(credentials.access_token, "the-token");
4549
4550                    let server = server.clone();
4551                    let connection_killers = connection_killers.clone();
4552                    let forbid_connections = forbid_connections.clone();
4553                    let client_name = client_name.clone();
4554                    let connection_id_tx = connection_id_tx.clone();
4555                    cx.spawn(move |cx| async move {
4556                        if forbid_connections.load(SeqCst) {
4557                            Err(EstablishConnectionError::other(anyhow!(
4558                                "server is forbidding connections"
4559                            )))
4560                        } else {
4561                            let (client_conn, server_conn, kill_conn) =
4562                                Connection::in_memory(cx.background());
4563                            connection_killers.lock().insert(user_id, kill_conn);
4564                            cx.background()
4565                                .spawn(server.handle_connection(
4566                                    server_conn,
4567                                    client_name,
4568                                    user_id,
4569                                    Some(connection_id_tx),
4570                                    cx.background(),
4571                                ))
4572                                .detach();
4573                            Ok(client_conn)
4574                        }
4575                    })
4576                });
4577
4578            client
4579                .authenticate_and_connect(&cx.to_async())
4580                .await
4581                .unwrap();
4582
4583            Channel::init(&client);
4584            Project::init(&client);
4585            cx.update(|cx| {
4586                workspace::init(&client, cx);
4587            });
4588
4589            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
4590            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
4591
4592            let client = TestClient {
4593                client,
4594                peer_id,
4595                user_store,
4596                language_registry: Arc::new(LanguageRegistry::test()),
4597                project: Default::default(),
4598                buffers: Default::default(),
4599            };
4600            client.wait_for_current_user(cx).await;
4601            client
4602        }
4603
4604        fn disconnect_client(&self, user_id: UserId) {
4605            self.connection_killers.lock().remove(&user_id);
4606        }
4607
4608        fn forbid_connections(&self) {
4609            self.forbid_connections.store(true, SeqCst);
4610        }
4611
4612        fn allow_connections(&self) {
4613            self.forbid_connections.store(false, SeqCst);
4614        }
4615
4616        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
4617            let mut config = Config::default();
4618            config.session_secret = "a".repeat(32);
4619            config.database_url = test_db.url.clone();
4620            let github_client = github::AppClient::test();
4621            Arc::new(AppState {
4622                db: test_db.db().clone(),
4623                handlebars: Default::default(),
4624                auth_client: auth::build_client("", ""),
4625                repo_client: github::RepoClient::test(&github_client),
4626                github_client,
4627                config,
4628            })
4629        }
4630
4631        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
4632            self.server.store.read()
4633        }
4634
4635        async fn condition<F>(&mut self, mut predicate: F)
4636        where
4637            F: FnMut(&Store) -> bool,
4638        {
4639            async_std::future::timeout(Duration::from_millis(500), async {
4640                while !(predicate)(&*self.server.store.read()) {
4641                    self.foreground.start_waiting();
4642                    self.notifications.next().await;
4643                    self.foreground.finish_waiting();
4644                }
4645            })
4646            .await
4647            .expect("condition timed out");
4648        }
4649    }
4650
4651    impl Drop for TestServer {
4652        fn drop(&mut self) {
4653            self.peer.reset();
4654        }
4655    }
4656
4657    struct TestClient {
4658        client: Arc<Client>,
4659        pub peer_id: PeerId,
4660        pub user_store: ModelHandle<UserStore>,
4661        language_registry: Arc<LanguageRegistry>,
4662        project: Option<ModelHandle<Project>>,
4663        buffers: HashSet<ModelHandle<language::Buffer>>,
4664    }
4665
4666    impl Deref for TestClient {
4667        type Target = Arc<Client>;
4668
4669        fn deref(&self) -> &Self::Target {
4670            &self.client
4671        }
4672    }
4673
4674    impl TestClient {
4675        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
4676            UserId::from_proto(
4677                self.user_store
4678                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
4679            )
4680        }
4681
4682        async fn wait_for_current_user(&self, cx: &TestAppContext) {
4683            let mut authed_user = self
4684                .user_store
4685                .read_with(cx, |user_store, _| user_store.watch_current_user());
4686            while authed_user.next().await.unwrap().is_none() {}
4687        }
4688
4689        async fn build_local_project(
4690            &mut self,
4691            fs: Arc<FakeFs>,
4692            root_path: impl AsRef<Path>,
4693            cx: &mut TestAppContext,
4694        ) -> (ModelHandle<Project>, WorktreeId) {
4695            let project = cx.update(|cx| {
4696                Project::local(
4697                    self.client.clone(),
4698                    self.user_store.clone(),
4699                    self.language_registry.clone(),
4700                    fs,
4701                    cx,
4702                )
4703            });
4704            self.project = Some(project.clone());
4705            let (worktree, _) = project
4706                .update(cx, |p, cx| {
4707                    p.find_or_create_local_worktree(root_path, true, cx)
4708                })
4709                .await
4710                .unwrap();
4711            worktree
4712                .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete())
4713                .await;
4714            project
4715                .update(cx, |project, _| project.next_remote_id())
4716                .await;
4717            (project, worktree.read_with(cx, |tree, _| tree.id()))
4718        }
4719
4720        async fn build_remote_project(
4721            &mut self,
4722            project_id: u64,
4723            cx: &mut TestAppContext,
4724        ) -> ModelHandle<Project> {
4725            let project = Project::remote(
4726                project_id,
4727                self.client.clone(),
4728                self.user_store.clone(),
4729                self.language_registry.clone(),
4730                FakeFs::new(cx.background()),
4731                &mut cx.to_async(),
4732            )
4733            .await
4734            .unwrap();
4735            self.project = Some(project.clone());
4736            project
4737        }
4738
4739        fn build_workspace(
4740            &self,
4741            project: &ModelHandle<Project>,
4742            cx: &mut TestAppContext,
4743        ) -> ViewHandle<Workspace> {
4744            let (window_id, _) = cx.add_window(|cx| EmptyView);
4745            cx.add_view(window_id, |cx| {
4746                let fs = project.read(cx).fs().clone();
4747                Workspace::new(
4748                    &WorkspaceParams {
4749                        fs,
4750                        project: project.clone(),
4751                        user_store: self.user_store.clone(),
4752                        languages: self.language_registry.clone(),
4753                        channel_list: cx.add_model(|cx| {
4754                            ChannelList::new(self.user_store.clone(), self.client.clone(), cx)
4755                        }),
4756                        client: self.client.clone(),
4757                    },
4758                    cx,
4759                )
4760            })
4761        }
4762
4763        fn simulate_host(
4764            mut self,
4765            project: ModelHandle<Project>,
4766            mut language_server_config: LanguageServerConfig,
4767            operations: Rc<Cell<usize>>,
4768            max_operations: usize,
4769            rng: Arc<Mutex<StdRng>>,
4770            mut cx: TestAppContext,
4771        ) -> impl Future<Output = (Self, TestAppContext)> {
4772            let files: Arc<Mutex<Vec<PathBuf>>> = Default::default();
4773
4774            // Set up a fake language server.
4775            language_server_config.set_fake_initializer({
4776                let rng = rng.clone();
4777                let files = files.clone();
4778                let project = project.downgrade();
4779                move |fake_server| {
4780                    fake_server.handle_request::<lsp::request::Completion, _>(|_, _| {
4781                        Some(lsp::CompletionResponse::Array(vec![lsp::CompletionItem {
4782                            text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
4783                                range: lsp::Range::new(
4784                                    lsp::Position::new(0, 0),
4785                                    lsp::Position::new(0, 0),
4786                                ),
4787                                new_text: "the-new-text".to_string(),
4788                            })),
4789                            ..Default::default()
4790                        }]))
4791                    });
4792
4793                    fake_server.handle_request::<lsp::request::CodeActionRequest, _>(|_, _| {
4794                        Some(vec![lsp::CodeActionOrCommand::CodeAction(
4795                            lsp::CodeAction {
4796                                title: "the-code-action".to_string(),
4797                                ..Default::default()
4798                            },
4799                        )])
4800                    });
4801
4802                    fake_server.handle_request::<lsp::request::PrepareRenameRequest, _>(
4803                        |params, _| {
4804                            Some(lsp::PrepareRenameResponse::Range(lsp::Range::new(
4805                                params.position,
4806                                params.position,
4807                            )))
4808                        },
4809                    );
4810
4811                    fake_server.handle_request::<lsp::request::GotoDefinition, _>({
4812                        let files = files.clone();
4813                        let rng = rng.clone();
4814                        move |_, _| {
4815                            let files = files.lock();
4816                            let mut rng = rng.lock();
4817                            let count = rng.gen_range::<usize, _>(1..3);
4818                            let files = (0..count)
4819                                .map(|_| files.choose(&mut *rng).unwrap())
4820                                .collect::<Vec<_>>();
4821                            log::info!("LSP: Returning definitions in files {:?}", &files);
4822                            Some(lsp::GotoDefinitionResponse::Array(
4823                                files
4824                                    .into_iter()
4825                                    .map(|file| lsp::Location {
4826                                        uri: lsp::Url::from_file_path(file).unwrap(),
4827                                        range: Default::default(),
4828                                    })
4829                                    .collect(),
4830                            ))
4831                        }
4832                    });
4833
4834                    fake_server.handle_request::<lsp::request::DocumentHighlightRequest, _>({
4835                        let rng = rng.clone();
4836                        let project = project.clone();
4837                        move |params, mut cx| {
4838                            if let Some(project) = project.upgrade(&cx) {
4839                                project.update(&mut cx, |project, cx| {
4840                                    let path = params
4841                                        .text_document_position_params
4842                                        .text_document
4843                                        .uri
4844                                        .to_file_path()
4845                                        .unwrap();
4846                                    let (worktree, relative_path) =
4847                                        project.find_local_worktree(&path, cx)?;
4848                                    let project_path =
4849                                        ProjectPath::from((worktree.read(cx).id(), relative_path));
4850                                    let buffer =
4851                                        project.get_open_buffer(&project_path, cx)?.read(cx);
4852
4853                                    let mut highlights = Vec::new();
4854                                    let highlight_count = rng.lock().gen_range(1..=5);
4855                                    let mut prev_end = 0;
4856                                    for _ in 0..highlight_count {
4857                                        let range =
4858                                            buffer.random_byte_range(prev_end, &mut *rng.lock());
4859                                        let start = buffer
4860                                            .offset_to_point_utf16(range.start)
4861                                            .to_lsp_position();
4862                                        let end = buffer
4863                                            .offset_to_point_utf16(range.end)
4864                                            .to_lsp_position();
4865                                        highlights.push(lsp::DocumentHighlight {
4866                                            range: lsp::Range::new(start, end),
4867                                            kind: Some(lsp::DocumentHighlightKind::READ),
4868                                        });
4869                                        prev_end = range.end;
4870                                    }
4871                                    Some(highlights)
4872                                })
4873                            } else {
4874                                None
4875                            }
4876                        }
4877                    });
4878                }
4879            });
4880
4881            project.update(&mut cx, |project, _| {
4882                project.languages().add(Arc::new(Language::new(
4883                    LanguageConfig {
4884                        name: "Rust".into(),
4885                        path_suffixes: vec!["rs".to_string()],
4886                        language_server: Some(language_server_config),
4887                        ..Default::default()
4888                    },
4889                    None,
4890                )));
4891            });
4892
4893            async move {
4894                let fs = project.read_with(&cx, |project, _| project.fs().clone());
4895                while operations.get() < max_operations {
4896                    operations.set(operations.get() + 1);
4897
4898                    let distribution = rng.lock().gen_range::<usize, _>(0..100);
4899                    match distribution {
4900                        0..=20 if !files.lock().is_empty() => {
4901                            let path = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4902                            let mut path = path.as_path();
4903                            while let Some(parent_path) = path.parent() {
4904                                path = parent_path;
4905                                if rng.lock().gen() {
4906                                    break;
4907                                }
4908                            }
4909
4910                            log::info!("Host: find/create local worktree {:?}", path);
4911                            let find_or_create_worktree = project.update(&mut cx, |project, cx| {
4912                                project.find_or_create_local_worktree(path, true, cx)
4913                            });
4914                            let find_or_create_worktree = async move {
4915                                find_or_create_worktree.await.unwrap();
4916                            };
4917                            if rng.lock().gen() {
4918                                cx.background().spawn(find_or_create_worktree).detach();
4919                            } else {
4920                                find_or_create_worktree.await;
4921                            }
4922                        }
4923                        10..=80 if !files.lock().is_empty() => {
4924                            let buffer = if self.buffers.is_empty() || rng.lock().gen() {
4925                                let file = files.lock().choose(&mut *rng.lock()).unwrap().clone();
4926                                let (worktree, path) = project
4927                                    .update(&mut cx, |project, cx| {
4928                                        project.find_or_create_local_worktree(
4929                                            file.clone(),
4930                                            true,
4931                                            cx,
4932                                        )
4933                                    })
4934                                    .await
4935                                    .unwrap();
4936                                let project_path =
4937                                    worktree.read_with(&cx, |worktree, _| (worktree.id(), path));
4938                                log::info!(
4939                                    "Host: opening path {:?}, worktree {}, relative_path {:?}",
4940                                    file,
4941                                    project_path.0,
4942                                    project_path.1
4943                                );
4944                                let buffer = project
4945                                    .update(&mut cx, |project, cx| {
4946                                        project.open_buffer(project_path, cx)
4947                                    })
4948                                    .await
4949                                    .unwrap();
4950                                self.buffers.insert(buffer.clone());
4951                                buffer
4952                            } else {
4953                                self.buffers
4954                                    .iter()
4955                                    .choose(&mut *rng.lock())
4956                                    .unwrap()
4957                                    .clone()
4958                            };
4959
4960                            if rng.lock().gen_bool(0.1) {
4961                                cx.update(|cx| {
4962                                    log::info!(
4963                                        "Host: dropping buffer {:?}",
4964                                        buffer.read(cx).file().unwrap().full_path(cx)
4965                                    );
4966                                    self.buffers.remove(&buffer);
4967                                    drop(buffer);
4968                                });
4969                            } else {
4970                                buffer.update(&mut cx, |buffer, cx| {
4971                                    log::info!(
4972                                        "Host: updating buffer {:?} ({})",
4973                                        buffer.file().unwrap().full_path(cx),
4974                                        buffer.remote_id()
4975                                    );
4976                                    buffer.randomly_edit(&mut *rng.lock(), 5, cx)
4977                                });
4978                            }
4979                        }
4980                        _ => loop {
4981                            let path_component_count = rng.lock().gen_range::<usize, _>(1..=5);
4982                            let mut path = PathBuf::new();
4983                            path.push("/");
4984                            for _ in 0..path_component_count {
4985                                let letter = rng.lock().gen_range(b'a'..=b'z');
4986                                path.push(std::str::from_utf8(&[letter]).unwrap());
4987                            }
4988                            path.set_extension("rs");
4989                            let parent_path = path.parent().unwrap();
4990
4991                            log::info!("Host: creating file {:?}", path,);
4992
4993                            if fs.create_dir(&parent_path).await.is_ok()
4994                                && fs.create_file(&path, Default::default()).await.is_ok()
4995                            {
4996                                files.lock().push(path);
4997                                break;
4998                            } else {
4999                                log::info!("Host: cannot create file");
5000                            }
5001                        },
5002                    }
5003
5004                    cx.background().simulate_random_delay().await;
5005                }
5006
5007                log::info!("Host done");
5008
5009                self.project = Some(project);
5010                (self, cx)
5011            }
5012        }
5013
5014        pub async fn simulate_guest(
5015            mut self,
5016            guest_id: usize,
5017            project: ModelHandle<Project>,
5018            operations: Rc<Cell<usize>>,
5019            max_operations: usize,
5020            rng: Arc<Mutex<StdRng>>,
5021            mut cx: TestAppContext,
5022        ) -> (Self, TestAppContext) {
5023            while operations.get() < max_operations {
5024                let buffer = if self.buffers.is_empty() || rng.lock().gen() {
5025                    let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| {
5026                        project
5027                            .worktrees(&cx)
5028                            .filter(|worktree| {
5029                                let worktree = worktree.read(cx);
5030                                worktree.is_visible()
5031                                    && worktree.entries(false).any(|e| e.is_file())
5032                            })
5033                            .choose(&mut *rng.lock())
5034                    }) {
5035                        worktree
5036                    } else {
5037                        cx.background().simulate_random_delay().await;
5038                        continue;
5039                    };
5040
5041                    operations.set(operations.get() + 1);
5042                    let (worktree_root_name, project_path) =
5043                        worktree.read_with(&cx, |worktree, _| {
5044                            let entry = worktree
5045                                .entries(false)
5046                                .filter(|e| e.is_file())
5047                                .choose(&mut *rng.lock())
5048                                .unwrap();
5049                            (
5050                                worktree.root_name().to_string(),
5051                                (worktree.id(), entry.path.clone()),
5052                            )
5053                        });
5054                    log::info!(
5055                        "Guest {}: opening path {:?} in worktree {} ({})",
5056                        guest_id,
5057                        project_path.1,
5058                        project_path.0,
5059                        worktree_root_name,
5060                    );
5061                    let buffer = project
5062                        .update(&mut cx, |project, cx| {
5063                            project.open_buffer(project_path.clone(), cx)
5064                        })
5065                        .await
5066                        .unwrap();
5067                    log::info!(
5068                        "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}",
5069                        guest_id,
5070                        project_path.1,
5071                        project_path.0,
5072                        worktree_root_name,
5073                        buffer.read_with(&cx, |buffer, _| buffer.remote_id())
5074                    );
5075                    self.buffers.insert(buffer.clone());
5076                    buffer
5077                } else {
5078                    operations.set(operations.get() + 1);
5079
5080                    self.buffers
5081                        .iter()
5082                        .choose(&mut *rng.lock())
5083                        .unwrap()
5084                        .clone()
5085                };
5086
5087                let choice = rng.lock().gen_range(0..100);
5088                match choice {
5089                    0..=9 => {
5090                        cx.update(|cx| {
5091                            log::info!(
5092                                "Guest {}: dropping buffer {:?}",
5093                                guest_id,
5094                                buffer.read(cx).file().unwrap().full_path(cx)
5095                            );
5096                            self.buffers.remove(&buffer);
5097                            drop(buffer);
5098                        });
5099                    }
5100                    10..=19 => {
5101                        let completions = project.update(&mut cx, |project, cx| {
5102                            log::info!(
5103                                "Guest {}: requesting completions for buffer {} ({:?})",
5104                                guest_id,
5105                                buffer.read(cx).remote_id(),
5106                                buffer.read(cx).file().unwrap().full_path(cx)
5107                            );
5108                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5109                            project.completions(&buffer, offset, cx)
5110                        });
5111                        let completions = cx.background().spawn(async move {
5112                            completions.await.expect("completions request failed");
5113                        });
5114                        if rng.lock().gen_bool(0.3) {
5115                            log::info!("Guest {}: detaching completions request", guest_id);
5116                            completions.detach();
5117                        } else {
5118                            completions.await;
5119                        }
5120                    }
5121                    20..=29 => {
5122                        let code_actions = project.update(&mut cx, |project, cx| {
5123                            log::info!(
5124                                "Guest {}: requesting code actions for buffer {} ({:?})",
5125                                guest_id,
5126                                buffer.read(cx).remote_id(),
5127                                buffer.read(cx).file().unwrap().full_path(cx)
5128                            );
5129                            let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock());
5130                            project.code_actions(&buffer, range, cx)
5131                        });
5132                        let code_actions = cx.background().spawn(async move {
5133                            code_actions.await.expect("code actions request failed");
5134                        });
5135                        if rng.lock().gen_bool(0.3) {
5136                            log::info!("Guest {}: detaching code actions request", guest_id);
5137                            code_actions.detach();
5138                        } else {
5139                            code_actions.await;
5140                        }
5141                    }
5142                    30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => {
5143                        let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| {
5144                            log::info!(
5145                                "Guest {}: saving buffer {} ({:?})",
5146                                guest_id,
5147                                buffer.remote_id(),
5148                                buffer.file().unwrap().full_path(cx)
5149                            );
5150                            (buffer.version(), buffer.save(cx))
5151                        });
5152                        let save = cx.background().spawn(async move {
5153                            let (saved_version, _) = save.await.expect("save request failed");
5154                            assert!(saved_version.observed_all(&requested_version));
5155                        });
5156                        if rng.lock().gen_bool(0.3) {
5157                            log::info!("Guest {}: detaching save request", guest_id);
5158                            save.detach();
5159                        } else {
5160                            save.await;
5161                        }
5162                    }
5163                    40..=44 => {
5164                        let prepare_rename = project.update(&mut cx, |project, cx| {
5165                            log::info!(
5166                                "Guest {}: preparing rename for buffer {} ({:?})",
5167                                guest_id,
5168                                buffer.read(cx).remote_id(),
5169                                buffer.read(cx).file().unwrap().full_path(cx)
5170                            );
5171                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5172                            project.prepare_rename(buffer, offset, cx)
5173                        });
5174                        let prepare_rename = cx.background().spawn(async move {
5175                            prepare_rename.await.expect("prepare rename request failed");
5176                        });
5177                        if rng.lock().gen_bool(0.3) {
5178                            log::info!("Guest {}: detaching prepare rename request", guest_id);
5179                            prepare_rename.detach();
5180                        } else {
5181                            prepare_rename.await;
5182                        }
5183                    }
5184                    45..=49 => {
5185                        let definitions = project.update(&mut cx, |project, cx| {
5186                            log::info!(
5187                                "Guest {}: requesting definitions for buffer {} ({:?})",
5188                                guest_id,
5189                                buffer.read(cx).remote_id(),
5190                                buffer.read(cx).file().unwrap().full_path(cx)
5191                            );
5192                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5193                            project.definition(&buffer, offset, cx)
5194                        });
5195                        let definitions = cx.background().spawn(async move {
5196                            definitions.await.expect("definitions request failed")
5197                        });
5198                        if rng.lock().gen_bool(0.3) {
5199                            log::info!("Guest {}: detaching definitions request", guest_id);
5200                            definitions.detach();
5201                        } else {
5202                            self.buffers
5203                                .extend(definitions.await.into_iter().map(|loc| loc.buffer));
5204                        }
5205                    }
5206                    50..=54 => {
5207                        let highlights = project.update(&mut cx, |project, cx| {
5208                            log::info!(
5209                                "Guest {}: requesting highlights for buffer {} ({:?})",
5210                                guest_id,
5211                                buffer.read(cx).remote_id(),
5212                                buffer.read(cx).file().unwrap().full_path(cx)
5213                            );
5214                            let offset = rng.lock().gen_range(0..=buffer.read(cx).len());
5215                            project.document_highlights(&buffer, offset, cx)
5216                        });
5217                        let highlights = cx.background().spawn(async move {
5218                            highlights.await.expect("highlights request failed");
5219                        });
5220                        if rng.lock().gen_bool(0.3) {
5221                            log::info!("Guest {}: detaching highlights request", guest_id);
5222                            highlights.detach();
5223                        } else {
5224                            highlights.await;
5225                        }
5226                    }
5227                    55..=59 => {
5228                        let search = project.update(&mut cx, |project, cx| {
5229                            let query = rng.lock().gen_range('a'..='z');
5230                            log::info!("Guest {}: project-wide search {:?}", guest_id, query);
5231                            project.search(SearchQuery::text(query, false, false), cx)
5232                        });
5233                        let search = cx
5234                            .background()
5235                            .spawn(async move { search.await.expect("search request failed") });
5236                        if rng.lock().gen_bool(0.3) {
5237                            log::info!("Guest {}: detaching search request", guest_id);
5238                            search.detach();
5239                        } else {
5240                            self.buffers.extend(search.await.into_keys());
5241                        }
5242                    }
5243                    _ => {
5244                        buffer.update(&mut cx, |buffer, cx| {
5245                            log::info!(
5246                                "Guest {}: updating buffer {} ({:?})",
5247                                guest_id,
5248                                buffer.remote_id(),
5249                                buffer.file().unwrap().full_path(cx)
5250                            );
5251                            buffer.randomly_edit(&mut *rng.lock(), 5, cx)
5252                        });
5253                    }
5254                }
5255                cx.background().simulate_random_delay().await;
5256            }
5257
5258            log::info!("Guest {} done", guest_id);
5259
5260            self.project = Some(project);
5261            (self, cx)
5262        }
5263    }
5264
5265    impl Drop for TestClient {
5266        fn drop(&mut self) {
5267            self.client.tear_down();
5268        }
5269    }
5270
5271    impl Executor for Arc<gpui::executor::Background> {
5272        type Timer = gpui::executor::Timer;
5273
5274        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
5275            self.spawn(future).detach();
5276        }
5277
5278        fn timer(&self, duration: Duration) -> Self::Timer {
5279            self.as_ref().timer(duration)
5280        }
5281    }
5282
5283    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
5284        channel
5285            .messages()
5286            .cursor::<()>()
5287            .map(|m| {
5288                (
5289                    m.sender.github_login.clone(),
5290                    m.body.clone(),
5291                    m.is_pending(),
5292                )
5293            })
5294            .collect()
5295    }
5296
5297    struct EmptyView;
5298
5299    impl gpui::Entity for EmptyView {
5300        type Event = ();
5301    }
5302
5303    impl gpui::View for EmptyView {
5304        fn ui_name() -> &'static str {
5305            "empty view"
5306        }
5307
5308        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
5309            gpui::Element::boxed(gpui::elements::Empty)
5310        }
5311    }
5312}