rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44pub trait Executor {
  45    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F);
  46}
  47
  48pub struct RealExecutor;
  49
  50const MESSAGE_COUNT_PER_PAGE: usize = 100;
  51const MAX_MESSAGE_LEN: usize = 1024;
  52
  53impl Server {
  54    pub fn new(
  55        app_state: Arc<AppState>,
  56        peer: Arc<Peer>,
  57        notifications: Option<mpsc::Sender<()>>,
  58    ) -> Arc<Self> {
  59        let mut server = Self {
  60            peer,
  61            app_state,
  62            store: Default::default(),
  63            handlers: Default::default(),
  64            notifications,
  65        };
  66
  67        server
  68            .add_request_handler(Server::ping)
  69            .add_request_handler(Server::register_project)
  70            .add_message_handler(Server::unregister_project)
  71            .add_request_handler(Server::share_project)
  72            .add_message_handler(Server::unshare_project)
  73            .add_request_handler(Server::join_project)
  74            .add_message_handler(Server::leave_project)
  75            .add_request_handler(Server::register_worktree)
  76            .add_message_handler(Server::unregister_worktree)
  77            .add_request_handler(Server::share_worktree)
  78            .add_message_handler(Server::update_worktree)
  79            .add_message_handler(Server::update_diagnostic_summary)
  80            .add_message_handler(Server::disk_based_diagnostics_updating)
  81            .add_message_handler(Server::disk_based_diagnostics_updated)
  82            .add_request_handler(Server::get_definition)
  83            .add_request_handler(Server::open_buffer)
  84            .add_message_handler(Server::close_buffer)
  85            .add_request_handler(Server::update_buffer)
  86            .add_message_handler(Server::update_buffer_file)
  87            .add_message_handler(Server::buffer_reloaded)
  88            .add_message_handler(Server::buffer_saved)
  89            .add_request_handler(Server::save_buffer)
  90            .add_request_handler(Server::format_buffers)
  91            .add_request_handler(Server::get_completions)
  92            .add_request_handler(Server::apply_additional_edits_for_completion)
  93            .add_request_handler(Server::get_code_actions)
  94            .add_request_handler(Server::apply_code_action)
  95            .add_request_handler(Server::get_channels)
  96            .add_request_handler(Server::get_users)
  97            .add_request_handler(Server::join_channel)
  98            .add_message_handler(Server::leave_channel)
  99            .add_request_handler(Server::send_channel_message)
 100            .add_request_handler(Server::get_channel_messages);
 101
 102        Arc::new(server)
 103    }
 104
 105    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 106    where
 107        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 108        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 109        M: EnvelopedMessage,
 110    {
 111        let prev_handler = self.handlers.insert(
 112            TypeId::of::<M>(),
 113            Box::new(move |server, envelope| {
 114                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 115                (handler)(server, *envelope).boxed()
 116            }),
 117        );
 118        if prev_handler.is_some() {
 119            panic!("registered a handler for the same message twice");
 120        }
 121        self
 122    }
 123
 124    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 125    where
 126        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 127        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 128        M: RequestMessage,
 129    {
 130        self.add_message_handler(move |server, envelope| {
 131            let receipt = envelope.receipt();
 132            let response = (handler)(server.clone(), envelope);
 133            async move {
 134                match response.await {
 135                    Ok(response) => {
 136                        server.peer.respond(receipt, response)?;
 137                        Ok(())
 138                    }
 139                    Err(error) => {
 140                        server.peer.respond_with_error(
 141                            receipt,
 142                            proto::Error {
 143                                message: error.to_string(),
 144                            },
 145                        )?;
 146                        Err(error)
 147                    }
 148                }
 149            }
 150        })
 151    }
 152
 153    pub fn handle_connection<E: Executor>(
 154        self: &Arc<Self>,
 155        connection: Connection,
 156        addr: String,
 157        user_id: UserId,
 158        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 159        executor: E,
 160    ) -> impl Future<Output = ()> {
 161        let mut this = self.clone();
 162        async move {
 163            let (connection_id, handle_io, mut incoming_rx) =
 164                this.peer.add_connection(connection).await;
 165
 166            if let Some(send_connection_id) = send_connection_id.as_mut() {
 167                let _ = send_connection_id.send(connection_id).await;
 168            }
 169
 170            this.state_mut().add_connection(connection_id, user_id);
 171            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 172                log::error!("error updating contacts for {:?}: {}", user_id, err);
 173            }
 174
 175            let handle_io = handle_io.fuse();
 176            futures::pin_mut!(handle_io);
 177            loop {
 178                let next_message = incoming_rx.next().fuse();
 179                futures::pin_mut!(next_message);
 180                futures::select_biased! {
 181                    result = handle_io => {
 182                        if let Err(err) = result {
 183                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 184                        }
 185                        break;
 186                    }
 187                    message = next_message => {
 188                        if let Some(message) = message {
 189                            let start_time = Instant::now();
 190                            let type_name = message.payload_type_name();
 191                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 192                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 193                                let handle_message = (handler)(this.clone(), message);
 194                                let notifications = this.notifications.clone();
 195                                executor.spawn_detached(async move {
 196                                    if let Err(err) = handle_message.await {
 197                                        log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 198                                    } else {
 199                                        log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 200                                    }
 201                                    if let Some(mut notifications) = notifications {
 202                                        let _ = notifications.send(()).await;
 203                                    }
 204                                });
 205                            } else {
 206                                log::warn!("unhandled message: {}", type_name);
 207                            }
 208                        } else {
 209                            log::info!("rpc connection closed {:?}", addr);
 210                            break;
 211                        }
 212                    }
 213                }
 214            }
 215
 216            if let Err(err) = this.sign_out(connection_id).await {
 217                log::error!("error signing out connection {:?} - {:?}", addr, err);
 218            }
 219        }
 220    }
 221
 222    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 223        self.peer.disconnect(connection_id);
 224        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 225
 226        for (project_id, project) in removed_connection.hosted_projects {
 227            if let Some(share) = project.share {
 228                broadcast(
 229                    connection_id,
 230                    share.guests.keys().copied().collect(),
 231                    |conn_id| {
 232                        self.peer
 233                            .send(conn_id, proto::UnshareProject { project_id })
 234                    },
 235                )?;
 236            }
 237        }
 238
 239        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 240            broadcast(connection_id, peer_ids, |conn_id| {
 241                self.peer.send(
 242                    conn_id,
 243                    proto::RemoveProjectCollaborator {
 244                        project_id,
 245                        peer_id: connection_id.0,
 246                    },
 247                )
 248            })?;
 249        }
 250
 251        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 252        Ok(())
 253    }
 254
 255    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 256        Ok(proto::Ack {})
 257    }
 258
 259    async fn register_project(
 260        mut self: Arc<Server>,
 261        request: TypedEnvelope<proto::RegisterProject>,
 262    ) -> tide::Result<proto::RegisterProjectResponse> {
 263        let project_id = {
 264            let mut state = self.state_mut();
 265            let user_id = state.user_id_for_connection(request.sender_id)?;
 266            state.register_project(request.sender_id, user_id)
 267        };
 268        Ok(proto::RegisterProjectResponse { project_id })
 269    }
 270
 271    async fn unregister_project(
 272        mut self: Arc<Server>,
 273        request: TypedEnvelope<proto::UnregisterProject>,
 274    ) -> tide::Result<()> {
 275        let project = self
 276            .state_mut()
 277            .unregister_project(request.payload.project_id, request.sender_id)?;
 278        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 279        Ok(())
 280    }
 281
 282    async fn share_project(
 283        mut self: Arc<Server>,
 284        request: TypedEnvelope<proto::ShareProject>,
 285    ) -> tide::Result<proto::Ack> {
 286        self.state_mut()
 287            .share_project(request.payload.project_id, request.sender_id);
 288        Ok(proto::Ack {})
 289    }
 290
 291    async fn unshare_project(
 292        mut self: Arc<Server>,
 293        request: TypedEnvelope<proto::UnshareProject>,
 294    ) -> tide::Result<()> {
 295        let project_id = request.payload.project_id;
 296        let project = self
 297            .state_mut()
 298            .unshare_project(project_id, request.sender_id)?;
 299
 300        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 301            self.peer
 302                .send(conn_id, proto::UnshareProject { project_id })
 303        })?;
 304        self.update_contacts_for_users(&project.authorized_user_ids)?;
 305        Ok(())
 306    }
 307
 308    async fn join_project(
 309        mut self: Arc<Server>,
 310        request: TypedEnvelope<proto::JoinProject>,
 311    ) -> tide::Result<proto::JoinProjectResponse> {
 312        let project_id = request.payload.project_id;
 313
 314        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 315        let (response, connection_ids, contact_user_ids) = self
 316            .state_mut()
 317            .join_project(request.sender_id, user_id, project_id)
 318            .and_then(|joined| {
 319                let share = joined.project.share()?;
 320                let peer_count = share.guests.len();
 321                let mut collaborators = Vec::with_capacity(peer_count);
 322                collaborators.push(proto::Collaborator {
 323                    peer_id: joined.project.host_connection_id.0,
 324                    replica_id: 0,
 325                    user_id: joined.project.host_user_id.to_proto(),
 326                });
 327                let worktrees = joined
 328                    .project
 329                    .worktrees
 330                    .iter()
 331                    .filter_map(|(id, worktree)| {
 332                        worktree.share.as_ref().map(|share| proto::Worktree {
 333                            id: *id,
 334                            root_name: worktree.root_name.clone(),
 335                            entries: share.entries.values().cloned().collect(),
 336                            diagnostic_summaries: share
 337                                .diagnostic_summaries
 338                                .values()
 339                                .cloned()
 340                                .collect(),
 341                            weak: worktree.weak,
 342                            next_update_id: share.next_update_id as u64,
 343                        })
 344                    })
 345                    .collect();
 346                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 347                    if *peer_conn_id != request.sender_id {
 348                        collaborators.push(proto::Collaborator {
 349                            peer_id: peer_conn_id.0,
 350                            replica_id: *peer_replica_id as u32,
 351                            user_id: peer_user_id.to_proto(),
 352                        });
 353                    }
 354                }
 355                let response = proto::JoinProjectResponse {
 356                    worktrees,
 357                    replica_id: joined.replica_id as u32,
 358                    collaborators,
 359                };
 360                let connection_ids = joined.project.connection_ids();
 361                let contact_user_ids = joined.project.authorized_user_ids();
 362                Ok((response, connection_ids, contact_user_ids))
 363            })?;
 364
 365        broadcast(request.sender_id, connection_ids, |conn_id| {
 366            self.peer.send(
 367                conn_id,
 368                proto::AddProjectCollaborator {
 369                    project_id,
 370                    collaborator: Some(proto::Collaborator {
 371                        peer_id: request.sender_id.0,
 372                        replica_id: response.replica_id,
 373                        user_id: user_id.to_proto(),
 374                    }),
 375                },
 376            )
 377        })?;
 378        self.update_contacts_for_users(&contact_user_ids)?;
 379        Ok(response)
 380    }
 381
 382    async fn leave_project(
 383        mut self: Arc<Server>,
 384        request: TypedEnvelope<proto::LeaveProject>,
 385    ) -> tide::Result<()> {
 386        let sender_id = request.sender_id;
 387        let project_id = request.payload.project_id;
 388        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 389
 390        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 391            self.peer.send(
 392                conn_id,
 393                proto::RemoveProjectCollaborator {
 394                    project_id,
 395                    peer_id: sender_id.0,
 396                },
 397            )
 398        })?;
 399        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 400
 401        Ok(())
 402    }
 403
 404    async fn register_worktree(
 405        mut self: Arc<Server>,
 406        request: TypedEnvelope<proto::RegisterWorktree>,
 407    ) -> tide::Result<proto::Ack> {
 408        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 409
 410        let mut contact_user_ids = HashSet::default();
 411        contact_user_ids.insert(host_user_id);
 412        for github_login in request.payload.authorized_logins {
 413            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 414            contact_user_ids.insert(contact_user_id);
 415        }
 416
 417        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 418        self.state_mut().register_worktree(
 419            request.payload.project_id,
 420            request.payload.worktree_id,
 421            request.sender_id,
 422            Worktree {
 423                authorized_user_ids: contact_user_ids.clone(),
 424                root_name: request.payload.root_name,
 425                share: None,
 426                weak: false,
 427            },
 428        )?;
 429        self.update_contacts_for_users(&contact_user_ids)?;
 430        Ok(proto::Ack {})
 431    }
 432
 433    async fn unregister_worktree(
 434        mut self: Arc<Server>,
 435        request: TypedEnvelope<proto::UnregisterWorktree>,
 436    ) -> tide::Result<()> {
 437        let project_id = request.payload.project_id;
 438        let worktree_id = request.payload.worktree_id;
 439        let (worktree, guest_connection_ids) =
 440            self.state_mut()
 441                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 442        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 443            self.peer.send(
 444                conn_id,
 445                proto::UnregisterWorktree {
 446                    project_id,
 447                    worktree_id,
 448                },
 449            )
 450        })?;
 451        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 452        Ok(())
 453    }
 454
 455    async fn share_worktree(
 456        mut self: Arc<Server>,
 457        mut request: TypedEnvelope<proto::ShareWorktree>,
 458    ) -> tide::Result<proto::Ack> {
 459        let worktree = request
 460            .payload
 461            .worktree
 462            .as_mut()
 463            .ok_or_else(|| anyhow!("missing worktree"))?;
 464        let entries = worktree
 465            .entries
 466            .iter()
 467            .map(|entry| (entry.id, entry.clone()))
 468            .collect();
 469        let diagnostic_summaries = worktree
 470            .diagnostic_summaries
 471            .iter()
 472            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 473            .collect();
 474
 475        let shared_worktree = self.state_mut().share_worktree(
 476            request.payload.project_id,
 477            worktree.id,
 478            request.sender_id,
 479            entries,
 480            diagnostic_summaries,
 481            worktree.next_update_id,
 482        )?;
 483
 484        broadcast(
 485            request.sender_id,
 486            shared_worktree.connection_ids,
 487            |connection_id| {
 488                self.peer
 489                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 490            },
 491        )?;
 492        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 493
 494        Ok(proto::Ack {})
 495    }
 496
 497    async fn update_worktree(
 498        mut self: Arc<Server>,
 499        request: TypedEnvelope<proto::UpdateWorktree>,
 500    ) -> tide::Result<()> {
 501        let connection_ids = self.state_mut().update_worktree(
 502            request.sender_id,
 503            request.payload.project_id,
 504            request.payload.worktree_id,
 505            &request.payload.removed_entries,
 506            &request.payload.updated_entries,
 507        )?;
 508
 509        broadcast(request.sender_id, connection_ids, |connection_id| {
 510            self.peer
 511                .forward_send(request.sender_id, connection_id, request.payload.clone())
 512        })?;
 513
 514        Ok(())
 515    }
 516
 517    async fn update_diagnostic_summary(
 518        mut self: Arc<Server>,
 519        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 520    ) -> tide::Result<()> {
 521        let summary = request
 522            .payload
 523            .summary
 524            .clone()
 525            .ok_or_else(|| anyhow!("invalid summary"))?;
 526        let receiver_ids = self.state_mut().update_diagnostic_summary(
 527            request.payload.project_id,
 528            request.payload.worktree_id,
 529            request.sender_id,
 530            summary,
 531        )?;
 532
 533        broadcast(request.sender_id, receiver_ids, |connection_id| {
 534            self.peer
 535                .forward_send(request.sender_id, connection_id, request.payload.clone())
 536        })?;
 537        Ok(())
 538    }
 539
 540    async fn disk_based_diagnostics_updating(
 541        self: Arc<Server>,
 542        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 543    ) -> tide::Result<()> {
 544        let receiver_ids = self
 545            .state()
 546            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 547        broadcast(request.sender_id, receiver_ids, |connection_id| {
 548            self.peer
 549                .forward_send(request.sender_id, connection_id, request.payload.clone())
 550        })?;
 551        Ok(())
 552    }
 553
 554    async fn disk_based_diagnostics_updated(
 555        self: Arc<Server>,
 556        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 557    ) -> tide::Result<()> {
 558        let receiver_ids = self
 559            .state()
 560            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 561        broadcast(request.sender_id, receiver_ids, |connection_id| {
 562            self.peer
 563                .forward_send(request.sender_id, connection_id, request.payload.clone())
 564        })?;
 565        Ok(())
 566    }
 567
 568    async fn get_definition(
 569        self: Arc<Server>,
 570        request: TypedEnvelope<proto::GetDefinition>,
 571    ) -> tide::Result<proto::GetDefinitionResponse> {
 572        let host_connection_id = self
 573            .state()
 574            .read_project(request.payload.project_id, request.sender_id)?
 575            .host_connection_id;
 576        Ok(self
 577            .peer
 578            .forward_request(request.sender_id, host_connection_id, request.payload)
 579            .await?)
 580    }
 581
 582    async fn open_buffer(
 583        self: Arc<Server>,
 584        request: TypedEnvelope<proto::OpenBuffer>,
 585    ) -> tide::Result<proto::OpenBufferResponse> {
 586        let host_connection_id = self
 587            .state()
 588            .read_project(request.payload.project_id, request.sender_id)?
 589            .host_connection_id;
 590        Ok(self
 591            .peer
 592            .forward_request(request.sender_id, host_connection_id, request.payload)
 593            .await?)
 594    }
 595
 596    async fn close_buffer(
 597        self: Arc<Server>,
 598        request: TypedEnvelope<proto::CloseBuffer>,
 599    ) -> tide::Result<()> {
 600        let host_connection_id = self
 601            .state()
 602            .read_project(request.payload.project_id, request.sender_id)?
 603            .host_connection_id;
 604        self.peer
 605            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 606        Ok(())
 607    }
 608
 609    async fn save_buffer(
 610        self: Arc<Server>,
 611        request: TypedEnvelope<proto::SaveBuffer>,
 612    ) -> tide::Result<proto::BufferSaved> {
 613        let host;
 614        let mut guests;
 615        {
 616            let state = self.state();
 617            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 618            host = project.host_connection_id;
 619            guests = project.guest_connection_ids()
 620        }
 621
 622        let response = self
 623            .peer
 624            .forward_request(request.sender_id, host, request.payload.clone())
 625            .await?;
 626
 627        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 628        broadcast(host, guests, |conn_id| {
 629            self.peer.forward_send(host, conn_id, response.clone())
 630        })?;
 631
 632        Ok(response)
 633    }
 634
 635    async fn format_buffers(
 636        self: Arc<Server>,
 637        request: TypedEnvelope<proto::FormatBuffers>,
 638    ) -> tide::Result<proto::FormatBuffersResponse> {
 639        let host = self
 640            .state()
 641            .read_project(request.payload.project_id, request.sender_id)?
 642            .host_connection_id;
 643        Ok(self
 644            .peer
 645            .forward_request(request.sender_id, host, request.payload.clone())
 646            .await?)
 647    }
 648
 649    async fn get_completions(
 650        self: Arc<Server>,
 651        request: TypedEnvelope<proto::GetCompletions>,
 652    ) -> tide::Result<proto::GetCompletionsResponse> {
 653        let host = self
 654            .state()
 655            .read_project(request.payload.project_id, request.sender_id)?
 656            .host_connection_id;
 657        Ok(self
 658            .peer
 659            .forward_request(request.sender_id, host, request.payload.clone())
 660            .await?)
 661    }
 662
 663    async fn apply_additional_edits_for_completion(
 664        self: Arc<Server>,
 665        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 666    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 667        let host = self
 668            .state()
 669            .read_project(request.payload.project_id, request.sender_id)?
 670            .host_connection_id;
 671        Ok(self
 672            .peer
 673            .forward_request(request.sender_id, host, request.payload.clone())
 674            .await?)
 675    }
 676
 677    async fn get_code_actions(
 678        self: Arc<Server>,
 679        request: TypedEnvelope<proto::GetCodeActions>,
 680    ) -> tide::Result<proto::GetCodeActionsResponse> {
 681        let host = self
 682            .state()
 683            .read_project(request.payload.project_id, request.sender_id)?
 684            .host_connection_id;
 685        Ok(self
 686            .peer
 687            .forward_request(request.sender_id, host, request.payload.clone())
 688            .await?)
 689    }
 690
 691    async fn apply_code_action(
 692        self: Arc<Server>,
 693        request: TypedEnvelope<proto::ApplyCodeAction>,
 694    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 695        let host = self
 696            .state()
 697            .read_project(request.payload.project_id, request.sender_id)?
 698            .host_connection_id;
 699        Ok(self
 700            .peer
 701            .forward_request(request.sender_id, host, request.payload.clone())
 702            .await?)
 703    }
 704
 705    async fn update_buffer(
 706        self: Arc<Server>,
 707        request: TypedEnvelope<proto::UpdateBuffer>,
 708    ) -> tide::Result<proto::Ack> {
 709        let receiver_ids = self
 710            .state()
 711            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 712        broadcast(request.sender_id, receiver_ids, |connection_id| {
 713            self.peer
 714                .forward_send(request.sender_id, connection_id, request.payload.clone())
 715        })?;
 716        Ok(proto::Ack {})
 717    }
 718
 719    async fn update_buffer_file(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::UpdateBufferFile>,
 722    ) -> tide::Result<()> {
 723        let receiver_ids = self
 724            .state()
 725            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 726        broadcast(request.sender_id, receiver_ids, |connection_id| {
 727            self.peer
 728                .forward_send(request.sender_id, connection_id, request.payload.clone())
 729        })?;
 730        Ok(())
 731    }
 732
 733    async fn buffer_reloaded(
 734        self: Arc<Server>,
 735        request: TypedEnvelope<proto::BufferReloaded>,
 736    ) -> tide::Result<()> {
 737        let receiver_ids = self
 738            .state()
 739            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 740        broadcast(request.sender_id, receiver_ids, |connection_id| {
 741            self.peer
 742                .forward_send(request.sender_id, connection_id, request.payload.clone())
 743        })?;
 744        Ok(())
 745    }
 746
 747    async fn buffer_saved(
 748        self: Arc<Server>,
 749        request: TypedEnvelope<proto::BufferSaved>,
 750    ) -> tide::Result<()> {
 751        let receiver_ids = self
 752            .state()
 753            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 754        broadcast(request.sender_id, receiver_ids, |connection_id| {
 755            self.peer
 756                .forward_send(request.sender_id, connection_id, request.payload.clone())
 757        })?;
 758        Ok(())
 759    }
 760
 761    async fn get_channels(
 762        self: Arc<Server>,
 763        request: TypedEnvelope<proto::GetChannels>,
 764    ) -> tide::Result<proto::GetChannelsResponse> {
 765        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 766        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 767        Ok(proto::GetChannelsResponse {
 768            channels: channels
 769                .into_iter()
 770                .map(|chan| proto::Channel {
 771                    id: chan.id.to_proto(),
 772                    name: chan.name,
 773                })
 774                .collect(),
 775        })
 776    }
 777
 778    async fn get_users(
 779        self: Arc<Server>,
 780        request: TypedEnvelope<proto::GetUsers>,
 781    ) -> tide::Result<proto::GetUsersResponse> {
 782        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 783        let users = self
 784            .app_state
 785            .db
 786            .get_users_by_ids(user_ids)
 787            .await?
 788            .into_iter()
 789            .map(|user| proto::User {
 790                id: user.id.to_proto(),
 791                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 792                github_login: user.github_login,
 793            })
 794            .collect();
 795        Ok(proto::GetUsersResponse { users })
 796    }
 797
 798    fn update_contacts_for_users<'a>(
 799        self: &Arc<Server>,
 800        user_ids: impl IntoIterator<Item = &'a UserId>,
 801    ) -> anyhow::Result<()> {
 802        let mut result = Ok(());
 803        let state = self.state();
 804        for user_id in user_ids {
 805            let contacts = state.contacts_for_user(*user_id);
 806            for connection_id in state.connection_ids_for_user(*user_id) {
 807                if let Err(error) = self.peer.send(
 808                    connection_id,
 809                    proto::UpdateContacts {
 810                        contacts: contacts.clone(),
 811                    },
 812                ) {
 813                    result = Err(error);
 814                }
 815            }
 816        }
 817        result
 818    }
 819
 820    async fn join_channel(
 821        mut self: Arc<Self>,
 822        request: TypedEnvelope<proto::JoinChannel>,
 823    ) -> tide::Result<proto::JoinChannelResponse> {
 824        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 825        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 826        if !self
 827            .app_state
 828            .db
 829            .can_user_access_channel(user_id, channel_id)
 830            .await?
 831        {
 832            Err(anyhow!("access denied"))?;
 833        }
 834
 835        self.state_mut().join_channel(request.sender_id, channel_id);
 836        let messages = self
 837            .app_state
 838            .db
 839            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 840            .await?
 841            .into_iter()
 842            .map(|msg| proto::ChannelMessage {
 843                id: msg.id.to_proto(),
 844                body: msg.body,
 845                timestamp: msg.sent_at.unix_timestamp() as u64,
 846                sender_id: msg.sender_id.to_proto(),
 847                nonce: Some(msg.nonce.as_u128().into()),
 848            })
 849            .collect::<Vec<_>>();
 850        Ok(proto::JoinChannelResponse {
 851            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 852            messages,
 853        })
 854    }
 855
 856    async fn leave_channel(
 857        mut self: Arc<Self>,
 858        request: TypedEnvelope<proto::LeaveChannel>,
 859    ) -> tide::Result<()> {
 860        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 861        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 862        if !self
 863            .app_state
 864            .db
 865            .can_user_access_channel(user_id, channel_id)
 866            .await?
 867        {
 868            Err(anyhow!("access denied"))?;
 869        }
 870
 871        self.state_mut()
 872            .leave_channel(request.sender_id, channel_id);
 873
 874        Ok(())
 875    }
 876
 877    async fn send_channel_message(
 878        self: Arc<Self>,
 879        request: TypedEnvelope<proto::SendChannelMessage>,
 880    ) -> tide::Result<proto::SendChannelMessageResponse> {
 881        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 882        let user_id;
 883        let connection_ids;
 884        {
 885            let state = self.state();
 886            user_id = state.user_id_for_connection(request.sender_id)?;
 887            connection_ids = state.channel_connection_ids(channel_id)?;
 888        }
 889
 890        // Validate the message body.
 891        let body = request.payload.body.trim().to_string();
 892        if body.len() > MAX_MESSAGE_LEN {
 893            return Err(anyhow!("message is too long"))?;
 894        }
 895        if body.is_empty() {
 896            return Err(anyhow!("message can't be blank"))?;
 897        }
 898
 899        let timestamp = OffsetDateTime::now_utc();
 900        let nonce = request
 901            .payload
 902            .nonce
 903            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 904
 905        let message_id = self
 906            .app_state
 907            .db
 908            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 909            .await?
 910            .to_proto();
 911        let message = proto::ChannelMessage {
 912            sender_id: user_id.to_proto(),
 913            id: message_id,
 914            body,
 915            timestamp: timestamp.unix_timestamp() as u64,
 916            nonce: Some(nonce),
 917        };
 918        broadcast(request.sender_id, connection_ids, |conn_id| {
 919            self.peer.send(
 920                conn_id,
 921                proto::ChannelMessageSent {
 922                    channel_id: channel_id.to_proto(),
 923                    message: Some(message.clone()),
 924                },
 925            )
 926        })?;
 927        Ok(proto::SendChannelMessageResponse {
 928            message: Some(message),
 929        })
 930    }
 931
 932    async fn get_channel_messages(
 933        self: Arc<Self>,
 934        request: TypedEnvelope<proto::GetChannelMessages>,
 935    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 936        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 937        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 938        if !self
 939            .app_state
 940            .db
 941            .can_user_access_channel(user_id, channel_id)
 942            .await?
 943        {
 944            Err(anyhow!("access denied"))?;
 945        }
 946
 947        let messages = self
 948            .app_state
 949            .db
 950            .get_channel_messages(
 951                channel_id,
 952                MESSAGE_COUNT_PER_PAGE,
 953                Some(MessageId::from_proto(request.payload.before_message_id)),
 954            )
 955            .await?
 956            .into_iter()
 957            .map(|msg| proto::ChannelMessage {
 958                id: msg.id.to_proto(),
 959                body: msg.body,
 960                timestamp: msg.sent_at.unix_timestamp() as u64,
 961                sender_id: msg.sender_id.to_proto(),
 962                nonce: Some(msg.nonce.as_u128().into()),
 963            })
 964            .collect::<Vec<_>>();
 965
 966        Ok(proto::GetChannelMessagesResponse {
 967            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 968            messages,
 969        })
 970    }
 971
 972    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
 973        self.store.read()
 974    }
 975
 976    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
 977        self.store.write()
 978    }
 979}
 980
 981impl Executor for RealExecutor {
 982    fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
 983        task::spawn(future);
 984    }
 985}
 986
 987fn broadcast<F>(
 988    sender_id: ConnectionId,
 989    receiver_ids: Vec<ConnectionId>,
 990    mut f: F,
 991) -> anyhow::Result<()>
 992where
 993    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 994{
 995    let mut result = Ok(());
 996    for receiver_id in receiver_ids {
 997        if receiver_id != sender_id {
 998            if let Err(error) = f(receiver_id) {
 999                if result.is_ok() {
1000                    result = Err(error);
1001                }
1002            }
1003        }
1004    }
1005    result
1006}
1007
1008pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1009    let server = Server::new(app.state().clone(), rpc.clone(), None);
1010    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1011        let server = server.clone();
1012        async move {
1013            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1014
1015            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1016            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1017            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1018            let client_protocol_version: Option<u32> = request
1019                .header("X-Zed-Protocol-Version")
1020                .and_then(|v| v.as_str().parse().ok());
1021
1022            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1023                return Ok(Response::new(StatusCode::UpgradeRequired));
1024            }
1025
1026            let header = match request.header("Sec-Websocket-Key") {
1027                Some(h) => h.as_str(),
1028                None => return Err(anyhow!("expected sec-websocket-key"))?,
1029            };
1030
1031            let user_id = process_auth_header(&request).await?;
1032
1033            let mut response = Response::new(StatusCode::SwitchingProtocols);
1034            response.insert_header(UPGRADE, "websocket");
1035            response.insert_header(CONNECTION, "Upgrade");
1036            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1037            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1038            response.insert_header("Sec-Websocket-Version", "13");
1039
1040            let http_res: &mut tide::http::Response = response.as_mut();
1041            let upgrade_receiver = http_res.recv_upgrade().await;
1042            let addr = request.remote().unwrap_or("unknown").to_string();
1043            task::spawn(async move {
1044                if let Some(stream) = upgrade_receiver.await {
1045                    server
1046                        .handle_connection(
1047                            Connection::new(
1048                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1049                            ),
1050                            addr,
1051                            user_id,
1052                            None,
1053                            RealExecutor,
1054                        )
1055                        .await;
1056                }
1057            });
1058
1059            Ok(response)
1060        }
1061    });
1062}
1063
1064fn header_contains_ignore_case<T>(
1065    request: &tide::Request<T>,
1066    header_name: HeaderName,
1067    value: &str,
1068) -> bool {
1069    request
1070        .header(header_name)
1071        .map(|h| {
1072            h.as_str()
1073                .split(',')
1074                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1075        })
1076        .unwrap_or(false)
1077}
1078
1079#[cfg(test)]
1080mod tests {
1081    use super::*;
1082    use crate::{
1083        auth,
1084        db::{tests::TestDb, UserId},
1085        github, AppState, Config,
1086    };
1087    use ::rpc::Peer;
1088    use gpui::{executor, ModelHandle, TestAppContext};
1089    use parking_lot::Mutex;
1090    use postage::{mpsc, watch};
1091    use rand::prelude::*;
1092    use rpc::PeerId;
1093    use serde_json::json;
1094    use sqlx::types::time::OffsetDateTime;
1095    use std::{
1096        ops::Deref,
1097        path::Path,
1098        rc::Rc,
1099        sync::{
1100            atomic::{AtomicBool, Ordering::SeqCst},
1101            Arc,
1102        },
1103        time::Duration,
1104    };
1105    use zed::{
1106        client::{
1107            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1108            EstablishConnectionError, UserStore,
1109        },
1110        editor::{
1111            self, ConfirmCodeAction, ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer,
1112            Redo, ToggleCodeActions, Undo,
1113        },
1114        fs::{FakeFs, Fs as _},
1115        language::{
1116            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1117            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1118        },
1119        lsp,
1120        project::{DiagnosticSummary, Project, ProjectPath},
1121        workspace::{Workspace, WorkspaceParams},
1122    };
1123
1124    #[cfg(test)]
1125    #[ctor::ctor]
1126    fn init_logger() {
1127        if std::env::var("RUST_LOG").is_ok() {
1128            env_logger::init();
1129        }
1130    }
1131
1132    #[gpui::test(iterations = 10)]
1133    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1134        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1135        let lang_registry = Arc::new(LanguageRegistry::new());
1136        let fs = Arc::new(FakeFs::new(cx_a.background()));
1137        cx_a.foreground().forbid_parking();
1138
1139        // Connect to a server as 2 clients.
1140        let mut server = TestServer::start(cx_a.foreground()).await;
1141        let client_a = server.create_client(&mut cx_a, "user_a").await;
1142        let client_b = server.create_client(&mut cx_b, "user_b").await;
1143
1144        // Share a project as client A
1145        fs.insert_tree(
1146            "/a",
1147            json!({
1148                ".zed.toml": r#"collaborators = ["user_b"]"#,
1149                "a.txt": "a-contents",
1150                "b.txt": "b-contents",
1151            }),
1152        )
1153        .await;
1154        let project_a = cx_a.update(|cx| {
1155            Project::local(
1156                client_a.clone(),
1157                client_a.user_store.clone(),
1158                lang_registry.clone(),
1159                fs.clone(),
1160                cx,
1161            )
1162        });
1163        let (worktree_a, _) = project_a
1164            .update(&mut cx_a, |p, cx| {
1165                p.find_or_create_local_worktree("/a", false, cx)
1166            })
1167            .await
1168            .unwrap();
1169        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1170        worktree_a
1171            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1172            .await;
1173        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1174        project_a
1175            .update(&mut cx_a, |p, cx| p.share(cx))
1176            .await
1177            .unwrap();
1178
1179        // Join that project as client B
1180        let project_b = Project::remote(
1181            project_id,
1182            client_b.clone(),
1183            client_b.user_store.clone(),
1184            lang_registry.clone(),
1185            fs.clone(),
1186            &mut cx_b.to_async(),
1187        )
1188        .await
1189        .unwrap();
1190
1191        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1192            assert_eq!(
1193                project
1194                    .collaborators()
1195                    .get(&client_a.peer_id)
1196                    .unwrap()
1197                    .user
1198                    .github_login,
1199                "user_a"
1200            );
1201            project.replica_id()
1202        });
1203        project_a
1204            .condition(&cx_a, |tree, _| {
1205                tree.collaborators()
1206                    .get(&client_b.peer_id)
1207                    .map_or(false, |collaborator| {
1208                        collaborator.replica_id == replica_id_b
1209                            && collaborator.user.github_login == "user_b"
1210                    })
1211            })
1212            .await;
1213
1214        // Open the same file as client B and client A.
1215        let buffer_b = project_b
1216            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1217            .await
1218            .unwrap();
1219        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1220        buffer_b.read_with(&cx_b, |buf, cx| {
1221            assert_eq!(buf.read(cx).text(), "b-contents")
1222        });
1223        project_a.read_with(&cx_a, |project, cx| {
1224            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1225        });
1226        let buffer_a = project_a
1227            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1228            .await
1229            .unwrap();
1230
1231        let editor_b = cx_b.add_view(window_b, |cx| {
1232            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1233        });
1234
1235        // TODO
1236        // // Create a selection set as client B and see that selection set as client A.
1237        // buffer_a
1238        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1239        //     .await;
1240
1241        // Edit the buffer as client B and see that edit as client A.
1242        editor_b.update(&mut cx_b, |editor, cx| {
1243            editor.handle_input(&Input("ok, ".into()), cx)
1244        });
1245        buffer_a
1246            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1247            .await;
1248
1249        // TODO
1250        // // Remove the selection set as client B, see those selections disappear as client A.
1251        cx_b.update(move |_| drop(editor_b));
1252        // buffer_a
1253        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1254        //     .await;
1255
1256        // Close the buffer as client A, see that the buffer is closed.
1257        cx_a.update(move |_| drop(buffer_a));
1258        project_a
1259            .condition(&cx_a, |project, cx| {
1260                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1261            })
1262            .await;
1263
1264        // Dropping the client B's project removes client B from client A's collaborators.
1265        cx_b.update(move |_| drop(project_b));
1266        project_a
1267            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1268            .await;
1269    }
1270
1271    #[gpui::test(iterations = 10)]
1272    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1273        let lang_registry = Arc::new(LanguageRegistry::new());
1274        let fs = Arc::new(FakeFs::new(cx_a.background()));
1275        cx_a.foreground().forbid_parking();
1276
1277        // Connect to a server as 2 clients.
1278        let mut server = TestServer::start(cx_a.foreground()).await;
1279        let client_a = server.create_client(&mut cx_a, "user_a").await;
1280        let client_b = server.create_client(&mut cx_b, "user_b").await;
1281
1282        // Share a project as client A
1283        fs.insert_tree(
1284            "/a",
1285            json!({
1286                ".zed.toml": r#"collaborators = ["user_b"]"#,
1287                "a.txt": "a-contents",
1288                "b.txt": "b-contents",
1289            }),
1290        )
1291        .await;
1292        let project_a = cx_a.update(|cx| {
1293            Project::local(
1294                client_a.clone(),
1295                client_a.user_store.clone(),
1296                lang_registry.clone(),
1297                fs.clone(),
1298                cx,
1299            )
1300        });
1301        let (worktree_a, _) = project_a
1302            .update(&mut cx_a, |p, cx| {
1303                p.find_or_create_local_worktree("/a", false, cx)
1304            })
1305            .await
1306            .unwrap();
1307        worktree_a
1308            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1309            .await;
1310        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1311        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1312        project_a
1313            .update(&mut cx_a, |p, cx| p.share(cx))
1314            .await
1315            .unwrap();
1316        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1317
1318        // Join that project as client B
1319        let project_b = Project::remote(
1320            project_id,
1321            client_b.clone(),
1322            client_b.user_store.clone(),
1323            lang_registry.clone(),
1324            fs.clone(),
1325            &mut cx_b.to_async(),
1326        )
1327        .await
1328        .unwrap();
1329        project_b
1330            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1331            .await
1332            .unwrap();
1333
1334        // Unshare the project as client A
1335        project_a
1336            .update(&mut cx_a, |project, cx| project.unshare(cx))
1337            .await
1338            .unwrap();
1339        project_b
1340            .condition(&mut cx_b, |project, _| project.is_read_only())
1341            .await;
1342        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1343        drop(project_b);
1344
1345        // Share the project again and ensure guests can still join.
1346        project_a
1347            .update(&mut cx_a, |project, cx| project.share(cx))
1348            .await
1349            .unwrap();
1350        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1351
1352        let project_c = Project::remote(
1353            project_id,
1354            client_b.clone(),
1355            client_b.user_store.clone(),
1356            lang_registry.clone(),
1357            fs.clone(),
1358            &mut cx_b.to_async(),
1359        )
1360        .await
1361        .unwrap();
1362        project_c
1363            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1364            .await
1365            .unwrap();
1366    }
1367
1368    #[gpui::test(iterations = 10)]
1369    async fn test_propagate_saves_and_fs_changes(
1370        mut cx_a: TestAppContext,
1371        mut cx_b: TestAppContext,
1372        mut cx_c: TestAppContext,
1373    ) {
1374        let lang_registry = Arc::new(LanguageRegistry::new());
1375        let fs = Arc::new(FakeFs::new(cx_a.background()));
1376        cx_a.foreground().forbid_parking();
1377
1378        // Connect to a server as 3 clients.
1379        let mut server = TestServer::start(cx_a.foreground()).await;
1380        let client_a = server.create_client(&mut cx_a, "user_a").await;
1381        let client_b = server.create_client(&mut cx_b, "user_b").await;
1382        let client_c = server.create_client(&mut cx_c, "user_c").await;
1383
1384        // Share a worktree as client A.
1385        fs.insert_tree(
1386            "/a",
1387            json!({
1388                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1389                "file1": "",
1390                "file2": ""
1391            }),
1392        )
1393        .await;
1394        let project_a = cx_a.update(|cx| {
1395            Project::local(
1396                client_a.clone(),
1397                client_a.user_store.clone(),
1398                lang_registry.clone(),
1399                fs.clone(),
1400                cx,
1401            )
1402        });
1403        let (worktree_a, _) = project_a
1404            .update(&mut cx_a, |p, cx| {
1405                p.find_or_create_local_worktree("/a", false, cx)
1406            })
1407            .await
1408            .unwrap();
1409        worktree_a
1410            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1411            .await;
1412        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1413        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1414        project_a
1415            .update(&mut cx_a, |p, cx| p.share(cx))
1416            .await
1417            .unwrap();
1418
1419        // Join that worktree as clients B and C.
1420        let project_b = Project::remote(
1421            project_id,
1422            client_b.clone(),
1423            client_b.user_store.clone(),
1424            lang_registry.clone(),
1425            fs.clone(),
1426            &mut cx_b.to_async(),
1427        )
1428        .await
1429        .unwrap();
1430        let project_c = Project::remote(
1431            project_id,
1432            client_c.clone(),
1433            client_c.user_store.clone(),
1434            lang_registry.clone(),
1435            fs.clone(),
1436            &mut cx_c.to_async(),
1437        )
1438        .await
1439        .unwrap();
1440        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1441        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1442
1443        // Open and edit a buffer as both guests B and C.
1444        let buffer_b = project_b
1445            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1446            .await
1447            .unwrap();
1448        let buffer_c = project_c
1449            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1450            .await
1451            .unwrap();
1452        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1453        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1454
1455        // Open and edit that buffer as the host.
1456        let buffer_a = project_a
1457            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1458            .await
1459            .unwrap();
1460
1461        buffer_a
1462            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1463            .await;
1464        buffer_a.update(&mut cx_a, |buf, cx| {
1465            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1466        });
1467
1468        // Wait for edits to propagate
1469        buffer_a
1470            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1471            .await;
1472        buffer_b
1473            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1474            .await;
1475        buffer_c
1476            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1477            .await;
1478
1479        // Edit the buffer as the host and concurrently save as guest B.
1480        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1481        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1482        save_b.await.unwrap();
1483        assert_eq!(
1484            fs.load("/a/file1".as_ref()).await.unwrap(),
1485            "hi-a, i-am-c, i-am-b, i-am-a"
1486        );
1487        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1488        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1489        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1490
1491        // Make changes on host's file system, see those changes on guest worktrees.
1492        fs.rename(
1493            "/a/file1".as_ref(),
1494            "/a/file1-renamed".as_ref(),
1495            Default::default(),
1496        )
1497        .await
1498        .unwrap();
1499
1500        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1501            .await
1502            .unwrap();
1503        fs.insert_file(Path::new("/a/file4"), "4".into())
1504            .await
1505            .unwrap();
1506
1507        worktree_a
1508            .condition(&cx_a, |tree, _| {
1509                tree.paths()
1510                    .map(|p| p.to_string_lossy())
1511                    .collect::<Vec<_>>()
1512                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1513            })
1514            .await;
1515        worktree_b
1516            .condition(&cx_b, |tree, _| {
1517                tree.paths()
1518                    .map(|p| p.to_string_lossy())
1519                    .collect::<Vec<_>>()
1520                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1521            })
1522            .await;
1523        worktree_c
1524            .condition(&cx_c, |tree, _| {
1525                tree.paths()
1526                    .map(|p| p.to_string_lossy())
1527                    .collect::<Vec<_>>()
1528                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1529            })
1530            .await;
1531
1532        // Ensure buffer files are updated as well.
1533        buffer_a
1534            .condition(&cx_a, |buf, _| {
1535                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1536            })
1537            .await;
1538        buffer_b
1539            .condition(&cx_b, |buf, _| {
1540                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1541            })
1542            .await;
1543        buffer_c
1544            .condition(&cx_c, |buf, _| {
1545                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1546            })
1547            .await;
1548    }
1549
1550    #[gpui::test(iterations = 10)]
1551    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1552        cx_a.foreground().forbid_parking();
1553        let lang_registry = Arc::new(LanguageRegistry::new());
1554        let fs = Arc::new(FakeFs::new(cx_a.background()));
1555
1556        // Connect to a server as 2 clients.
1557        let mut server = TestServer::start(cx_a.foreground()).await;
1558        let client_a = server.create_client(&mut cx_a, "user_a").await;
1559        let client_b = server.create_client(&mut cx_b, "user_b").await;
1560
1561        // Share a project as client A
1562        fs.insert_tree(
1563            "/dir",
1564            json!({
1565                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1566                "a.txt": "a-contents",
1567            }),
1568        )
1569        .await;
1570
1571        let project_a = cx_a.update(|cx| {
1572            Project::local(
1573                client_a.clone(),
1574                client_a.user_store.clone(),
1575                lang_registry.clone(),
1576                fs.clone(),
1577                cx,
1578            )
1579        });
1580        let (worktree_a, _) = project_a
1581            .update(&mut cx_a, |p, cx| {
1582                p.find_or_create_local_worktree("/dir", false, cx)
1583            })
1584            .await
1585            .unwrap();
1586        worktree_a
1587            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1588            .await;
1589        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1590        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1591        project_a
1592            .update(&mut cx_a, |p, cx| p.share(cx))
1593            .await
1594            .unwrap();
1595
1596        // Join that project as client B
1597        let project_b = Project::remote(
1598            project_id,
1599            client_b.clone(),
1600            client_b.user_store.clone(),
1601            lang_registry.clone(),
1602            fs.clone(),
1603            &mut cx_b.to_async(),
1604        )
1605        .await
1606        .unwrap();
1607
1608        // Open a buffer as client B
1609        let buffer_b = project_b
1610            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1611            .await
1612            .unwrap();
1613
1614        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1615        buffer_b.read_with(&cx_b, |buf, _| {
1616            assert!(buf.is_dirty());
1617            assert!(!buf.has_conflict());
1618        });
1619
1620        buffer_b
1621            .update(&mut cx_b, |buf, cx| buf.save(cx))
1622            .await
1623            .unwrap();
1624        buffer_b
1625            .condition(&cx_b, |buffer_b, _| !buffer_b.is_dirty())
1626            .await;
1627        buffer_b.read_with(&cx_b, |buf, _| {
1628            assert!(!buf.has_conflict());
1629        });
1630
1631        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1632        buffer_b.read_with(&cx_b, |buf, _| {
1633            assert!(buf.is_dirty());
1634            assert!(!buf.has_conflict());
1635        });
1636    }
1637
1638    #[gpui::test(iterations = 10)]
1639    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1640        cx_a.foreground().forbid_parking();
1641        let lang_registry = Arc::new(LanguageRegistry::new());
1642        let fs = Arc::new(FakeFs::new(cx_a.background()));
1643
1644        // Connect to a server as 2 clients.
1645        let mut server = TestServer::start(cx_a.foreground()).await;
1646        let client_a = server.create_client(&mut cx_a, "user_a").await;
1647        let client_b = server.create_client(&mut cx_b, "user_b").await;
1648
1649        // Share a project as client A
1650        fs.insert_tree(
1651            "/dir",
1652            json!({
1653                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1654                "a.txt": "a-contents",
1655            }),
1656        )
1657        .await;
1658
1659        let project_a = cx_a.update(|cx| {
1660            Project::local(
1661                client_a.clone(),
1662                client_a.user_store.clone(),
1663                lang_registry.clone(),
1664                fs.clone(),
1665                cx,
1666            )
1667        });
1668        let (worktree_a, _) = project_a
1669            .update(&mut cx_a, |p, cx| {
1670                p.find_or_create_local_worktree("/dir", false, cx)
1671            })
1672            .await
1673            .unwrap();
1674        worktree_a
1675            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1676            .await;
1677        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1678        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1679        project_a
1680            .update(&mut cx_a, |p, cx| p.share(cx))
1681            .await
1682            .unwrap();
1683
1684        // Join that project as client B
1685        let project_b = Project::remote(
1686            project_id,
1687            client_b.clone(),
1688            client_b.user_store.clone(),
1689            lang_registry.clone(),
1690            fs.clone(),
1691            &mut cx_b.to_async(),
1692        )
1693        .await
1694        .unwrap();
1695        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1696
1697        // Open a buffer as client B
1698        let buffer_b = project_b
1699            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1700            .await
1701            .unwrap();
1702        buffer_b.read_with(&cx_b, |buf, _| {
1703            assert!(!buf.is_dirty());
1704            assert!(!buf.has_conflict());
1705        });
1706
1707        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1708            .await
1709            .unwrap();
1710        buffer_b
1711            .condition(&cx_b, |buf, _| {
1712                buf.text() == "new contents" && !buf.is_dirty()
1713            })
1714            .await;
1715        buffer_b.read_with(&cx_b, |buf, _| {
1716            assert!(!buf.has_conflict());
1717        });
1718    }
1719
1720    #[gpui::test(iterations = 10)]
1721    async fn test_editing_while_guest_opens_buffer(
1722        mut cx_a: TestAppContext,
1723        mut cx_b: TestAppContext,
1724    ) {
1725        cx_a.foreground().forbid_parking();
1726        let lang_registry = Arc::new(LanguageRegistry::new());
1727        let fs = Arc::new(FakeFs::new(cx_a.background()));
1728
1729        // Connect to a server as 2 clients.
1730        let mut server = TestServer::start(cx_a.foreground()).await;
1731        let client_a = server.create_client(&mut cx_a, "user_a").await;
1732        let client_b = server.create_client(&mut cx_b, "user_b").await;
1733
1734        // Share a project as client A
1735        fs.insert_tree(
1736            "/dir",
1737            json!({
1738                ".zed.toml": r#"collaborators = ["user_b"]"#,
1739                "a.txt": "a-contents",
1740            }),
1741        )
1742        .await;
1743        let project_a = cx_a.update(|cx| {
1744            Project::local(
1745                client_a.clone(),
1746                client_a.user_store.clone(),
1747                lang_registry.clone(),
1748                fs.clone(),
1749                cx,
1750            )
1751        });
1752        let (worktree_a, _) = project_a
1753            .update(&mut cx_a, |p, cx| {
1754                p.find_or_create_local_worktree("/dir", false, cx)
1755            })
1756            .await
1757            .unwrap();
1758        worktree_a
1759            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1760            .await;
1761        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1762        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1763        project_a
1764            .update(&mut cx_a, |p, cx| p.share(cx))
1765            .await
1766            .unwrap();
1767
1768        // Join that project as client B
1769        let project_b = Project::remote(
1770            project_id,
1771            client_b.clone(),
1772            client_b.user_store.clone(),
1773            lang_registry.clone(),
1774            fs.clone(),
1775            &mut cx_b.to_async(),
1776        )
1777        .await
1778        .unwrap();
1779
1780        // Open a buffer as client A
1781        let buffer_a = project_a
1782            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1783            .await
1784            .unwrap();
1785
1786        // Start opening the same buffer as client B
1787        let buffer_b = cx_b
1788            .background()
1789            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1790
1791        // Edit the buffer as client A while client B is still opening it.
1792        cx_b.background().simulate_random_delay().await;
1793        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "X", cx));
1794        cx_b.background().simulate_random_delay().await;
1795        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([1..1], "Y", cx));
1796
1797        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1798        let buffer_b = buffer_b.await.unwrap();
1799        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1800    }
1801
1802    #[gpui::test(iterations = 10)]
1803    async fn test_leaving_worktree_while_opening_buffer(
1804        mut cx_a: TestAppContext,
1805        mut cx_b: TestAppContext,
1806    ) {
1807        cx_a.foreground().forbid_parking();
1808        let lang_registry = Arc::new(LanguageRegistry::new());
1809        let fs = Arc::new(FakeFs::new(cx_a.background()));
1810
1811        // Connect to a server as 2 clients.
1812        let mut server = TestServer::start(cx_a.foreground()).await;
1813        let client_a = server.create_client(&mut cx_a, "user_a").await;
1814        let client_b = server.create_client(&mut cx_b, "user_b").await;
1815
1816        // Share a project as client A
1817        fs.insert_tree(
1818            "/dir",
1819            json!({
1820                ".zed.toml": r#"collaborators = ["user_b"]"#,
1821                "a.txt": "a-contents",
1822            }),
1823        )
1824        .await;
1825        let project_a = cx_a.update(|cx| {
1826            Project::local(
1827                client_a.clone(),
1828                client_a.user_store.clone(),
1829                lang_registry.clone(),
1830                fs.clone(),
1831                cx,
1832            )
1833        });
1834        let (worktree_a, _) = project_a
1835            .update(&mut cx_a, |p, cx| {
1836                p.find_or_create_local_worktree("/dir", false, cx)
1837            })
1838            .await
1839            .unwrap();
1840        worktree_a
1841            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1842            .await;
1843        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1844        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1845        project_a
1846            .update(&mut cx_a, |p, cx| p.share(cx))
1847            .await
1848            .unwrap();
1849
1850        // Join that project as client B
1851        let project_b = Project::remote(
1852            project_id,
1853            client_b.clone(),
1854            client_b.user_store.clone(),
1855            lang_registry.clone(),
1856            fs.clone(),
1857            &mut cx_b.to_async(),
1858        )
1859        .await
1860        .unwrap();
1861
1862        // See that a guest has joined as client A.
1863        project_a
1864            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1865            .await;
1866
1867        // Begin opening a buffer as client B, but leave the project before the open completes.
1868        let buffer_b = cx_b
1869            .background()
1870            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1871        cx_b.update(|_| drop(project_b));
1872        drop(buffer_b);
1873
1874        // See that the guest has left.
1875        project_a
1876            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1877            .await;
1878    }
1879
1880    #[gpui::test(iterations = 10)]
1881    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1882        cx_a.foreground().forbid_parking();
1883        let lang_registry = Arc::new(LanguageRegistry::new());
1884        let fs = Arc::new(FakeFs::new(cx_a.background()));
1885
1886        // Connect to a server as 2 clients.
1887        let mut server = TestServer::start(cx_a.foreground()).await;
1888        let client_a = server.create_client(&mut cx_a, "user_a").await;
1889        let client_b = server.create_client(&mut cx_b, "user_b").await;
1890
1891        // Share a project as client A
1892        fs.insert_tree(
1893            "/a",
1894            json!({
1895                ".zed.toml": r#"collaborators = ["user_b"]"#,
1896                "a.txt": "a-contents",
1897                "b.txt": "b-contents",
1898            }),
1899        )
1900        .await;
1901        let project_a = cx_a.update(|cx| {
1902            Project::local(
1903                client_a.clone(),
1904                client_a.user_store.clone(),
1905                lang_registry.clone(),
1906                fs.clone(),
1907                cx,
1908            )
1909        });
1910        let (worktree_a, _) = project_a
1911            .update(&mut cx_a, |p, cx| {
1912                p.find_or_create_local_worktree("/a", false, cx)
1913            })
1914            .await
1915            .unwrap();
1916        worktree_a
1917            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1918            .await;
1919        let project_id = project_a
1920            .update(&mut cx_a, |project, _| project.next_remote_id())
1921            .await;
1922        project_a
1923            .update(&mut cx_a, |project, cx| project.share(cx))
1924            .await
1925            .unwrap();
1926
1927        // Join that project as client B
1928        let _project_b = Project::remote(
1929            project_id,
1930            client_b.clone(),
1931            client_b.user_store.clone(),
1932            lang_registry.clone(),
1933            fs.clone(),
1934            &mut cx_b.to_async(),
1935        )
1936        .await
1937        .unwrap();
1938
1939        // See that a guest has joined as client A.
1940        project_a
1941            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1942            .await;
1943
1944        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1945        client_b.disconnect(&cx_b.to_async()).unwrap();
1946        project_a
1947            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1948            .await;
1949    }
1950
1951    #[gpui::test(iterations = 10)]
1952    async fn test_collaborating_with_diagnostics(
1953        mut cx_a: TestAppContext,
1954        mut cx_b: TestAppContext,
1955    ) {
1956        cx_a.foreground().forbid_parking();
1957        let mut lang_registry = Arc::new(LanguageRegistry::new());
1958        let fs = Arc::new(FakeFs::new(cx_a.background()));
1959
1960        // Set up a fake language server.
1961        let (language_server_config, mut fake_language_server) =
1962            LanguageServerConfig::fake(&cx_a).await;
1963        Arc::get_mut(&mut lang_registry)
1964            .unwrap()
1965            .add(Arc::new(Language::new(
1966                LanguageConfig {
1967                    name: "Rust".to_string(),
1968                    path_suffixes: vec!["rs".to_string()],
1969                    language_server: Some(language_server_config),
1970                    ..Default::default()
1971                },
1972                Some(tree_sitter_rust::language()),
1973            )));
1974
1975        // Connect to a server as 2 clients.
1976        let mut server = TestServer::start(cx_a.foreground()).await;
1977        let client_a = server.create_client(&mut cx_a, "user_a").await;
1978        let client_b = server.create_client(&mut cx_b, "user_b").await;
1979
1980        // Share a project as client A
1981        fs.insert_tree(
1982            "/a",
1983            json!({
1984                ".zed.toml": r#"collaborators = ["user_b"]"#,
1985                "a.rs": "let one = two",
1986                "other.rs": "",
1987            }),
1988        )
1989        .await;
1990        let project_a = cx_a.update(|cx| {
1991            Project::local(
1992                client_a.clone(),
1993                client_a.user_store.clone(),
1994                lang_registry.clone(),
1995                fs.clone(),
1996                cx,
1997            )
1998        });
1999        let (worktree_a, _) = project_a
2000            .update(&mut cx_a, |p, cx| {
2001                p.find_or_create_local_worktree("/a", false, cx)
2002            })
2003            .await
2004            .unwrap();
2005        worktree_a
2006            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2007            .await;
2008        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2009        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2010        project_a
2011            .update(&mut cx_a, |p, cx| p.share(cx))
2012            .await
2013            .unwrap();
2014
2015        // Cause the language server to start.
2016        let _ = cx_a
2017            .background()
2018            .spawn(project_a.update(&mut cx_a, |project, cx| {
2019                project.open_buffer(
2020                    ProjectPath {
2021                        worktree_id,
2022                        path: Path::new("other.rs").into(),
2023                    },
2024                    cx,
2025                )
2026            }))
2027            .await
2028            .unwrap();
2029
2030        // Simulate a language server reporting errors for a file.
2031        fake_language_server
2032            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2033                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2034                version: None,
2035                diagnostics: vec![lsp::Diagnostic {
2036                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2037                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2038                    message: "message 1".to_string(),
2039                    ..Default::default()
2040                }],
2041            })
2042            .await;
2043
2044        // Wait for server to see the diagnostics update.
2045        server
2046            .condition(|store| {
2047                let worktree = store
2048                    .project(project_id)
2049                    .unwrap()
2050                    .worktrees
2051                    .get(&worktree_id.to_proto())
2052                    .unwrap();
2053
2054                !worktree
2055                    .share
2056                    .as_ref()
2057                    .unwrap()
2058                    .diagnostic_summaries
2059                    .is_empty()
2060            })
2061            .await;
2062
2063        // Join the worktree as client B.
2064        let project_b = Project::remote(
2065            project_id,
2066            client_b.clone(),
2067            client_b.user_store.clone(),
2068            lang_registry.clone(),
2069            fs.clone(),
2070            &mut cx_b.to_async(),
2071        )
2072        .await
2073        .unwrap();
2074
2075        project_b.read_with(&cx_b, |project, cx| {
2076            assert_eq!(
2077                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2078                &[(
2079                    ProjectPath {
2080                        worktree_id,
2081                        path: Arc::from(Path::new("a.rs")),
2082                    },
2083                    DiagnosticSummary {
2084                        error_count: 1,
2085                        warning_count: 0,
2086                        ..Default::default()
2087                    },
2088                )]
2089            )
2090        });
2091
2092        // Simulate a language server reporting more errors for a file.
2093        fake_language_server
2094            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2095                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2096                version: None,
2097                diagnostics: vec![
2098                    lsp::Diagnostic {
2099                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2100                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2101                        message: "message 1".to_string(),
2102                        ..Default::default()
2103                    },
2104                    lsp::Diagnostic {
2105                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2106                        range: lsp::Range::new(
2107                            lsp::Position::new(0, 10),
2108                            lsp::Position::new(0, 13),
2109                        ),
2110                        message: "message 2".to_string(),
2111                        ..Default::default()
2112                    },
2113                ],
2114            })
2115            .await;
2116
2117        // Client b gets the updated summaries
2118        project_b
2119            .condition(&cx_b, |project, cx| {
2120                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2121                    == &[(
2122                        ProjectPath {
2123                            worktree_id,
2124                            path: Arc::from(Path::new("a.rs")),
2125                        },
2126                        DiagnosticSummary {
2127                            error_count: 1,
2128                            warning_count: 1,
2129                            ..Default::default()
2130                        },
2131                    )]
2132            })
2133            .await;
2134
2135        // Open the file with the errors on client B. They should be present.
2136        let buffer_b = cx_b
2137            .background()
2138            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2139            .await
2140            .unwrap();
2141
2142        buffer_b.read_with(&cx_b, |buffer, _| {
2143            assert_eq!(
2144                buffer
2145                    .snapshot()
2146                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2147                    .map(|entry| entry)
2148                    .collect::<Vec<_>>(),
2149                &[
2150                    DiagnosticEntry {
2151                        range: Point::new(0, 4)..Point::new(0, 7),
2152                        diagnostic: Diagnostic {
2153                            group_id: 0,
2154                            message: "message 1".to_string(),
2155                            severity: lsp::DiagnosticSeverity::ERROR,
2156                            is_primary: true,
2157                            ..Default::default()
2158                        }
2159                    },
2160                    DiagnosticEntry {
2161                        range: Point::new(0, 10)..Point::new(0, 13),
2162                        diagnostic: Diagnostic {
2163                            group_id: 1,
2164                            severity: lsp::DiagnosticSeverity::WARNING,
2165                            message: "message 2".to_string(),
2166                            is_primary: true,
2167                            ..Default::default()
2168                        }
2169                    }
2170                ]
2171            );
2172        });
2173    }
2174
2175    #[gpui::test(iterations = 10)]
2176    async fn test_collaborating_with_completion(
2177        mut cx_a: TestAppContext,
2178        mut cx_b: TestAppContext,
2179    ) {
2180        cx_a.foreground().forbid_parking();
2181        let mut lang_registry = Arc::new(LanguageRegistry::new());
2182        let fs = Arc::new(FakeFs::new(cx_a.background()));
2183
2184        // Set up a fake language server.
2185        let (language_server_config, mut fake_language_server) =
2186            LanguageServerConfig::fake_with_capabilities(
2187                lsp::ServerCapabilities {
2188                    completion_provider: Some(lsp::CompletionOptions {
2189                        trigger_characters: Some(vec![".".to_string()]),
2190                        ..Default::default()
2191                    }),
2192                    ..Default::default()
2193                },
2194                &cx_a,
2195            )
2196            .await;
2197        Arc::get_mut(&mut lang_registry)
2198            .unwrap()
2199            .add(Arc::new(Language::new(
2200                LanguageConfig {
2201                    name: "Rust".to_string(),
2202                    path_suffixes: vec!["rs".to_string()],
2203                    language_server: Some(language_server_config),
2204                    ..Default::default()
2205                },
2206                Some(tree_sitter_rust::language()),
2207            )));
2208
2209        // Connect to a server as 2 clients.
2210        let mut server = TestServer::start(cx_a.foreground()).await;
2211        let client_a = server.create_client(&mut cx_a, "user_a").await;
2212        let client_b = server.create_client(&mut cx_b, "user_b").await;
2213
2214        // Share a project as client A
2215        fs.insert_tree(
2216            "/a",
2217            json!({
2218                ".zed.toml": r#"collaborators = ["user_b"]"#,
2219                "main.rs": "fn main() { a }",
2220                "other.rs": "",
2221            }),
2222        )
2223        .await;
2224        let project_a = cx_a.update(|cx| {
2225            Project::local(
2226                client_a.clone(),
2227                client_a.user_store.clone(),
2228                lang_registry.clone(),
2229                fs.clone(),
2230                cx,
2231            )
2232        });
2233        let (worktree_a, _) = project_a
2234            .update(&mut cx_a, |p, cx| {
2235                p.find_or_create_local_worktree("/a", false, cx)
2236            })
2237            .await
2238            .unwrap();
2239        worktree_a
2240            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2241            .await;
2242        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2243        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2244        project_a
2245            .update(&mut cx_a, |p, cx| p.share(cx))
2246            .await
2247            .unwrap();
2248
2249        // Join the worktree as client B.
2250        let project_b = Project::remote(
2251            project_id,
2252            client_b.clone(),
2253            client_b.user_store.clone(),
2254            lang_registry.clone(),
2255            fs.clone(),
2256            &mut cx_b.to_async(),
2257        )
2258        .await
2259        .unwrap();
2260
2261        // Open a file in an editor as the guest.
2262        let buffer_b = project_b
2263            .update(&mut cx_b, |p, cx| {
2264                p.open_buffer((worktree_id, "main.rs"), cx)
2265            })
2266            .await
2267            .unwrap();
2268        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2269        let editor_b = cx_b.add_view(window_b, |cx| {
2270            Editor::for_buffer(
2271                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2272                Arc::new(|cx| EditorSettings::test(cx)),
2273                Some(project_b.clone()),
2274                cx,
2275            )
2276        });
2277
2278        // Type a completion trigger character as the guest.
2279        editor_b.update(&mut cx_b, |editor, cx| {
2280            editor.select_ranges([13..13], None, cx);
2281            editor.handle_input(&Input(".".into()), cx);
2282            cx.focus(&editor_b);
2283        });
2284
2285        // Receive a completion request as the host's language server.
2286        // Return some completions from the host's language server.
2287        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2288            assert_eq!(
2289                params.text_document_position.text_document.uri,
2290                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2291            );
2292            assert_eq!(
2293                params.text_document_position.position,
2294                lsp::Position::new(0, 14),
2295            );
2296
2297            Some(lsp::CompletionResponse::Array(vec![
2298                lsp::CompletionItem {
2299                    label: "first_method(…)".into(),
2300                    detail: Some("fn(&mut self, B) -> C".into()),
2301                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2302                        new_text: "first_method($1)".to_string(),
2303                        range: lsp::Range::new(
2304                            lsp::Position::new(0, 14),
2305                            lsp::Position::new(0, 14),
2306                        ),
2307                    })),
2308                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2309                    ..Default::default()
2310                },
2311                lsp::CompletionItem {
2312                    label: "second_method(…)".into(),
2313                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2314                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2315                        new_text: "second_method()".to_string(),
2316                        range: lsp::Range::new(
2317                            lsp::Position::new(0, 14),
2318                            lsp::Position::new(0, 14),
2319                        ),
2320                    })),
2321                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2322                    ..Default::default()
2323                },
2324            ]))
2325        });
2326
2327        // Open the buffer on the host.
2328        let buffer_a = project_a
2329            .update(&mut cx_a, |p, cx| {
2330                p.open_buffer((worktree_id, "main.rs"), cx)
2331            })
2332            .await
2333            .unwrap();
2334        buffer_a
2335            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2336            .await;
2337
2338        // Confirm a completion on the guest.
2339        editor_b
2340            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2341            .await;
2342        editor_b.update(&mut cx_b, |editor, cx| {
2343            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2344            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2345        });
2346
2347        // Return a resolved completion from the host's language server.
2348        // The resolved completion has an additional text edit.
2349        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2350            assert_eq!(params.label, "first_method(…)");
2351            lsp::CompletionItem {
2352                label: "first_method(…)".into(),
2353                detail: Some("fn(&mut self, B) -> C".into()),
2354                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2355                    new_text: "first_method($1)".to_string(),
2356                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2357                })),
2358                additional_text_edits: Some(vec![lsp::TextEdit {
2359                    new_text: "use d::SomeTrait;\n".to_string(),
2360                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2361                }]),
2362                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2363                ..Default::default()
2364            }
2365        });
2366
2367        // The additional edit is applied.
2368        buffer_a
2369            .condition(&cx_a, |buffer, _| {
2370                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2371            })
2372            .await;
2373        buffer_b
2374            .condition(&cx_b, |buffer, _| {
2375                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2376            })
2377            .await;
2378    }
2379
2380    #[gpui::test(iterations = 10)]
2381    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2382        cx_a.foreground().forbid_parking();
2383        let mut lang_registry = Arc::new(LanguageRegistry::new());
2384        let fs = Arc::new(FakeFs::new(cx_a.background()));
2385
2386        // Set up a fake language server.
2387        let (language_server_config, mut fake_language_server) =
2388            LanguageServerConfig::fake(&cx_a).await;
2389        Arc::get_mut(&mut lang_registry)
2390            .unwrap()
2391            .add(Arc::new(Language::new(
2392                LanguageConfig {
2393                    name: "Rust".to_string(),
2394                    path_suffixes: vec!["rs".to_string()],
2395                    language_server: Some(language_server_config),
2396                    ..Default::default()
2397                },
2398                Some(tree_sitter_rust::language()),
2399            )));
2400
2401        // Connect to a server as 2 clients.
2402        let mut server = TestServer::start(cx_a.foreground()).await;
2403        let client_a = server.create_client(&mut cx_a, "user_a").await;
2404        let client_b = server.create_client(&mut cx_b, "user_b").await;
2405
2406        // Share a project as client A
2407        fs.insert_tree(
2408            "/a",
2409            json!({
2410                ".zed.toml": r#"collaborators = ["user_b"]"#,
2411                "a.rs": "let one = two",
2412            }),
2413        )
2414        .await;
2415        let project_a = cx_a.update(|cx| {
2416            Project::local(
2417                client_a.clone(),
2418                client_a.user_store.clone(),
2419                lang_registry.clone(),
2420                fs.clone(),
2421                cx,
2422            )
2423        });
2424        let (worktree_a, _) = project_a
2425            .update(&mut cx_a, |p, cx| {
2426                p.find_or_create_local_worktree("/a", false, cx)
2427            })
2428            .await
2429            .unwrap();
2430        worktree_a
2431            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2432            .await;
2433        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2434        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2435        project_a
2436            .update(&mut cx_a, |p, cx| p.share(cx))
2437            .await
2438            .unwrap();
2439
2440        // Join the worktree as client B.
2441        let project_b = Project::remote(
2442            project_id,
2443            client_b.clone(),
2444            client_b.user_store.clone(),
2445            lang_registry.clone(),
2446            fs.clone(),
2447            &mut cx_b.to_async(),
2448        )
2449        .await
2450        .unwrap();
2451
2452        let buffer_b = cx_b
2453            .background()
2454            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2455            .await
2456            .unwrap();
2457
2458        let format = project_b.update(&mut cx_b, |project, cx| {
2459            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2460        });
2461
2462        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2463            Some(vec![
2464                lsp::TextEdit {
2465                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2466                    new_text: "h".to_string(),
2467                },
2468                lsp::TextEdit {
2469                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2470                    new_text: "y".to_string(),
2471                },
2472            ])
2473        });
2474
2475        format.await.unwrap();
2476        assert_eq!(
2477            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2478            "let honey = two"
2479        );
2480    }
2481
2482    #[gpui::test(iterations = 10)]
2483    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2484        cx_a.foreground().forbid_parking();
2485        let mut lang_registry = Arc::new(LanguageRegistry::new());
2486        let fs = Arc::new(FakeFs::new(cx_a.background()));
2487        fs.insert_tree(
2488            "/root-1",
2489            json!({
2490                ".zed.toml": r#"collaborators = ["user_b"]"#,
2491                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2492            }),
2493        )
2494        .await;
2495        fs.insert_tree(
2496            "/root-2",
2497            json!({
2498                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2499            }),
2500        )
2501        .await;
2502
2503        // Set up a fake language server.
2504        let (language_server_config, mut fake_language_server) =
2505            LanguageServerConfig::fake(&cx_a).await;
2506        Arc::get_mut(&mut lang_registry)
2507            .unwrap()
2508            .add(Arc::new(Language::new(
2509                LanguageConfig {
2510                    name: "Rust".to_string(),
2511                    path_suffixes: vec!["rs".to_string()],
2512                    language_server: Some(language_server_config),
2513                    ..Default::default()
2514                },
2515                Some(tree_sitter_rust::language()),
2516            )));
2517
2518        // Connect to a server as 2 clients.
2519        let mut server = TestServer::start(cx_a.foreground()).await;
2520        let client_a = server.create_client(&mut cx_a, "user_a").await;
2521        let client_b = server.create_client(&mut cx_b, "user_b").await;
2522
2523        // Share a project as client A
2524        let project_a = cx_a.update(|cx| {
2525            Project::local(
2526                client_a.clone(),
2527                client_a.user_store.clone(),
2528                lang_registry.clone(),
2529                fs.clone(),
2530                cx,
2531            )
2532        });
2533        let (worktree_a, _) = project_a
2534            .update(&mut cx_a, |p, cx| {
2535                p.find_or_create_local_worktree("/root-1", false, cx)
2536            })
2537            .await
2538            .unwrap();
2539        worktree_a
2540            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2541            .await;
2542        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2543        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2544        project_a
2545            .update(&mut cx_a, |p, cx| p.share(cx))
2546            .await
2547            .unwrap();
2548
2549        // Join the worktree as client B.
2550        let project_b = Project::remote(
2551            project_id,
2552            client_b.clone(),
2553            client_b.user_store.clone(),
2554            lang_registry.clone(),
2555            fs.clone(),
2556            &mut cx_b.to_async(),
2557        )
2558        .await
2559        .unwrap();
2560
2561        // Open the file on client B.
2562        let buffer_b = cx_b
2563            .background()
2564            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2565            .await
2566            .unwrap();
2567
2568        // Request the definition of a symbol as the guest.
2569        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2570        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2571            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2572                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2573                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2574            )))
2575        });
2576
2577        let definitions_1 = definitions_1.await.unwrap();
2578        cx_b.read(|cx| {
2579            assert_eq!(definitions_1.len(), 1);
2580            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2581            let target_buffer = definitions_1[0].target_buffer.read(cx);
2582            assert_eq!(
2583                target_buffer.text(),
2584                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2585            );
2586            assert_eq!(
2587                definitions_1[0].target_range.to_point(target_buffer),
2588                Point::new(0, 6)..Point::new(0, 9)
2589            );
2590        });
2591
2592        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2593        // the previous call to `definition`.
2594        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2595        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2596            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2597                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2598                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2599            )))
2600        });
2601
2602        let definitions_2 = definitions_2.await.unwrap();
2603        cx_b.read(|cx| {
2604            assert_eq!(definitions_2.len(), 1);
2605            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2606            let target_buffer = definitions_2[0].target_buffer.read(cx);
2607            assert_eq!(
2608                target_buffer.text(),
2609                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2610            );
2611            assert_eq!(
2612                definitions_2[0].target_range.to_point(target_buffer),
2613                Point::new(1, 6)..Point::new(1, 11)
2614            );
2615        });
2616        assert_eq!(
2617            definitions_1[0].target_buffer,
2618            definitions_2[0].target_buffer
2619        );
2620
2621        cx_b.update(|_| {
2622            drop(definitions_1);
2623            drop(definitions_2);
2624        });
2625        project_b
2626            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2627            .await;
2628    }
2629
2630    #[gpui::test(iterations = 10)]
2631    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2632        mut cx_a: TestAppContext,
2633        mut cx_b: TestAppContext,
2634        mut rng: StdRng,
2635    ) {
2636        cx_a.foreground().forbid_parking();
2637        let mut lang_registry = Arc::new(LanguageRegistry::new());
2638        let fs = Arc::new(FakeFs::new(cx_a.background()));
2639        fs.insert_tree(
2640            "/root",
2641            json!({
2642                ".zed.toml": r#"collaborators = ["user_b"]"#,
2643                "a.rs": "const ONE: usize = b::TWO;",
2644                "b.rs": "const TWO: usize = 2",
2645            }),
2646        )
2647        .await;
2648
2649        // Set up a fake language server.
2650        let (language_server_config, mut fake_language_server) =
2651            LanguageServerConfig::fake(&cx_a).await;
2652
2653        Arc::get_mut(&mut lang_registry)
2654            .unwrap()
2655            .add(Arc::new(Language::new(
2656                LanguageConfig {
2657                    name: "Rust".to_string(),
2658                    path_suffixes: vec!["rs".to_string()],
2659                    language_server: Some(language_server_config),
2660                    ..Default::default()
2661                },
2662                Some(tree_sitter_rust::language()),
2663            )));
2664
2665        // Connect to a server as 2 clients.
2666        let mut server = TestServer::start(cx_a.foreground()).await;
2667        let client_a = server.create_client(&mut cx_a, "user_a").await;
2668        let client_b = server.create_client(&mut cx_b, "user_b").await;
2669
2670        // Share a project as client A
2671        let project_a = cx_a.update(|cx| {
2672            Project::local(
2673                client_a.clone(),
2674                client_a.user_store.clone(),
2675                lang_registry.clone(),
2676                fs.clone(),
2677                cx,
2678            )
2679        });
2680
2681        let (worktree_a, _) = project_a
2682            .update(&mut cx_a, |p, cx| {
2683                p.find_or_create_local_worktree("/root", false, cx)
2684            })
2685            .await
2686            .unwrap();
2687        worktree_a
2688            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2689            .await;
2690        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2691        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2692        project_a
2693            .update(&mut cx_a, |p, cx| p.share(cx))
2694            .await
2695            .unwrap();
2696
2697        // Join the worktree as client B.
2698        let project_b = Project::remote(
2699            project_id,
2700            client_b.clone(),
2701            client_b.user_store.clone(),
2702            lang_registry.clone(),
2703            fs.clone(),
2704            &mut cx_b.to_async(),
2705        )
2706        .await
2707        .unwrap();
2708
2709        let buffer_b1 = cx_b
2710            .background()
2711            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2712            .await
2713            .unwrap();
2714
2715        let definitions;
2716        let buffer_b2;
2717        if rng.gen() {
2718            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2719            buffer_b2 =
2720                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2721        } else {
2722            buffer_b2 =
2723                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2724            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2725        }
2726
2727        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2728            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2729                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2730                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2731            )))
2732        });
2733
2734        let buffer_b2 = buffer_b2.await.unwrap();
2735        let definitions = definitions.await.unwrap();
2736        assert_eq!(definitions.len(), 1);
2737        assert_eq!(definitions[0].target_buffer, buffer_b2);
2738    }
2739
2740    #[gpui::test(iterations = 10)]
2741    async fn test_collaborating_with_code_actions(
2742        mut cx_a: TestAppContext,
2743        mut cx_b: TestAppContext,
2744    ) {
2745        cx_a.foreground().forbid_parking();
2746        let mut lang_registry = Arc::new(LanguageRegistry::new());
2747        let fs = Arc::new(FakeFs::new(cx_a.background()));
2748        let mut path_openers_b = Vec::new();
2749        cx_b.update(|cx| editor::init(cx, &mut path_openers_b));
2750
2751        // Set up a fake language server.
2752        let (language_server_config, mut fake_language_server) =
2753            LanguageServerConfig::fake_with_capabilities(
2754                lsp::ServerCapabilities {
2755                    ..Default::default()
2756                },
2757                &cx_a,
2758            )
2759            .await;
2760        Arc::get_mut(&mut lang_registry)
2761            .unwrap()
2762            .add(Arc::new(Language::new(
2763                LanguageConfig {
2764                    name: "Rust".to_string(),
2765                    path_suffixes: vec!["rs".to_string()],
2766                    language_server: Some(language_server_config),
2767                    ..Default::default()
2768                },
2769                Some(tree_sitter_rust::language()),
2770            )));
2771
2772        // Connect to a server as 2 clients.
2773        let mut server = TestServer::start(cx_a.foreground()).await;
2774        let client_a = server.create_client(&mut cx_a, "user_a").await;
2775        let client_b = server.create_client(&mut cx_b, "user_b").await;
2776
2777        // Share a project as client A
2778        fs.insert_tree(
2779            "/a",
2780            json!({
2781                ".zed.toml": r#"collaborators = ["user_b"]"#,
2782                "main.rs": "mod other;\nfn main() { let foo = other::foo(); }",
2783                "other.rs": "pub fn foo() -> usize { 4 }",
2784            }),
2785        )
2786        .await;
2787        let project_a = cx_a.update(|cx| {
2788            Project::local(
2789                client_a.clone(),
2790                client_a.user_store.clone(),
2791                lang_registry.clone(),
2792                fs.clone(),
2793                cx,
2794            )
2795        });
2796        let (worktree_a, _) = project_a
2797            .update(&mut cx_a, |p, cx| {
2798                p.find_or_create_local_worktree("/a", false, cx)
2799            })
2800            .await
2801            .unwrap();
2802        worktree_a
2803            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2804            .await;
2805        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2806        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2807        project_a
2808            .update(&mut cx_a, |p, cx| p.share(cx))
2809            .await
2810            .unwrap();
2811
2812        // Join the worktree as client B.
2813        let project_b = Project::remote(
2814            project_id,
2815            client_b.clone(),
2816            client_b.user_store.clone(),
2817            lang_registry.clone(),
2818            fs.clone(),
2819            &mut cx_b.to_async(),
2820        )
2821        .await
2822        .unwrap();
2823        let mut params = cx_b.update(WorkspaceParams::test);
2824        params.languages = lang_registry.clone();
2825        params.client = client_b.client.clone();
2826        params.user_store = client_b.user_store.clone();
2827        params.project = project_b;
2828        params.path_openers = path_openers_b.into();
2829
2830        let (_window_b, workspace_b) = cx_b.add_window(|cx| Workspace::new(&params, cx));
2831        let editor_b = workspace_b
2832            .update(&mut cx_b, |workspace, cx| {
2833                workspace.open_path((worktree_id, "main.rs").into(), cx)
2834            })
2835            .await
2836            .unwrap()
2837            .downcast::<Editor>()
2838            .unwrap();
2839        fake_language_server
2840            .handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2841                assert_eq!(
2842                    params.text_document.uri,
2843                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2844                );
2845                assert_eq!(params.range.start, lsp::Position::new(0, 0));
2846                assert_eq!(params.range.end, lsp::Position::new(0, 0));
2847                None
2848            })
2849            .next()
2850            .await;
2851
2852        // Move cursor to a location that contains code actions.
2853        editor_b.update(&mut cx_b, |editor, cx| {
2854            editor.select_ranges([Point::new(1, 31)..Point::new(1, 31)], None, cx);
2855            cx.focus(&editor_b);
2856        });
2857        fake_language_server.handle_request::<lsp::request::CodeActionRequest, _>(|params| {
2858            assert_eq!(
2859                params.text_document.uri,
2860                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2861            );
2862            assert_eq!(params.range.start, lsp::Position::new(1, 31));
2863            assert_eq!(params.range.end, lsp::Position::new(1, 31));
2864
2865            Some(vec![lsp::CodeActionOrCommand::CodeAction(
2866                lsp::CodeAction {
2867                    title: "Inline into all callers".to_string(),
2868                    edit: Some(lsp::WorkspaceEdit {
2869                        changes: Some(
2870                            [
2871                                (
2872                                    lsp::Url::from_file_path("/a/main.rs").unwrap(),
2873                                    vec![lsp::TextEdit::new(
2874                                        lsp::Range::new(
2875                                            lsp::Position::new(1, 22),
2876                                            lsp::Position::new(1, 34),
2877                                        ),
2878                                        "4".to_string(),
2879                                    )],
2880                                ),
2881                                (
2882                                    lsp::Url::from_file_path("/a/other.rs").unwrap(),
2883                                    vec![lsp::TextEdit::new(
2884                                        lsp::Range::new(
2885                                            lsp::Position::new(0, 0),
2886                                            lsp::Position::new(0, 27),
2887                                        ),
2888                                        "".to_string(),
2889                                    )],
2890                                ),
2891                            ]
2892                            .into_iter()
2893                            .collect(),
2894                        ),
2895                        ..Default::default()
2896                    }),
2897                    data: Some(json!({
2898                        "codeActionParams": {
2899                            "range": {
2900                                "start": {"line": 1, "column": 31},
2901                                "end": {"line": 1, "column": 31},
2902                            }
2903                        }
2904                    })),
2905                    ..Default::default()
2906                },
2907            )])
2908        });
2909
2910        // Toggle code actions and wait for them to display.
2911        editor_b.update(&mut cx_b, |editor, cx| {
2912            editor.toggle_code_actions(&ToggleCodeActions(false), cx);
2913        });
2914        editor_b
2915            .condition(&cx_b, |editor, _| editor.context_menu_visible())
2916            .await;
2917
2918        // Confirming the code action will trigger a resolve request.
2919        let confirm_action = workspace_b
2920            .update(&mut cx_b, |workspace, cx| {
2921                Editor::confirm_code_action(workspace, &ConfirmCodeAction(Some(0)), cx)
2922            })
2923            .unwrap();
2924        fake_language_server.handle_request::<lsp::request::CodeActionResolveRequest, _>(|_| {
2925            lsp::CodeAction {
2926                title: "Inline into all callers".to_string(),
2927                edit: Some(lsp::WorkspaceEdit {
2928                    changes: Some(
2929                        [
2930                            (
2931                                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2932                                vec![lsp::TextEdit::new(
2933                                    lsp::Range::new(
2934                                        lsp::Position::new(1, 22),
2935                                        lsp::Position::new(1, 34),
2936                                    ),
2937                                    "4".to_string(),
2938                                )],
2939                            ),
2940                            (
2941                                lsp::Url::from_file_path("/a/other.rs").unwrap(),
2942                                vec![lsp::TextEdit::new(
2943                                    lsp::Range::new(
2944                                        lsp::Position::new(0, 0),
2945                                        lsp::Position::new(0, 27),
2946                                    ),
2947                                    "".to_string(),
2948                                )],
2949                            ),
2950                        ]
2951                        .into_iter()
2952                        .collect(),
2953                    ),
2954                    ..Default::default()
2955                }),
2956                ..Default::default()
2957            }
2958        });
2959
2960        // After the action is confirmed, an editor containing both modified files is opened.
2961        confirm_action.await.unwrap();
2962        let code_action_editor = workspace_b.read_with(&cx_b, |workspace, cx| {
2963            workspace
2964                .active_item(cx)
2965                .unwrap()
2966                .downcast::<Editor>()
2967                .unwrap()
2968        });
2969        code_action_editor.update(&mut cx_b, |editor, cx| {
2970            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2971            editor.undo(&Undo, cx);
2972            assert_eq!(
2973                editor.text(cx),
2974                "pub fn foo() -> usize { 4 }\nmod other;\nfn main() { let foo = other::foo(); }"
2975            );
2976            editor.redo(&Redo, cx);
2977            assert_eq!(editor.text(cx), "\nmod other;\nfn main() { let foo = 4; }");
2978        });
2979    }
2980
2981    #[gpui::test(iterations = 10)]
2982    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2983        cx_a.foreground().forbid_parking();
2984
2985        // Connect to a server as 2 clients.
2986        let mut server = TestServer::start(cx_a.foreground()).await;
2987        let client_a = server.create_client(&mut cx_a, "user_a").await;
2988        let client_b = server.create_client(&mut cx_b, "user_b").await;
2989
2990        // Create an org that includes these 2 users.
2991        let db = &server.app_state.db;
2992        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2993        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2994            .await
2995            .unwrap();
2996        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2997            .await
2998            .unwrap();
2999
3000        // Create a channel that includes all the users.
3001        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3002        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3003            .await
3004            .unwrap();
3005        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3006            .await
3007            .unwrap();
3008        db.create_channel_message(
3009            channel_id,
3010            client_b.current_user_id(&cx_b),
3011            "hello A, it's B.",
3012            OffsetDateTime::now_utc(),
3013            1,
3014        )
3015        .await
3016        .unwrap();
3017
3018        let channels_a = cx_a
3019            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3020        channels_a
3021            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3022            .await;
3023        channels_a.read_with(&cx_a, |list, _| {
3024            assert_eq!(
3025                list.available_channels().unwrap(),
3026                &[ChannelDetails {
3027                    id: channel_id.to_proto(),
3028                    name: "test-channel".to_string()
3029                }]
3030            )
3031        });
3032        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3033            this.get_channel(channel_id.to_proto(), cx).unwrap()
3034        });
3035        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3036        channel_a
3037            .condition(&cx_a, |channel, _| {
3038                channel_messages(channel)
3039                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3040            })
3041            .await;
3042
3043        let channels_b = cx_b
3044            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3045        channels_b
3046            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3047            .await;
3048        channels_b.read_with(&cx_b, |list, _| {
3049            assert_eq!(
3050                list.available_channels().unwrap(),
3051                &[ChannelDetails {
3052                    id: channel_id.to_proto(),
3053                    name: "test-channel".to_string()
3054                }]
3055            )
3056        });
3057
3058        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3059            this.get_channel(channel_id.to_proto(), cx).unwrap()
3060        });
3061        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3062        channel_b
3063            .condition(&cx_b, |channel, _| {
3064                channel_messages(channel)
3065                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3066            })
3067            .await;
3068
3069        channel_a
3070            .update(&mut cx_a, |channel, cx| {
3071                channel
3072                    .send_message("oh, hi B.".to_string(), cx)
3073                    .unwrap()
3074                    .detach();
3075                let task = channel.send_message("sup".to_string(), cx).unwrap();
3076                assert_eq!(
3077                    channel_messages(channel),
3078                    &[
3079                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3080                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3081                        ("user_a".to_string(), "sup".to_string(), true)
3082                    ]
3083                );
3084                task
3085            })
3086            .await
3087            .unwrap();
3088
3089        channel_b
3090            .condition(&cx_b, |channel, _| {
3091                channel_messages(channel)
3092                    == [
3093                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3094                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3095                        ("user_a".to_string(), "sup".to_string(), false),
3096                    ]
3097            })
3098            .await;
3099
3100        assert_eq!(
3101            server
3102                .state()
3103                .await
3104                .channel(channel_id)
3105                .unwrap()
3106                .connection_ids
3107                .len(),
3108            2
3109        );
3110        cx_b.update(|_| drop(channel_b));
3111        server
3112            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3113            .await;
3114
3115        cx_a.update(|_| drop(channel_a));
3116        server
3117            .condition(|state| state.channel(channel_id).is_none())
3118            .await;
3119    }
3120
3121    #[gpui::test(iterations = 10)]
3122    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3123        cx_a.foreground().forbid_parking();
3124
3125        let mut server = TestServer::start(cx_a.foreground()).await;
3126        let client_a = server.create_client(&mut cx_a, "user_a").await;
3127
3128        let db = &server.app_state.db;
3129        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3130        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3131        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3132            .await
3133            .unwrap();
3134        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3135            .await
3136            .unwrap();
3137
3138        let channels_a = cx_a
3139            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3140        channels_a
3141            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3142            .await;
3143        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3144            this.get_channel(channel_id.to_proto(), cx).unwrap()
3145        });
3146
3147        // Messages aren't allowed to be too long.
3148        channel_a
3149            .update(&mut cx_a, |channel, cx| {
3150                let long_body = "this is long.\n".repeat(1024);
3151                channel.send_message(long_body, cx).unwrap()
3152            })
3153            .await
3154            .unwrap_err();
3155
3156        // Messages aren't allowed to be blank.
3157        channel_a.update(&mut cx_a, |channel, cx| {
3158            channel.send_message(String::new(), cx).unwrap_err()
3159        });
3160
3161        // Leading and trailing whitespace are trimmed.
3162        channel_a
3163            .update(&mut cx_a, |channel, cx| {
3164                channel
3165                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3166                    .unwrap()
3167            })
3168            .await
3169            .unwrap();
3170        assert_eq!(
3171            db.get_channel_messages(channel_id, 10, None)
3172                .await
3173                .unwrap()
3174                .iter()
3175                .map(|m| &m.body)
3176                .collect::<Vec<_>>(),
3177            &["surrounded by whitespace"]
3178        );
3179    }
3180
3181    #[gpui::test(iterations = 10)]
3182    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3183        cx_a.foreground().forbid_parking();
3184
3185        // Connect to a server as 2 clients.
3186        let mut server = TestServer::start(cx_a.foreground()).await;
3187        let client_a = server.create_client(&mut cx_a, "user_a").await;
3188        let client_b = server.create_client(&mut cx_b, "user_b").await;
3189        let mut status_b = client_b.status();
3190
3191        // Create an org that includes these 2 users.
3192        let db = &server.app_state.db;
3193        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3194        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3195            .await
3196            .unwrap();
3197        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3198            .await
3199            .unwrap();
3200
3201        // Create a channel that includes all the users.
3202        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3203        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3204            .await
3205            .unwrap();
3206        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3207            .await
3208            .unwrap();
3209        db.create_channel_message(
3210            channel_id,
3211            client_b.current_user_id(&cx_b),
3212            "hello A, it's B.",
3213            OffsetDateTime::now_utc(),
3214            2,
3215        )
3216        .await
3217        .unwrap();
3218
3219        let channels_a = cx_a
3220            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3221        channels_a
3222            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3223            .await;
3224
3225        channels_a.read_with(&cx_a, |list, _| {
3226            assert_eq!(
3227                list.available_channels().unwrap(),
3228                &[ChannelDetails {
3229                    id: channel_id.to_proto(),
3230                    name: "test-channel".to_string()
3231                }]
3232            )
3233        });
3234        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3235            this.get_channel(channel_id.to_proto(), cx).unwrap()
3236        });
3237        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3238        channel_a
3239            .condition(&cx_a, |channel, _| {
3240                channel_messages(channel)
3241                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3242            })
3243            .await;
3244
3245        let channels_b = cx_b
3246            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3247        channels_b
3248            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3249            .await;
3250        channels_b.read_with(&cx_b, |list, _| {
3251            assert_eq!(
3252                list.available_channels().unwrap(),
3253                &[ChannelDetails {
3254                    id: channel_id.to_proto(),
3255                    name: "test-channel".to_string()
3256                }]
3257            )
3258        });
3259
3260        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3261            this.get_channel(channel_id.to_proto(), cx).unwrap()
3262        });
3263        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3264        channel_b
3265            .condition(&cx_b, |channel, _| {
3266                channel_messages(channel)
3267                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3268            })
3269            .await;
3270
3271        // Disconnect client B, ensuring we can still access its cached channel data.
3272        server.forbid_connections();
3273        server.disconnect_client(client_b.current_user_id(&cx_b));
3274        while !matches!(
3275            status_b.next().await,
3276            Some(client::Status::ReconnectionError { .. })
3277        ) {}
3278
3279        channels_b.read_with(&cx_b, |channels, _| {
3280            assert_eq!(
3281                channels.available_channels().unwrap(),
3282                [ChannelDetails {
3283                    id: channel_id.to_proto(),
3284                    name: "test-channel".to_string()
3285                }]
3286            )
3287        });
3288        channel_b.read_with(&cx_b, |channel, _| {
3289            assert_eq!(
3290                channel_messages(channel),
3291                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3292            )
3293        });
3294
3295        // Send a message from client B while it is disconnected.
3296        channel_b
3297            .update(&mut cx_b, |channel, cx| {
3298                let task = channel
3299                    .send_message("can you see this?".to_string(), cx)
3300                    .unwrap();
3301                assert_eq!(
3302                    channel_messages(channel),
3303                    &[
3304                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3305                        ("user_b".to_string(), "can you see this?".to_string(), true)
3306                    ]
3307                );
3308                task
3309            })
3310            .await
3311            .unwrap_err();
3312
3313        // Send a message from client A while B is disconnected.
3314        channel_a
3315            .update(&mut cx_a, |channel, cx| {
3316                channel
3317                    .send_message("oh, hi B.".to_string(), cx)
3318                    .unwrap()
3319                    .detach();
3320                let task = channel.send_message("sup".to_string(), cx).unwrap();
3321                assert_eq!(
3322                    channel_messages(channel),
3323                    &[
3324                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3325                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3326                        ("user_a".to_string(), "sup".to_string(), true)
3327                    ]
3328                );
3329                task
3330            })
3331            .await
3332            .unwrap();
3333
3334        // Give client B a chance to reconnect.
3335        server.allow_connections();
3336        cx_b.foreground().advance_clock(Duration::from_secs(10));
3337
3338        // Verify that B sees the new messages upon reconnection, as well as the message client B
3339        // sent while offline.
3340        channel_b
3341            .condition(&cx_b, |channel, _| {
3342                channel_messages(channel)
3343                    == [
3344                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3345                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3346                        ("user_a".to_string(), "sup".to_string(), false),
3347                        ("user_b".to_string(), "can you see this?".to_string(), false),
3348                    ]
3349            })
3350            .await;
3351
3352        // Ensure client A and B can communicate normally after reconnection.
3353        channel_a
3354            .update(&mut cx_a, |channel, cx| {
3355                channel.send_message("you online?".to_string(), cx).unwrap()
3356            })
3357            .await
3358            .unwrap();
3359        channel_b
3360            .condition(&cx_b, |channel, _| {
3361                channel_messages(channel)
3362                    == [
3363                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3364                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3365                        ("user_a".to_string(), "sup".to_string(), false),
3366                        ("user_b".to_string(), "can you see this?".to_string(), false),
3367                        ("user_a".to_string(), "you online?".to_string(), false),
3368                    ]
3369            })
3370            .await;
3371
3372        channel_b
3373            .update(&mut cx_b, |channel, cx| {
3374                channel.send_message("yep".to_string(), cx).unwrap()
3375            })
3376            .await
3377            .unwrap();
3378        channel_a
3379            .condition(&cx_a, |channel, _| {
3380                channel_messages(channel)
3381                    == [
3382                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3383                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3384                        ("user_a".to_string(), "sup".to_string(), false),
3385                        ("user_b".to_string(), "can you see this?".to_string(), false),
3386                        ("user_a".to_string(), "you online?".to_string(), false),
3387                        ("user_b".to_string(), "yep".to_string(), false),
3388                    ]
3389            })
3390            .await;
3391    }
3392
3393    #[gpui::test(iterations = 10)]
3394    async fn test_contacts(
3395        mut cx_a: TestAppContext,
3396        mut cx_b: TestAppContext,
3397        mut cx_c: TestAppContext,
3398    ) {
3399        cx_a.foreground().forbid_parking();
3400        let lang_registry = Arc::new(LanguageRegistry::new());
3401        let fs = Arc::new(FakeFs::new(cx_a.background()));
3402
3403        // Connect to a server as 3 clients.
3404        let mut server = TestServer::start(cx_a.foreground()).await;
3405        let client_a = server.create_client(&mut cx_a, "user_a").await;
3406        let client_b = server.create_client(&mut cx_b, "user_b").await;
3407        let client_c = server.create_client(&mut cx_c, "user_c").await;
3408
3409        // Share a worktree as client A.
3410        fs.insert_tree(
3411            "/a",
3412            json!({
3413                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3414            }),
3415        )
3416        .await;
3417
3418        let project_a = cx_a.update(|cx| {
3419            Project::local(
3420                client_a.clone(),
3421                client_a.user_store.clone(),
3422                lang_registry.clone(),
3423                fs.clone(),
3424                cx,
3425            )
3426        });
3427        let (worktree_a, _) = project_a
3428            .update(&mut cx_a, |p, cx| {
3429                p.find_or_create_local_worktree("/a", false, cx)
3430            })
3431            .await
3432            .unwrap();
3433        worktree_a
3434            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3435            .await;
3436
3437        client_a
3438            .user_store
3439            .condition(&cx_a, |user_store, _| {
3440                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3441            })
3442            .await;
3443        client_b
3444            .user_store
3445            .condition(&cx_b, |user_store, _| {
3446                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3447            })
3448            .await;
3449        client_c
3450            .user_store
3451            .condition(&cx_c, |user_store, _| {
3452                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3453            })
3454            .await;
3455
3456        let project_id = project_a
3457            .update(&mut cx_a, |project, _| project.next_remote_id())
3458            .await;
3459        project_a
3460            .update(&mut cx_a, |project, cx| project.share(cx))
3461            .await
3462            .unwrap();
3463
3464        let _project_b = Project::remote(
3465            project_id,
3466            client_b.clone(),
3467            client_b.user_store.clone(),
3468            lang_registry.clone(),
3469            fs.clone(),
3470            &mut cx_b.to_async(),
3471        )
3472        .await
3473        .unwrap();
3474
3475        client_a
3476            .user_store
3477            .condition(&cx_a, |user_store, _| {
3478                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3479            })
3480            .await;
3481        client_b
3482            .user_store
3483            .condition(&cx_b, |user_store, _| {
3484                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3485            })
3486            .await;
3487        client_c
3488            .user_store
3489            .condition(&cx_c, |user_store, _| {
3490                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3491            })
3492            .await;
3493
3494        project_a
3495            .condition(&cx_a, |project, _| {
3496                project.collaborators().contains_key(&client_b.peer_id)
3497            })
3498            .await;
3499
3500        cx_a.update(move |_| drop(project_a));
3501        client_a
3502            .user_store
3503            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3504            .await;
3505        client_b
3506            .user_store
3507            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3508            .await;
3509        client_c
3510            .user_store
3511            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3512            .await;
3513
3514        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3515            user_store
3516                .contacts()
3517                .iter()
3518                .map(|contact| {
3519                    let worktrees = contact
3520                        .projects
3521                        .iter()
3522                        .map(|p| {
3523                            (
3524                                p.worktree_root_names[0].as_str(),
3525                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3526                            )
3527                        })
3528                        .collect();
3529                    (contact.user.github_login.as_str(), worktrees)
3530                })
3531                .collect()
3532        }
3533    }
3534
3535    struct TestServer {
3536        peer: Arc<Peer>,
3537        app_state: Arc<AppState>,
3538        server: Arc<Server>,
3539        foreground: Rc<executor::Foreground>,
3540        notifications: mpsc::Receiver<()>,
3541        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3542        forbid_connections: Arc<AtomicBool>,
3543        _test_db: TestDb,
3544    }
3545
3546    impl TestServer {
3547        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3548            let test_db = TestDb::new();
3549            let app_state = Self::build_app_state(&test_db).await;
3550            let peer = Peer::new();
3551            let notifications = mpsc::channel(128);
3552            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3553            Self {
3554                peer,
3555                app_state,
3556                server,
3557                foreground,
3558                notifications: notifications.1,
3559                connection_killers: Default::default(),
3560                forbid_connections: Default::default(),
3561                _test_db: test_db,
3562            }
3563        }
3564
3565        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3566            let http = FakeHttpClient::with_404_response();
3567            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3568            let client_name = name.to_string();
3569            let mut client = Client::new(http.clone());
3570            let server = self.server.clone();
3571            let connection_killers = self.connection_killers.clone();
3572            let forbid_connections = self.forbid_connections.clone();
3573            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3574
3575            Arc::get_mut(&mut client)
3576                .unwrap()
3577                .override_authenticate(move |cx| {
3578                    cx.spawn(|_| async move {
3579                        let access_token = "the-token".to_string();
3580                        Ok(Credentials {
3581                            user_id: user_id.0 as u64,
3582                            access_token,
3583                        })
3584                    })
3585                })
3586                .override_establish_connection(move |credentials, cx| {
3587                    assert_eq!(credentials.user_id, user_id.0 as u64);
3588                    assert_eq!(credentials.access_token, "the-token");
3589
3590                    let server = server.clone();
3591                    let connection_killers = connection_killers.clone();
3592                    let forbid_connections = forbid_connections.clone();
3593                    let client_name = client_name.clone();
3594                    let connection_id_tx = connection_id_tx.clone();
3595                    cx.spawn(move |cx| async move {
3596                        if forbid_connections.load(SeqCst) {
3597                            Err(EstablishConnectionError::other(anyhow!(
3598                                "server is forbidding connections"
3599                            )))
3600                        } else {
3601                            let (client_conn, server_conn, kill_conn) =
3602                                Connection::in_memory(cx.background());
3603                            connection_killers.lock().insert(user_id, kill_conn);
3604                            cx.background()
3605                                .spawn(server.handle_connection(
3606                                    server_conn,
3607                                    client_name,
3608                                    user_id,
3609                                    Some(connection_id_tx),
3610                                    cx.background(),
3611                                ))
3612                                .detach();
3613                            Ok(client_conn)
3614                        }
3615                    })
3616                });
3617
3618            client
3619                .authenticate_and_connect(&cx.to_async())
3620                .await
3621                .unwrap();
3622
3623            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3624            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3625            let mut authed_user =
3626                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3627            while authed_user.next().await.unwrap().is_none() {}
3628
3629            TestClient {
3630                client,
3631                peer_id,
3632                user_store,
3633            }
3634        }
3635
3636        fn disconnect_client(&self, user_id: UserId) {
3637            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3638                let _ = kill_conn.try_send(Some(()));
3639            }
3640        }
3641
3642        fn forbid_connections(&self) {
3643            self.forbid_connections.store(true, SeqCst);
3644        }
3645
3646        fn allow_connections(&self) {
3647            self.forbid_connections.store(false, SeqCst);
3648        }
3649
3650        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3651            let mut config = Config::default();
3652            config.session_secret = "a".repeat(32);
3653            config.database_url = test_db.url.clone();
3654            let github_client = github::AppClient::test();
3655            Arc::new(AppState {
3656                db: test_db.db().clone(),
3657                handlebars: Default::default(),
3658                auth_client: auth::build_client("", ""),
3659                repo_client: github::RepoClient::test(&github_client),
3660                github_client,
3661                config,
3662            })
3663        }
3664
3665        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3666            self.server.store.read()
3667        }
3668
3669        async fn condition<F>(&mut self, mut predicate: F)
3670        where
3671            F: FnMut(&Store) -> bool,
3672        {
3673            async_std::future::timeout(Duration::from_millis(500), async {
3674                while !(predicate)(&*self.server.store.read()) {
3675                    self.foreground.start_waiting();
3676                    self.notifications.next().await;
3677                    self.foreground.finish_waiting();
3678                }
3679            })
3680            .await
3681            .expect("condition timed out");
3682        }
3683    }
3684
3685    impl Drop for TestServer {
3686        fn drop(&mut self) {
3687            self.peer.reset();
3688        }
3689    }
3690
3691    struct TestClient {
3692        client: Arc<Client>,
3693        pub peer_id: PeerId,
3694        pub user_store: ModelHandle<UserStore>,
3695    }
3696
3697    impl Deref for TestClient {
3698        type Target = Arc<Client>;
3699
3700        fn deref(&self) -> &Self::Target {
3701            &self.client
3702        }
3703    }
3704
3705    impl TestClient {
3706        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3707            UserId::from_proto(
3708                self.user_store
3709                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3710            )
3711        }
3712    }
3713
3714    impl Executor for Arc<gpui::executor::Background> {
3715        fn spawn_detached<F: 'static + Send + Future<Output = ()>>(&self, future: F) {
3716            self.spawn(future).detach();
3717        }
3718    }
3719
3720    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3721        channel
3722            .messages()
3723            .cursor::<()>()
3724            .map(|m| {
3725                (
3726                    m.sender.github_login.clone(),
3727                    m.body.clone(),
3728                    m.is_pending(),
3729                )
3730            })
3731            .collect()
3732    }
3733
3734    struct EmptyView;
3735
3736    impl gpui::Entity for EmptyView {
3737        type Event = ();
3738    }
3739
3740    impl gpui::View for EmptyView {
3741        fn ui_name() -> &'static str {
3742            "empty view"
3743        }
3744
3745        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3746            gpui::Element::boxed(gpui::elements::Empty)
3747        }
3748    }
3749}