rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage, RequestMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
/// Type-erased message handler as stored in `Server::handlers`.
///
/// A handler receives the server and a boxed envelope of any message type and
/// returns a boxed `'static` future resolving to `tide::Result<()>`. The
/// `Send + Sync` bounds are part of the trait object type.
type MessageHandler = Box<
    dyn Send
        + Sync
        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
>;
  35
/// RPC server: dispatches incoming messages to registered handlers and
/// relays messages between connections.
pub struct Server {
    /// Peer used to register connections and to send, forward and respond to
    /// messages.
    peer: Arc<Peer>,
    /// In-memory state (connections, projects, worktrees, channels), guarded
    /// by a read/write lock.
    store: RwLock<Store>,
    /// Shared application state; its `db` is used for user and channel
    /// lookups.
    app_state: Arc<AppState>,
    /// Handlers keyed by the `TypeId` of the message type they accept;
    /// populated in `Server::new`.
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that receives `()` after each message with a
    /// registered handler has been processed in `handle_connection`.
    notifications: Option<mpsc::Sender<()>>,
}
  43
/// Number of channel messages requested from the database per fetch
/// (passed to `get_channel_messages` when joining a channel).
const MESSAGE_COUNT_PER_PAGE: usize = 100;
/// NOTE(review): not used in the visible part of this file; presumably the
/// maximum accepted length of a channel message body — confirm at the use site.
const MAX_MESSAGE_LEN: usize = 1024;
  46
  47impl Server {
  48    pub fn new(
  49        app_state: Arc<AppState>,
  50        peer: Arc<Peer>,
  51        notifications: Option<mpsc::Sender<()>>,
  52    ) -> Arc<Self> {
  53        let mut server = Self {
  54            peer,
  55            app_state,
  56            store: Default::default(),
  57            handlers: Default::default(),
  58            notifications,
  59        };
  60
  61        server
  62            .add_request_handler(Server::ping)
  63            .add_request_handler(Server::register_project)
  64            .add_message_handler(Server::unregister_project)
  65            .add_request_handler(Server::share_project)
  66            .add_message_handler(Server::unshare_project)
  67            .add_request_handler(Server::join_project)
  68            .add_message_handler(Server::leave_project)
  69            .add_request_handler(Server::register_worktree)
  70            .add_message_handler(Server::unregister_worktree)
  71            .add_request_handler(Server::share_worktree)
  72            .add_message_handler(Server::update_worktree)
  73            .add_message_handler(Server::update_diagnostic_summary)
  74            .add_message_handler(Server::disk_based_diagnostics_updating)
  75            .add_message_handler(Server::disk_based_diagnostics_updated)
  76            .add_request_handler(Server::get_definition)
  77            .add_request_handler(Server::open_buffer)
  78            .add_message_handler(Server::close_buffer)
  79            .add_request_handler(Server::update_buffer)
  80            .add_message_handler(Server::update_buffer_file)
  81            .add_message_handler(Server::buffer_reloaded)
  82            .add_message_handler(Server::buffer_saved)
  83            .add_request_handler(Server::save_buffer)
  84            .add_request_handler(Server::format_buffers)
  85            .add_request_handler(Server::get_completions)
  86            .add_request_handler(Server::apply_additional_edits_for_completion)
  87            .add_request_handler(Server::get_code_actions)
  88            .add_request_handler(Server::apply_code_action)
  89            .add_request_handler(Server::get_channels)
  90            .add_request_handler(Server::get_users)
  91            .add_request_handler(Server::join_channel)
  92            .add_message_handler(Server::leave_channel)
  93            .add_request_handler(Server::send_channel_message)
  94            .add_request_handler(Server::get_channel_messages);
  95
  96        Arc::new(server)
  97    }
  98
  99    fn add_message_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 100    where
 101        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 102        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 103        M: EnvelopedMessage,
 104    {
 105        let prev_handler = self.handlers.insert(
 106            TypeId::of::<M>(),
 107            Box::new(move |server, envelope| {
 108                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 109                (handler)(server, *envelope).boxed()
 110            }),
 111        );
 112        if prev_handler.is_some() {
 113            panic!("registered a handler for the same message twice");
 114        }
 115        self
 116    }
 117
 118    fn add_request_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 119    where
 120        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 121        Fut: 'static + Send + Future<Output = tide::Result<M::Response>>,
 122        M: RequestMessage,
 123    {
 124        self.add_message_handler(move |server, envelope| {
 125            let receipt = envelope.receipt();
 126            let response = (handler)(server.clone(), envelope);
 127            async move {
 128                match response.await {
 129                    Ok(response) => {
 130                        server.peer.respond(receipt, response)?;
 131                        Ok(())
 132                    }
 133                    Err(error) => {
 134                        server.peer.respond_with_error(
 135                            receipt,
 136                            proto::Error {
 137                                message: error.to_string(),
 138                            },
 139                        )?;
 140                        Err(error)
 141                    }
 142                }
 143            }
 144        })
 145    }
 146
    /// Returns a future that serves a single client connection until it ends.
    ///
    /// The future registers `connection` with the peer, optionally reports
    /// the assigned connection id through `send_connection_id`, records the
    /// connection for `user_id` in the store, and then dispatches incoming
    /// messages to the registered handlers until either the connection's I/O
    /// future finishes or the incoming stream ends. Finally it signs the
    /// connection out.
    ///
    /// `addr` is only used in log messages. Errors are logged, never returned.
    pub fn handle_connection(
        self: &Arc<Self>,
        connection: Connection,
        addr: String,
        user_id: UserId,
        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
    ) -> impl Future<Output = ()> {
        // Own a handle to the server so the returned future does not borrow `self`.
        let mut this = self.clone();
        async move {
            let (connection_id, handle_io, mut incoming_rx) =
                this.peer.add_connection(connection).await;

            // Best effort: a failure to report the id is ignored.
            if let Some(send_connection_id) = send_connection_id.as_mut() {
                let _ = send_connection_id.send(connection_id).await;
            }

            this.state_mut().add_connection(connection_id, user_id);
            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
                log::error!("error updating contacts for {:?}: {}", user_id, err);
            }

            let handle_io = handle_io.fuse();
            futures::pin_mut!(handle_io);
            loop {
                let next_message = incoming_rx.next().fuse();
                futures::pin_mut!(next_message);
                // `select_biased!` checks branches in the order written, so
                // completion of the I/O future takes priority over new messages.
                futures::select_biased! {
                    result = handle_io => {
                        if let Err(err) = result {
                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
                        }
                        break;
                    }
                    message = next_message => {
                        if let Some(message) = message {
                            let start_time = Instant::now();
                            let type_name = message.payload_type_name();
                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
                                // The handler is awaited inline, so messages from this
                                // connection are processed one at a time.
                                if let Err(err) = (handler)(this.clone(), message).await {
                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
                                } else {
                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
                                }

                                // Signal observers that a message was processed; send
                                // failures are ignored.
                                if let Some(mut notifications) = this.notifications.clone() {
                                    let _ = notifications.send(()).await;
                                }
                            } else {
                                log::warn!("unhandled message: {}", type_name);
                            }
                        } else {
                            // The incoming stream ended.
                            log::info!("rpc connection closed {:?}", addr);
                            break;
                        }
                    }
                }
            }

            if let Err(err) = this.sign_out(connection_id).await {
                log::error!("error signing out connection {:?} - {:?}", addr, err);
            }
        }
    }
 211
 212    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 213        self.peer.disconnect(connection_id);
 214        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 215
 216        for (project_id, project) in removed_connection.hosted_projects {
 217            if let Some(share) = project.share {
 218                broadcast(
 219                    connection_id,
 220                    share.guests.keys().copied().collect(),
 221                    |conn_id| {
 222                        self.peer
 223                            .send(conn_id, proto::UnshareProject { project_id })
 224                    },
 225                )?;
 226            }
 227        }
 228
 229        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 230            broadcast(connection_id, peer_ids, |conn_id| {
 231                self.peer.send(
 232                    conn_id,
 233                    proto::RemoveProjectCollaborator {
 234                        project_id,
 235                        peer_id: connection_id.0,
 236                    },
 237                )
 238            })?;
 239        }
 240
 241        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 242        Ok(())
 243    }
 244
 245    async fn ping(self: Arc<Server>, _: TypedEnvelope<proto::Ping>) -> tide::Result<proto::Ack> {
 246        Ok(proto::Ack {})
 247    }
 248
 249    async fn register_project(
 250        mut self: Arc<Server>,
 251        request: TypedEnvelope<proto::RegisterProject>,
 252    ) -> tide::Result<proto::RegisterProjectResponse> {
 253        let project_id = {
 254            let mut state = self.state_mut();
 255            let user_id = state.user_id_for_connection(request.sender_id)?;
 256            state.register_project(request.sender_id, user_id)
 257        };
 258        Ok(proto::RegisterProjectResponse { project_id })
 259    }
 260
 261    async fn unregister_project(
 262        mut self: Arc<Server>,
 263        request: TypedEnvelope<proto::UnregisterProject>,
 264    ) -> tide::Result<()> {
 265        let project = self
 266            .state_mut()
 267            .unregister_project(request.payload.project_id, request.sender_id)?;
 268        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 269        Ok(())
 270    }
 271
 272    async fn share_project(
 273        mut self: Arc<Server>,
 274        request: TypedEnvelope<proto::ShareProject>,
 275    ) -> tide::Result<proto::Ack> {
 276        self.state_mut()
 277            .share_project(request.payload.project_id, request.sender_id);
 278        Ok(proto::Ack {})
 279    }
 280
 281    async fn unshare_project(
 282        mut self: Arc<Server>,
 283        request: TypedEnvelope<proto::UnshareProject>,
 284    ) -> tide::Result<()> {
 285        let project_id = request.payload.project_id;
 286        let project = self
 287            .state_mut()
 288            .unshare_project(project_id, request.sender_id)?;
 289
 290        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 291            self.peer
 292                .send(conn_id, proto::UnshareProject { project_id })
 293        })?;
 294        self.update_contacts_for_users(&project.authorized_user_ids)?;
 295        Ok(())
 296    }
 297
 298    async fn join_project(
 299        mut self: Arc<Server>,
 300        request: TypedEnvelope<proto::JoinProject>,
 301    ) -> tide::Result<proto::JoinProjectResponse> {
 302        let project_id = request.payload.project_id;
 303
 304        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 305        let (response, connection_ids, contact_user_ids) = self
 306            .state_mut()
 307            .join_project(request.sender_id, user_id, project_id)
 308            .and_then(|joined| {
 309                let share = joined.project.share()?;
 310                let peer_count = share.guests.len();
 311                let mut collaborators = Vec::with_capacity(peer_count);
 312                collaborators.push(proto::Collaborator {
 313                    peer_id: joined.project.host_connection_id.0,
 314                    replica_id: 0,
 315                    user_id: joined.project.host_user_id.to_proto(),
 316                });
 317                let worktrees = joined
 318                    .project
 319                    .worktrees
 320                    .iter()
 321                    .filter_map(|(id, worktree)| {
 322                        worktree.share.as_ref().map(|share| proto::Worktree {
 323                            id: *id,
 324                            root_name: worktree.root_name.clone(),
 325                            entries: share.entries.values().cloned().collect(),
 326                            diagnostic_summaries: share
 327                                .diagnostic_summaries
 328                                .values()
 329                                .cloned()
 330                                .collect(),
 331                            weak: worktree.weak,
 332                        })
 333                    })
 334                    .collect();
 335                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 336                    if *peer_conn_id != request.sender_id {
 337                        collaborators.push(proto::Collaborator {
 338                            peer_id: peer_conn_id.0,
 339                            replica_id: *peer_replica_id as u32,
 340                            user_id: peer_user_id.to_proto(),
 341                        });
 342                    }
 343                }
 344                let response = proto::JoinProjectResponse {
 345                    worktrees,
 346                    replica_id: joined.replica_id as u32,
 347                    collaborators,
 348                };
 349                let connection_ids = joined.project.connection_ids();
 350                let contact_user_ids = joined.project.authorized_user_ids();
 351                Ok((response, connection_ids, contact_user_ids))
 352            })?;
 353
 354        broadcast(request.sender_id, connection_ids, |conn_id| {
 355            self.peer.send(
 356                conn_id,
 357                proto::AddProjectCollaborator {
 358                    project_id,
 359                    collaborator: Some(proto::Collaborator {
 360                        peer_id: request.sender_id.0,
 361                        replica_id: response.replica_id,
 362                        user_id: user_id.to_proto(),
 363                    }),
 364                },
 365            )
 366        })?;
 367        self.update_contacts_for_users(&contact_user_ids)?;
 368        Ok(response)
 369    }
 370
 371    async fn leave_project(
 372        mut self: Arc<Server>,
 373        request: TypedEnvelope<proto::LeaveProject>,
 374    ) -> tide::Result<()> {
 375        let sender_id = request.sender_id;
 376        let project_id = request.payload.project_id;
 377        let worktree = self.state_mut().leave_project(sender_id, project_id)?;
 378
 379        broadcast(sender_id, worktree.connection_ids, |conn_id| {
 380            self.peer.send(
 381                conn_id,
 382                proto::RemoveProjectCollaborator {
 383                    project_id,
 384                    peer_id: sender_id.0,
 385                },
 386            )
 387        })?;
 388        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 389
 390        Ok(())
 391    }
 392
 393    async fn register_worktree(
 394        mut self: Arc<Server>,
 395        request: TypedEnvelope<proto::RegisterWorktree>,
 396    ) -> tide::Result<proto::Ack> {
 397        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 398
 399        let mut contact_user_ids = HashSet::default();
 400        contact_user_ids.insert(host_user_id);
 401        for github_login in request.payload.authorized_logins {
 402            let contact_user_id = self.app_state.db.create_user(&github_login, false).await?;
 403            contact_user_ids.insert(contact_user_id);
 404        }
 405
 406        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 407        self.state_mut().register_worktree(
 408            request.payload.project_id,
 409            request.payload.worktree_id,
 410            request.sender_id,
 411            Worktree {
 412                authorized_user_ids: contact_user_ids.clone(),
 413                root_name: request.payload.root_name,
 414                share: None,
 415                weak: false,
 416            },
 417        )?;
 418        self.update_contacts_for_users(&contact_user_ids)?;
 419        Ok(proto::Ack {})
 420    }
 421
 422    async fn unregister_worktree(
 423        mut self: Arc<Server>,
 424        request: TypedEnvelope<proto::UnregisterWorktree>,
 425    ) -> tide::Result<()> {
 426        let project_id = request.payload.project_id;
 427        let worktree_id = request.payload.worktree_id;
 428        let (worktree, guest_connection_ids) =
 429            self.state_mut()
 430                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 431        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 432            self.peer.send(
 433                conn_id,
 434                proto::UnregisterWorktree {
 435                    project_id,
 436                    worktree_id,
 437                },
 438            )
 439        })?;
 440        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 441        Ok(())
 442    }
 443
 444    async fn share_worktree(
 445        mut self: Arc<Server>,
 446        mut request: TypedEnvelope<proto::ShareWorktree>,
 447    ) -> tide::Result<proto::Ack> {
 448        let worktree = request
 449            .payload
 450            .worktree
 451            .as_mut()
 452            .ok_or_else(|| anyhow!("missing worktree"))?;
 453        let entries = worktree
 454            .entries
 455            .iter()
 456            .map(|entry| (entry.id, entry.clone()))
 457            .collect();
 458        let diagnostic_summaries = worktree
 459            .diagnostic_summaries
 460            .iter()
 461            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 462            .collect();
 463
 464        let shared_worktree = self.state_mut().share_worktree(
 465            request.payload.project_id,
 466            worktree.id,
 467            request.sender_id,
 468            entries,
 469            diagnostic_summaries,
 470        )?;
 471
 472        broadcast(
 473            request.sender_id,
 474            shared_worktree.connection_ids,
 475            |connection_id| {
 476                self.peer
 477                    .forward_send(request.sender_id, connection_id, request.payload.clone())
 478            },
 479        )?;
 480        self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 481
 482        Ok(proto::Ack {})
 483    }
 484
 485    async fn update_worktree(
 486        mut self: Arc<Server>,
 487        request: TypedEnvelope<proto::UpdateWorktree>,
 488    ) -> tide::Result<()> {
 489        let connection_ids = self.state_mut().update_worktree(
 490            request.sender_id,
 491            request.payload.project_id,
 492            request.payload.worktree_id,
 493            &request.payload.removed_entries,
 494            &request.payload.updated_entries,
 495        )?;
 496
 497        broadcast(request.sender_id, connection_ids, |connection_id| {
 498            self.peer
 499                .forward_send(request.sender_id, connection_id, request.payload.clone())
 500        })?;
 501
 502        Ok(())
 503    }
 504
 505    async fn update_diagnostic_summary(
 506        mut self: Arc<Server>,
 507        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 508    ) -> tide::Result<()> {
 509        let summary = request
 510            .payload
 511            .summary
 512            .clone()
 513            .ok_or_else(|| anyhow!("invalid summary"))?;
 514        let receiver_ids = self.state_mut().update_diagnostic_summary(
 515            request.payload.project_id,
 516            request.payload.worktree_id,
 517            request.sender_id,
 518            summary,
 519        )?;
 520
 521        broadcast(request.sender_id, receiver_ids, |connection_id| {
 522            self.peer
 523                .forward_send(request.sender_id, connection_id, request.payload.clone())
 524        })?;
 525        Ok(())
 526    }
 527
 528    async fn disk_based_diagnostics_updating(
 529        self: Arc<Server>,
 530        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 531    ) -> tide::Result<()> {
 532        let receiver_ids = self
 533            .state()
 534            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 535        broadcast(request.sender_id, receiver_ids, |connection_id| {
 536            self.peer
 537                .forward_send(request.sender_id, connection_id, request.payload.clone())
 538        })?;
 539        Ok(())
 540    }
 541
 542    async fn disk_based_diagnostics_updated(
 543        self: Arc<Server>,
 544        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 545    ) -> tide::Result<()> {
 546        let receiver_ids = self
 547            .state()
 548            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 549        broadcast(request.sender_id, receiver_ids, |connection_id| {
 550            self.peer
 551                .forward_send(request.sender_id, connection_id, request.payload.clone())
 552        })?;
 553        Ok(())
 554    }
 555
 556    async fn get_definition(
 557        self: Arc<Server>,
 558        request: TypedEnvelope<proto::GetDefinition>,
 559    ) -> tide::Result<proto::GetDefinitionResponse> {
 560        let host_connection_id = self
 561            .state()
 562            .read_project(request.payload.project_id, request.sender_id)?
 563            .host_connection_id;
 564        Ok(self
 565            .peer
 566            .forward_request(request.sender_id, host_connection_id, request.payload)
 567            .await?)
 568    }
 569
 570    async fn open_buffer(
 571        self: Arc<Server>,
 572        request: TypedEnvelope<proto::OpenBuffer>,
 573    ) -> tide::Result<proto::OpenBufferResponse> {
 574        let host_connection_id = self
 575            .state()
 576            .read_project(request.payload.project_id, request.sender_id)?
 577            .host_connection_id;
 578        Ok(self
 579            .peer
 580            .forward_request(request.sender_id, host_connection_id, request.payload)
 581            .await?)
 582    }
 583
 584    async fn close_buffer(
 585        self: Arc<Server>,
 586        request: TypedEnvelope<proto::CloseBuffer>,
 587    ) -> tide::Result<()> {
 588        let host_connection_id = self
 589            .state()
 590            .read_project(request.payload.project_id, request.sender_id)?
 591            .host_connection_id;
 592        self.peer
 593            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 594        Ok(())
 595    }
 596
 597    async fn save_buffer(
 598        self: Arc<Server>,
 599        request: TypedEnvelope<proto::SaveBuffer>,
 600    ) -> tide::Result<proto::BufferSaved> {
 601        let host;
 602        let mut guests;
 603        {
 604            let state = self.state();
 605            let project = state.read_project(request.payload.project_id, request.sender_id)?;
 606            host = project.host_connection_id;
 607            guests = project.guest_connection_ids()
 608        }
 609
 610        let response = self
 611            .peer
 612            .forward_request(request.sender_id, host, request.payload.clone())
 613            .await?;
 614
 615        guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id);
 616        broadcast(host, guests, |conn_id| {
 617            self.peer.forward_send(host, conn_id, response.clone())
 618        })?;
 619
 620        Ok(response)
 621    }
 622
 623    async fn format_buffers(
 624        self: Arc<Server>,
 625        request: TypedEnvelope<proto::FormatBuffers>,
 626    ) -> tide::Result<proto::FormatBuffersResponse> {
 627        let host = self
 628            .state()
 629            .read_project(request.payload.project_id, request.sender_id)?
 630            .host_connection_id;
 631        Ok(self
 632            .peer
 633            .forward_request(request.sender_id, host, request.payload.clone())
 634            .await?)
 635    }
 636
 637    async fn get_completions(
 638        self: Arc<Server>,
 639        request: TypedEnvelope<proto::GetCompletions>,
 640    ) -> tide::Result<proto::GetCompletionsResponse> {
 641        let host = self
 642            .state()
 643            .read_project(request.payload.project_id, request.sender_id)?
 644            .host_connection_id;
 645        Ok(self
 646            .peer
 647            .forward_request(request.sender_id, host, request.payload.clone())
 648            .await?)
 649    }
 650
 651    async fn apply_additional_edits_for_completion(
 652        self: Arc<Server>,
 653        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 654    ) -> tide::Result<proto::ApplyCompletionAdditionalEditsResponse> {
 655        let host = self
 656            .state()
 657            .read_project(request.payload.project_id, request.sender_id)?
 658            .host_connection_id;
 659        Ok(self
 660            .peer
 661            .forward_request(request.sender_id, host, request.payload.clone())
 662            .await?)
 663    }
 664
 665    async fn get_code_actions(
 666        self: Arc<Server>,
 667        request: TypedEnvelope<proto::GetCodeActions>,
 668    ) -> tide::Result<proto::GetCodeActionsResponse> {
 669        let host = self
 670            .state()
 671            .read_project(request.payload.project_id, request.sender_id)?
 672            .host_connection_id;
 673        Ok(self
 674            .peer
 675            .forward_request(request.sender_id, host, request.payload.clone())
 676            .await?)
 677    }
 678
 679    async fn apply_code_action(
 680        self: Arc<Server>,
 681        request: TypedEnvelope<proto::ApplyCodeAction>,
 682    ) -> tide::Result<proto::ApplyCodeActionResponse> {
 683        let host = self
 684            .state()
 685            .read_project(request.payload.project_id, request.sender_id)?
 686            .host_connection_id;
 687        Ok(self
 688            .peer
 689            .forward_request(request.sender_id, host, request.payload.clone())
 690            .await?)
 691    }
 692
 693    async fn update_buffer(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::UpdateBuffer>,
 696    ) -> tide::Result<proto::Ack> {
 697        let receiver_ids = self
 698            .state()
 699            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 700        broadcast(request.sender_id, receiver_ids, |connection_id| {
 701            self.peer
 702                .forward_send(request.sender_id, connection_id, request.payload.clone())
 703        })?;
 704        Ok(proto::Ack {})
 705    }
 706
 707    async fn update_buffer_file(
 708        self: Arc<Server>,
 709        request: TypedEnvelope<proto::UpdateBufferFile>,
 710    ) -> tide::Result<()> {
 711        let receiver_ids = self
 712            .state()
 713            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 714        broadcast(request.sender_id, receiver_ids, |connection_id| {
 715            self.peer
 716                .forward_send(request.sender_id, connection_id, request.payload.clone())
 717        })?;
 718        Ok(())
 719    }
 720
 721    async fn buffer_reloaded(
 722        self: Arc<Server>,
 723        request: TypedEnvelope<proto::BufferReloaded>,
 724    ) -> tide::Result<()> {
 725        let receiver_ids = self
 726            .state()
 727            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 728        broadcast(request.sender_id, receiver_ids, |connection_id| {
 729            self.peer
 730                .forward_send(request.sender_id, connection_id, request.payload.clone())
 731        })?;
 732        Ok(())
 733    }
 734
 735    async fn buffer_saved(
 736        self: Arc<Server>,
 737        request: TypedEnvelope<proto::BufferSaved>,
 738    ) -> tide::Result<()> {
 739        let receiver_ids = self
 740            .state()
 741            .project_connection_ids(request.payload.project_id, request.sender_id)?;
 742        broadcast(request.sender_id, receiver_ids, |connection_id| {
 743            self.peer
 744                .forward_send(request.sender_id, connection_id, request.payload.clone())
 745        })?;
 746        Ok(())
 747    }
 748
 749    async fn get_channels(
 750        self: Arc<Server>,
 751        request: TypedEnvelope<proto::GetChannels>,
 752    ) -> tide::Result<proto::GetChannelsResponse> {
 753        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 754        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 755        Ok(proto::GetChannelsResponse {
 756            channels: channels
 757                .into_iter()
 758                .map(|chan| proto::Channel {
 759                    id: chan.id.to_proto(),
 760                    name: chan.name,
 761                })
 762                .collect(),
 763        })
 764    }
 765
 766    async fn get_users(
 767        self: Arc<Server>,
 768        request: TypedEnvelope<proto::GetUsers>,
 769    ) -> tide::Result<proto::GetUsersResponse> {
 770        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 771        let users = self
 772            .app_state
 773            .db
 774            .get_users_by_ids(user_ids)
 775            .await?
 776            .into_iter()
 777            .map(|user| proto::User {
 778                id: user.id.to_proto(),
 779                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 780                github_login: user.github_login,
 781            })
 782            .collect();
 783        Ok(proto::GetUsersResponse { users })
 784    }
 785
 786    fn update_contacts_for_users<'a>(
 787        self: &Arc<Server>,
 788        user_ids: impl IntoIterator<Item = &'a UserId>,
 789    ) -> anyhow::Result<()> {
 790        let mut result = Ok(());
 791        let state = self.state();
 792        for user_id in user_ids {
 793            let contacts = state.contacts_for_user(*user_id);
 794            for connection_id in state.connection_ids_for_user(*user_id) {
 795                if let Err(error) = self.peer.send(
 796                    connection_id,
 797                    proto::UpdateContacts {
 798                        contacts: contacts.clone(),
 799                    },
 800                ) {
 801                    result = Err(error);
 802                }
 803            }
 804        }
 805        result
 806    }
 807
 808    async fn join_channel(
 809        mut self: Arc<Self>,
 810        request: TypedEnvelope<proto::JoinChannel>,
 811    ) -> tide::Result<proto::JoinChannelResponse> {
 812        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 813        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 814        if !self
 815            .app_state
 816            .db
 817            .can_user_access_channel(user_id, channel_id)
 818            .await?
 819        {
 820            Err(anyhow!("access denied"))?;
 821        }
 822
 823        self.state_mut().join_channel(request.sender_id, channel_id);
 824        let messages = self
 825            .app_state
 826            .db
 827            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 828            .await?
 829            .into_iter()
 830            .map(|msg| proto::ChannelMessage {
 831                id: msg.id.to_proto(),
 832                body: msg.body,
 833                timestamp: msg.sent_at.unix_timestamp() as u64,
 834                sender_id: msg.sender_id.to_proto(),
 835                nonce: Some(msg.nonce.as_u128().into()),
 836            })
 837            .collect::<Vec<_>>();
 838        Ok(proto::JoinChannelResponse {
 839            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 840            messages,
 841        })
 842    }
 843
 844    async fn leave_channel(
 845        mut self: Arc<Self>,
 846        request: TypedEnvelope<proto::LeaveChannel>,
 847    ) -> tide::Result<()> {
 848        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 849        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 850        if !self
 851            .app_state
 852            .db
 853            .can_user_access_channel(user_id, channel_id)
 854            .await?
 855        {
 856            Err(anyhow!("access denied"))?;
 857        }
 858
 859        self.state_mut()
 860            .leave_channel(request.sender_id, channel_id);
 861
 862        Ok(())
 863    }
 864
 865    async fn send_channel_message(
 866        self: Arc<Self>,
 867        request: TypedEnvelope<proto::SendChannelMessage>,
 868    ) -> tide::Result<proto::SendChannelMessageResponse> {
 869        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 870        let user_id;
 871        let connection_ids;
 872        {
 873            let state = self.state();
 874            user_id = state.user_id_for_connection(request.sender_id)?;
 875            connection_ids = state.channel_connection_ids(channel_id)?;
 876        }
 877
 878        // Validate the message body.
 879        let body = request.payload.body.trim().to_string();
 880        if body.len() > MAX_MESSAGE_LEN {
 881            return Err(anyhow!("message is too long"))?;
 882        }
 883        if body.is_empty() {
 884            return Err(anyhow!("message can't be blank"))?;
 885        }
 886
 887        let timestamp = OffsetDateTime::now_utc();
 888        let nonce = request
 889            .payload
 890            .nonce
 891            .ok_or_else(|| anyhow!("nonce can't be blank"))?;
 892
 893        let message_id = self
 894            .app_state
 895            .db
 896            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 897            .await?
 898            .to_proto();
 899        let message = proto::ChannelMessage {
 900            sender_id: user_id.to_proto(),
 901            id: message_id,
 902            body,
 903            timestamp: timestamp.unix_timestamp() as u64,
 904            nonce: Some(nonce),
 905        };
 906        broadcast(request.sender_id, connection_ids, |conn_id| {
 907            self.peer.send(
 908                conn_id,
 909                proto::ChannelMessageSent {
 910                    channel_id: channel_id.to_proto(),
 911                    message: Some(message.clone()),
 912                },
 913            )
 914        })?;
 915        Ok(proto::SendChannelMessageResponse {
 916            message: Some(message),
 917        })
 918    }
 919
 920    async fn get_channel_messages(
 921        self: Arc<Self>,
 922        request: TypedEnvelope<proto::GetChannelMessages>,
 923    ) -> tide::Result<proto::GetChannelMessagesResponse> {
 924        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 925        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 926        if !self
 927            .app_state
 928            .db
 929            .can_user_access_channel(user_id, channel_id)
 930            .await?
 931        {
 932            Err(anyhow!("access denied"))?;
 933        }
 934
 935        let messages = self
 936            .app_state
 937            .db
 938            .get_channel_messages(
 939                channel_id,
 940                MESSAGE_COUNT_PER_PAGE,
 941                Some(MessageId::from_proto(request.payload.before_message_id)),
 942            )
 943            .await?
 944            .into_iter()
 945            .map(|msg| proto::ChannelMessage {
 946                id: msg.id.to_proto(),
 947                body: msg.body,
 948                timestamp: msg.sent_at.unix_timestamp() as u64,
 949                sender_id: msg.sender_id.to_proto(),
 950                nonce: Some(msg.nonce.as_u128().into()),
 951            })
 952            .collect::<Vec<_>>();
 953
 954        Ok(proto::GetChannelMessagesResponse {
 955            done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 956            messages,
 957        })
 958    }
 959
    /// Acquires a read lock on the server's `Store` and returns the guard.
    ///
    /// The lock is held until the returned guard is dropped.
    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
        self.store.read()
    }
 963
    /// Acquires a write lock on the server's `Store` and returns the guard.
    ///
    /// The lock is held until the returned guard is dropped.
    // NOTE(review): the body only needs `&self`; the `&mut Arc<Self>` receiver presumably
    // exists to make mutating call sites explicit — confirm before relaxing it.
    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
        self.store.write()
    }
 967}
 968
 969fn broadcast<F>(
 970    sender_id: ConnectionId,
 971    receiver_ids: Vec<ConnectionId>,
 972    mut f: F,
 973) -> anyhow::Result<()>
 974where
 975    F: FnMut(ConnectionId) -> anyhow::Result<()>,
 976{
 977    let mut result = Ok(());
 978    for receiver_id in receiver_ids {
 979        if receiver_id != sender_id {
 980            if let Err(error) = f(receiver_id) {
 981                if result.is_ok() {
 982                    result = Err(error);
 983                }
 984            }
 985        }
 986    }
 987    result
 988}
 989
 990pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
 991    let server = Server::new(app.state().clone(), rpc.clone(), None);
 992    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
 993        let server = server.clone();
 994        async move {
 995            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 996
 997            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
 998            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
 999            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1000            let client_protocol_version: Option<u32> = request
1001                .header("X-Zed-Protocol-Version")
1002                .and_then(|v| v.as_str().parse().ok());
1003
1004            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1005                return Ok(Response::new(StatusCode::UpgradeRequired));
1006            }
1007
1008            let header = match request.header("Sec-Websocket-Key") {
1009                Some(h) => h.as_str(),
1010                None => return Err(anyhow!("expected sec-websocket-key"))?,
1011            };
1012
1013            let user_id = process_auth_header(&request).await?;
1014
1015            let mut response = Response::new(StatusCode::SwitchingProtocols);
1016            response.insert_header(UPGRADE, "websocket");
1017            response.insert_header(CONNECTION, "Upgrade");
1018            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1019            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1020            response.insert_header("Sec-Websocket-Version", "13");
1021
1022            let http_res: &mut tide::http::Response = response.as_mut();
1023            let upgrade_receiver = http_res.recv_upgrade().await;
1024            let addr = request.remote().unwrap_or("unknown").to_string();
1025            task::spawn(async move {
1026                if let Some(stream) = upgrade_receiver.await {
1027                    server
1028                        .handle_connection(
1029                            Connection::new(
1030                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1031                            ),
1032                            addr,
1033                            user_id,
1034                            None,
1035                        )
1036                        .await;
1037                }
1038            });
1039
1040            Ok(response)
1041        }
1042    });
1043}
1044
1045fn header_contains_ignore_case<T>(
1046    request: &tide::Request<T>,
1047    header_name: HeaderName,
1048    value: &str,
1049) -> bool {
1050    request
1051        .header(header_name)
1052        .map(|h| {
1053            h.as_str()
1054                .split(',')
1055                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1056        })
1057        .unwrap_or(false)
1058}
1059
1060#[cfg(test)]
1061mod tests {
1062    use super::*;
1063    use crate::{
1064        auth,
1065        db::{tests::TestDb, UserId},
1066        github, AppState, Config,
1067    };
1068    use ::rpc::Peer;
1069    use async_std::task;
1070    use gpui::{executor, ModelHandle, TestAppContext};
1071    use parking_lot::Mutex;
1072    use postage::{mpsc, watch};
1073    use rand::prelude::*;
1074    use rpc::PeerId;
1075    use serde_json::json;
1076    use sqlx::types::time::OffsetDateTime;
1077    use std::{
1078        ops::Deref,
1079        path::Path,
1080        rc::Rc,
1081        sync::{
1082            atomic::{AtomicBool, Ordering::SeqCst},
1083            Arc,
1084        },
1085        time::Duration,
1086    };
1087    use zed::{
1088        client::{
1089            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1090            EstablishConnectionError, UserStore,
1091        },
1092        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1093        fs::{FakeFs, Fs as _},
1094        language::{
1095            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1096            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1097        },
1098        lsp,
1099        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1100    };
1101
1102    #[cfg(test)]
1103    #[ctor::ctor]
1104    fn init_logger() {
1105        if std::env::var("RUST_LOG").is_ok() {
1106            env_logger::init();
1107        }
1108    }
1109
    // End-to-end sharing test: client A shares a project, client B joins it, an edit made
    // by B shows up in A's copy of the buffer, and dropping B's project removes B from A's
    // collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A. The `.zed.toml` in the tree names user_b as a
        // collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator; remember B's replica id for the check on A's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A sees B as a collaborator with the matching replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // After B's open, the host project is expected to report the buffer as open too.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1248
    // Verifies the share / unshare / re-share cycle: after the host unshares, the guest's
    // project becomes read-only and the host worktree is no longer shared; after the host
    // shares again, a guest can join and open a buffer once more.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer as the guest must succeed while the project is shared.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project should become read-only, and the host's worktree unshared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join again using client B's connection, under the same project id.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1345
    // With one host (A) and two guests (B, C) on the same project, verifies that:
    //   * edits made by any participant reach the other participants' buffers,
    //   * a save issued by a guest writes the host's file and clears the dirty flag for
    //     all participants,
    //   * renames and new files on the host's file system appear in the guests' worktrees
    //     and in the files of already-open buffers.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until both guests' insertions have reached the host before the host appends.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        // The file on disk is expected to include the host's concurrent edit.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
        // when interpreting the change event it will mistakenly think that the file has been
        // deleted (because its path has changed) and will subsequently fail to detect the rename.
        worktree_a.flush_fs_events(&cx_a).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // The host and both guests must converge on the same set of paths.
        worktree_a
            .condition(&cx_a, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| {
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>()
                    == [".zed.toml", "file1-renamed", "file3", "file4"]
            })
            .await;

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1531
    // Verifies that a guest's buffer is neither dirty nor in conflict after the guest saves
    // it, and that a further edit after the save makes it dirty again without reporting a
    // conflict.
    #[gpui::test(iterations = 10)]
    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/dir",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "a.txt": "a-contents",
            }),
        )
        .await;

        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/dir", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());

        // Open a buffer as client B
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
        // Remember the file's mtime so the post-save change can be detected below.
        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());

        // An unsaved edit makes the buffer dirty but not conflicted.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Save, then wait until the guest's view of the file shows a different mtime.
        buffer_b
            .update(&mut cx_b, |buf, cx| buf.save(cx))
            .await
            .unwrap();
        worktree_b
            .condition(&cx_b, |_, cx| {
                buffer_b.read(cx).file().unwrap().mtime() != mtime
            })
            .await;
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(!buf.is_dirty());
            assert!(!buf.has_conflict());
        });

        // Editing again after the save must not be reported as a conflict.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
        buffer_b.read_with(&cx_b, |buf, _| {
            assert!(buf.is_dirty());
            assert!(!buf.has_conflict());
        });
    }
1624
1625    #[gpui::test(iterations = 10)]
1626    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1627        cx_a.foreground().forbid_parking();
1628        let lang_registry = Arc::new(LanguageRegistry::new());
1629        let fs = Arc::new(FakeFs::new(cx_a.background()));
1630
1631        // Connect to a server as 2 clients.
1632        let mut server = TestServer::start(cx_a.foreground()).await;
1633        let client_a = server.create_client(&mut cx_a, "user_a").await;
1634        let client_b = server.create_client(&mut cx_b, "user_b").await;
1635
1636        // Share a project as client A
1637        fs.insert_tree(
1638            "/dir",
1639            json!({
1640                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1641                "a.txt": "a-contents",
1642            }),
1643        )
1644        .await;
1645
1646        let project_a = cx_a.update(|cx| {
1647            Project::local(
1648                client_a.clone(),
1649                client_a.user_store.clone(),
1650                lang_registry.clone(),
1651                fs.clone(),
1652                cx,
1653            )
1654        });
1655        let (worktree_a, _) = project_a
1656            .update(&mut cx_a, |p, cx| {
1657                p.find_or_create_local_worktree("/dir", false, cx)
1658            })
1659            .await
1660            .unwrap();
1661        worktree_a
1662            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1663            .await;
1664        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1665        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1666        project_a
1667            .update(&mut cx_a, |p, cx| p.share(cx))
1668            .await
1669            .unwrap();
1670
1671        // Join that project as client B
1672        let project_b = Project::remote(
1673            project_id,
1674            client_b.clone(),
1675            client_b.user_store.clone(),
1676            lang_registry.clone(),
1677            fs.clone(),
1678            &mut cx_b.to_async(),
1679        )
1680        .await
1681        .unwrap();
1682        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1683
1684        // Open a buffer as client B
1685        let buffer_b = project_b
1686            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1687            .await
1688            .unwrap();
1689        buffer_b.read_with(&cx_b, |buf, _| {
1690            assert!(!buf.is_dirty());
1691            assert!(!buf.has_conflict());
1692        });
1693
1694        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1695            .await
1696            .unwrap();
1697        buffer_b
1698            .condition(&cx_b, |buf, _| {
1699                buf.text() == "new contents" && !buf.is_dirty()
1700            })
1701            .await;
1702        buffer_b.read_with(&cx_b, |buf, _| {
1703            assert!(!buf.has_conflict());
1704        });
1705    }
1706
1707    #[gpui::test(iterations = 10)]
1708    async fn test_editing_while_guest_opens_buffer(
1709        mut cx_a: TestAppContext,
1710        mut cx_b: TestAppContext,
1711    ) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::new());
1714        let fs = Arc::new(FakeFs::new(cx_a.background()));
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground()).await;
1718        let client_a = server.create_client(&mut cx_a, "user_a").await;
1719        let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730        let project_a = cx_a.update(|cx| {
1731            Project::local(
1732                client_a.clone(),
1733                client_a.user_store.clone(),
1734                lang_registry.clone(),
1735                fs.clone(),
1736                cx,
1737            )
1738        });
1739        let (worktree_a, _) = project_a
1740            .update(&mut cx_a, |p, cx| {
1741                p.find_or_create_local_worktree("/dir", false, cx)
1742            })
1743            .await
1744            .unwrap();
1745        worktree_a
1746            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1747            .await;
1748        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1749        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1750        project_a
1751            .update(&mut cx_a, |p, cx| p.share(cx))
1752            .await
1753            .unwrap();
1754
1755        // Join that project as client B
1756        let project_b = Project::remote(
1757            project_id,
1758            client_b.clone(),
1759            client_b.user_store.clone(),
1760            lang_registry.clone(),
1761            fs.clone(),
1762            &mut cx_b.to_async(),
1763        )
1764        .await
1765        .unwrap();
1766
1767        // Open a buffer as client A
1768        let buffer_a = project_a
1769            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1770            .await
1771            .unwrap();
1772
1773        // Start opening the same buffer as client B
1774        let buffer_b = cx_b
1775            .background()
1776            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1777        task::yield_now().await;
1778
1779        // Edit the buffer as client A while client B is still opening it.
1780        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1781
1782        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1783        let buffer_b = buffer_b.await.unwrap();
1784        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1785    }
1786
1787    #[gpui::test(iterations = 10)]
1788    async fn test_leaving_worktree_while_opening_buffer(
1789        mut cx_a: TestAppContext,
1790        mut cx_b: TestAppContext,
1791    ) {
1792        cx_a.foreground().forbid_parking();
1793        let lang_registry = Arc::new(LanguageRegistry::new());
1794        let fs = Arc::new(FakeFs::new(cx_a.background()));
1795
1796        // Connect to a server as 2 clients.
1797        let mut server = TestServer::start(cx_a.foreground()).await;
1798        let client_a = server.create_client(&mut cx_a, "user_a").await;
1799        let client_b = server.create_client(&mut cx_b, "user_b").await;
1800
1801        // Share a project as client A
1802        fs.insert_tree(
1803            "/dir",
1804            json!({
1805                ".zed.toml": r#"collaborators = ["user_b"]"#,
1806                "a.txt": "a-contents",
1807            }),
1808        )
1809        .await;
1810        let project_a = cx_a.update(|cx| {
1811            Project::local(
1812                client_a.clone(),
1813                client_a.user_store.clone(),
1814                lang_registry.clone(),
1815                fs.clone(),
1816                cx,
1817            )
1818        });
1819        let (worktree_a, _) = project_a
1820            .update(&mut cx_a, |p, cx| {
1821                p.find_or_create_local_worktree("/dir", false, cx)
1822            })
1823            .await
1824            .unwrap();
1825        worktree_a
1826            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1827            .await;
1828        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1829        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1830        project_a
1831            .update(&mut cx_a, |p, cx| p.share(cx))
1832            .await
1833            .unwrap();
1834
1835        // Join that project as client B
1836        let project_b = Project::remote(
1837            project_id,
1838            client_b.clone(),
1839            client_b.user_store.clone(),
1840            lang_registry.clone(),
1841            fs.clone(),
1842            &mut cx_b.to_async(),
1843        )
1844        .await
1845        .unwrap();
1846
1847        // See that a guest has joined as client A.
1848        project_a
1849            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1850            .await;
1851
1852        // Begin opening a buffer as client B, but leave the project before the open completes.
1853        let buffer_b = cx_b
1854            .background()
1855            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1856        cx_b.update(|_| drop(project_b));
1857        drop(buffer_b);
1858
1859        // See that the guest has left.
1860        project_a
1861            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1862            .await;
1863    }
1864
1865    #[gpui::test(iterations = 10)]
1866    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1867        cx_a.foreground().forbid_parking();
1868        let lang_registry = Arc::new(LanguageRegistry::new());
1869        let fs = Arc::new(FakeFs::new(cx_a.background()));
1870
1871        // Connect to a server as 2 clients.
1872        let mut server = TestServer::start(cx_a.foreground()).await;
1873        let client_a = server.create_client(&mut cx_a, "user_a").await;
1874        let client_b = server.create_client(&mut cx_b, "user_b").await;
1875
1876        // Share a project as client A
1877        fs.insert_tree(
1878            "/a",
1879            json!({
1880                ".zed.toml": r#"collaborators = ["user_b"]"#,
1881                "a.txt": "a-contents",
1882                "b.txt": "b-contents",
1883            }),
1884        )
1885        .await;
1886        let project_a = cx_a.update(|cx| {
1887            Project::local(
1888                client_a.clone(),
1889                client_a.user_store.clone(),
1890                lang_registry.clone(),
1891                fs.clone(),
1892                cx,
1893            )
1894        });
1895        let (worktree_a, _) = project_a
1896            .update(&mut cx_a, |p, cx| {
1897                p.find_or_create_local_worktree("/a", false, cx)
1898            })
1899            .await
1900            .unwrap();
1901        worktree_a
1902            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1903            .await;
1904        let project_id = project_a
1905            .update(&mut cx_a, |project, _| project.next_remote_id())
1906            .await;
1907        project_a
1908            .update(&mut cx_a, |project, cx| project.share(cx))
1909            .await
1910            .unwrap();
1911
1912        // Join that project as client B
1913        let _project_b = Project::remote(
1914            project_id,
1915            client_b.clone(),
1916            client_b.user_store.clone(),
1917            lang_registry.clone(),
1918            fs.clone(),
1919            &mut cx_b.to_async(),
1920        )
1921        .await
1922        .unwrap();
1923
1924        // See that a guest has joined as client A.
1925        project_a
1926            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1927            .await;
1928
1929        // Drop client B's connection and ensure client A observes client B leaving the worktree.
1930        client_b.disconnect(&cx_b.to_async()).unwrap();
1931        project_a
1932            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1933            .await;
1934    }
1935
1936    #[gpui::test(iterations = 10)]
1937    async fn test_collaborating_with_diagnostics(
1938        mut cx_a: TestAppContext,
1939        mut cx_b: TestAppContext,
1940    ) {
1941        cx_a.foreground().forbid_parking();
1942        let mut lang_registry = Arc::new(LanguageRegistry::new());
1943        let fs = Arc::new(FakeFs::new(cx_a.background()));
1944
1945        // Set up a fake language server.
1946        let (language_server_config, mut fake_language_server) =
1947            LanguageServerConfig::fake(&cx_a).await;
1948        Arc::get_mut(&mut lang_registry)
1949            .unwrap()
1950            .add(Arc::new(Language::new(
1951                LanguageConfig {
1952                    name: "Rust".to_string(),
1953                    path_suffixes: vec!["rs".to_string()],
1954                    language_server: Some(language_server_config),
1955                    ..Default::default()
1956                },
1957                Some(tree_sitter_rust::language()),
1958            )));
1959
1960        // Connect to a server as 2 clients.
1961        let mut server = TestServer::start(cx_a.foreground()).await;
1962        let client_a = server.create_client(&mut cx_a, "user_a").await;
1963        let client_b = server.create_client(&mut cx_b, "user_b").await;
1964
1965        // Share a project as client A
1966        fs.insert_tree(
1967            "/a",
1968            json!({
1969                ".zed.toml": r#"collaborators = ["user_b"]"#,
1970                "a.rs": "let one = two",
1971                "other.rs": "",
1972            }),
1973        )
1974        .await;
1975        let project_a = cx_a.update(|cx| {
1976            Project::local(
1977                client_a.clone(),
1978                client_a.user_store.clone(),
1979                lang_registry.clone(),
1980                fs.clone(),
1981                cx,
1982            )
1983        });
1984        let (worktree_a, _) = project_a
1985            .update(&mut cx_a, |p, cx| {
1986                p.find_or_create_local_worktree("/a", false, cx)
1987            })
1988            .await
1989            .unwrap();
1990        worktree_a
1991            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1992            .await;
1993        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1994        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1995        project_a
1996            .update(&mut cx_a, |p, cx| p.share(cx))
1997            .await
1998            .unwrap();
1999
2000        // Cause the language server to start.
2001        let _ = cx_a
2002            .background()
2003            .spawn(project_a.update(&mut cx_a, |project, cx| {
2004                project.open_buffer(
2005                    ProjectPath {
2006                        worktree_id,
2007                        path: Path::new("other.rs").into(),
2008                    },
2009                    cx,
2010                )
2011            }))
2012            .await
2013            .unwrap();
2014
2015        // Simulate a language server reporting errors for a file.
2016        fake_language_server
2017            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2018                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2019                version: None,
2020                diagnostics: vec![lsp::Diagnostic {
2021                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2022                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2023                    message: "message 1".to_string(),
2024                    ..Default::default()
2025                }],
2026            })
2027            .await;
2028
2029        // Wait for server to see the diagnostics update.
2030        server
2031            .condition(|store| {
2032                let worktree = store
2033                    .project(project_id)
2034                    .unwrap()
2035                    .worktrees
2036                    .get(&worktree_id.to_proto())
2037                    .unwrap();
2038
2039                !worktree
2040                    .share
2041                    .as_ref()
2042                    .unwrap()
2043                    .diagnostic_summaries
2044                    .is_empty()
2045            })
2046            .await;
2047
2048        // Join the worktree as client B.
2049        let project_b = Project::remote(
2050            project_id,
2051            client_b.clone(),
2052            client_b.user_store.clone(),
2053            lang_registry.clone(),
2054            fs.clone(),
2055            &mut cx_b.to_async(),
2056        )
2057        .await
2058        .unwrap();
2059
2060        project_b.read_with(&cx_b, |project, cx| {
2061            assert_eq!(
2062                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2063                &[(
2064                    ProjectPath {
2065                        worktree_id,
2066                        path: Arc::from(Path::new("a.rs")),
2067                    },
2068                    DiagnosticSummary {
2069                        error_count: 1,
2070                        warning_count: 0,
2071                        ..Default::default()
2072                    },
2073                )]
2074            )
2075        });
2076
2077        // Simulate a language server reporting more errors for a file.
2078        fake_language_server
2079            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2080                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2081                version: None,
2082                diagnostics: vec![
2083                    lsp::Diagnostic {
2084                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2085                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2086                        message: "message 1".to_string(),
2087                        ..Default::default()
2088                    },
2089                    lsp::Diagnostic {
2090                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2091                        range: lsp::Range::new(
2092                            lsp::Position::new(0, 10),
2093                            lsp::Position::new(0, 13),
2094                        ),
2095                        message: "message 2".to_string(),
2096                        ..Default::default()
2097                    },
2098                ],
2099            })
2100            .await;
2101
2102        // Client b gets the updated summaries
2103        project_b
2104            .condition(&cx_b, |project, cx| {
2105                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2106                    == &[(
2107                        ProjectPath {
2108                            worktree_id,
2109                            path: Arc::from(Path::new("a.rs")),
2110                        },
2111                        DiagnosticSummary {
2112                            error_count: 1,
2113                            warning_count: 1,
2114                            ..Default::default()
2115                        },
2116                    )]
2117            })
2118            .await;
2119
2120        // Open the file with the errors on client B. They should be present.
2121        let buffer_b = cx_b
2122            .background()
2123            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2124            .await
2125            .unwrap();
2126
2127        buffer_b.read_with(&cx_b, |buffer, _| {
2128            assert_eq!(
2129                buffer
2130                    .snapshot()
2131                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2132                    .map(|entry| entry)
2133                    .collect::<Vec<_>>(),
2134                &[
2135                    DiagnosticEntry {
2136                        range: Point::new(0, 4)..Point::new(0, 7),
2137                        diagnostic: Diagnostic {
2138                            group_id: 0,
2139                            message: "message 1".to_string(),
2140                            severity: lsp::DiagnosticSeverity::ERROR,
2141                            is_primary: true,
2142                            ..Default::default()
2143                        }
2144                    },
2145                    DiagnosticEntry {
2146                        range: Point::new(0, 10)..Point::new(0, 13),
2147                        diagnostic: Diagnostic {
2148                            group_id: 1,
2149                            severity: lsp::DiagnosticSeverity::WARNING,
2150                            message: "message 2".to_string(),
2151                            is_primary: true,
2152                            ..Default::default()
2153                        }
2154                    }
2155                ]
2156            );
2157        });
2158    }
2159
2160    #[gpui::test(iterations = 10)]
2161    async fn test_collaborating_with_completion(
2162        mut cx_a: TestAppContext,
2163        mut cx_b: TestAppContext,
2164    ) {
2165        cx_a.foreground().forbid_parking();
2166        let mut lang_registry = Arc::new(LanguageRegistry::new());
2167        let fs = Arc::new(FakeFs::new(cx_a.background()));
2168
2169        // Set up a fake language server.
2170        let (language_server_config, mut fake_language_server) =
2171            LanguageServerConfig::fake_with_capabilities(
2172                lsp::ServerCapabilities {
2173                    completion_provider: Some(lsp::CompletionOptions {
2174                        trigger_characters: Some(vec![".".to_string()]),
2175                        ..Default::default()
2176                    }),
2177                    ..Default::default()
2178                },
2179                &cx_a,
2180            )
2181            .await;
2182        Arc::get_mut(&mut lang_registry)
2183            .unwrap()
2184            .add(Arc::new(Language::new(
2185                LanguageConfig {
2186                    name: "Rust".to_string(),
2187                    path_suffixes: vec!["rs".to_string()],
2188                    language_server: Some(language_server_config),
2189                    ..Default::default()
2190                },
2191                Some(tree_sitter_rust::language()),
2192            )));
2193
2194        // Connect to a server as 2 clients.
2195        let mut server = TestServer::start(cx_a.foreground()).await;
2196        let client_a = server.create_client(&mut cx_a, "user_a").await;
2197        let client_b = server.create_client(&mut cx_b, "user_b").await;
2198
2199        // Share a project as client A
2200        fs.insert_tree(
2201            "/a",
2202            json!({
2203                ".zed.toml": r#"collaborators = ["user_b"]"#,
2204                "main.rs": "fn main() { a }",
2205                "other.rs": "",
2206            }),
2207        )
2208        .await;
2209        let project_a = cx_a.update(|cx| {
2210            Project::local(
2211                client_a.clone(),
2212                client_a.user_store.clone(),
2213                lang_registry.clone(),
2214                fs.clone(),
2215                cx,
2216            )
2217        });
2218        let (worktree_a, _) = project_a
2219            .update(&mut cx_a, |p, cx| {
2220                p.find_or_create_local_worktree("/a", false, cx)
2221            })
2222            .await
2223            .unwrap();
2224        worktree_a
2225            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2226            .await;
2227        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2228        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2229        project_a
2230            .update(&mut cx_a, |p, cx| p.share(cx))
2231            .await
2232            .unwrap();
2233
2234        // Join the worktree as client B.
2235        let project_b = Project::remote(
2236            project_id,
2237            client_b.clone(),
2238            client_b.user_store.clone(),
2239            lang_registry.clone(),
2240            fs.clone(),
2241            &mut cx_b.to_async(),
2242        )
2243        .await
2244        .unwrap();
2245
2246        // Open a file in an editor as the guest.
2247        let buffer_b = project_b
2248            .update(&mut cx_b, |p, cx| {
2249                p.open_buffer((worktree_id, "main.rs"), cx)
2250            })
2251            .await
2252            .unwrap();
2253        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2254        let editor_b = cx_b.add_view(window_b, |cx| {
2255            Editor::for_buffer(
2256                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2257                Arc::new(|cx| EditorSettings::test(cx)),
2258                Some(project_b.clone()),
2259                cx,
2260            )
2261        });
2262
2263        // Type a completion trigger character as the guest.
2264        editor_b.update(&mut cx_b, |editor, cx| {
2265            editor.select_ranges([13..13], None, cx);
2266            editor.handle_input(&Input(".".into()), cx);
2267            cx.focus(&editor_b);
2268        });
2269
2270        // Receive a completion request as the host's language server.
2271        // Return some completions from the host's language server.
2272        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2273            assert_eq!(
2274                params.text_document_position.text_document.uri,
2275                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2276            );
2277            assert_eq!(
2278                params.text_document_position.position,
2279                lsp::Position::new(0, 14),
2280            );
2281
2282            Some(lsp::CompletionResponse::Array(vec![
2283                lsp::CompletionItem {
2284                    label: "first_method(…)".into(),
2285                    detail: Some("fn(&mut self, B) -> C".into()),
2286                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2287                        new_text: "first_method($1)".to_string(),
2288                        range: lsp::Range::new(
2289                            lsp::Position::new(0, 14),
2290                            lsp::Position::new(0, 14),
2291                        ),
2292                    })),
2293                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2294                    ..Default::default()
2295                },
2296                lsp::CompletionItem {
2297                    label: "second_method(…)".into(),
2298                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2299                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2300                        new_text: "second_method()".to_string(),
2301                        range: lsp::Range::new(
2302                            lsp::Position::new(0, 14),
2303                            lsp::Position::new(0, 14),
2304                        ),
2305                    })),
2306                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2307                    ..Default::default()
2308                },
2309            ]))
2310        });
2311
2312        // Open the buffer on the host.
2313        let buffer_a = project_a
2314            .update(&mut cx_a, |p, cx| {
2315                p.open_buffer((worktree_id, "main.rs"), cx)
2316            })
2317            .await
2318            .unwrap();
2319        buffer_a
2320            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2321            .await;
2322
2323        // Confirm a completion on the guest.
2324        editor_b.next_notification(&cx_b).await;
2325        editor_b.update(&mut cx_b, |editor, cx| {
2326            assert!(editor.context_menu_visible());
2327            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2328            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2329        });
2330
2331        // Return a resolved completion from the host's language server.
2332        // The resolved completion has an additional text edit.
2333        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2334            assert_eq!(params.label, "first_method(…)");
2335            lsp::CompletionItem {
2336                label: "first_method(…)".into(),
2337                detail: Some("fn(&mut self, B) -> C".into()),
2338                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2339                    new_text: "first_method($1)".to_string(),
2340                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2341                })),
2342                additional_text_edits: Some(vec![lsp::TextEdit {
2343                    new_text: "use d::SomeTrait;\n".to_string(),
2344                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2345                }]),
2346                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2347                ..Default::default()
2348            }
2349        });
2350
2351        buffer_a
2352            .condition(&cx_a, |buffer, _| {
2353                buffer.text() == "fn main() { a.first_method() }"
2354            })
2355            .await;
2356
2357        // The additional edit is applied.
2358        buffer_b
2359            .condition(&cx_b, |buffer, _| {
2360                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2361            })
2362            .await;
2363        assert_eq!(
2364            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2365            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2366        );
2367    }
2368
2369    #[gpui::test(iterations = 10)]
2370    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2371        cx_a.foreground().forbid_parking();
2372        let mut lang_registry = Arc::new(LanguageRegistry::new());
2373        let fs = Arc::new(FakeFs::new(cx_a.background()));
2374
2375        // Set up a fake language server.
2376        let (language_server_config, mut fake_language_server) =
2377            LanguageServerConfig::fake(&cx_a).await;
2378        Arc::get_mut(&mut lang_registry)
2379            .unwrap()
2380            .add(Arc::new(Language::new(
2381                LanguageConfig {
2382                    name: "Rust".to_string(),
2383                    path_suffixes: vec!["rs".to_string()],
2384                    language_server: Some(language_server_config),
2385                    ..Default::default()
2386                },
2387                Some(tree_sitter_rust::language()),
2388            )));
2389
2390        // Connect to a server as 2 clients.
2391        let mut server = TestServer::start(cx_a.foreground()).await;
2392        let client_a = server.create_client(&mut cx_a, "user_a").await;
2393        let client_b = server.create_client(&mut cx_b, "user_b").await;
2394
2395        // Share a project as client A
2396        fs.insert_tree(
2397            "/a",
2398            json!({
2399                ".zed.toml": r#"collaborators = ["user_b"]"#,
2400                "a.rs": "let one = two",
2401            }),
2402        )
2403        .await;
2404        let project_a = cx_a.update(|cx| {
2405            Project::local(
2406                client_a.clone(),
2407                client_a.user_store.clone(),
2408                lang_registry.clone(),
2409                fs.clone(),
2410                cx,
2411            )
2412        });
2413        let (worktree_a, _) = project_a
2414            .update(&mut cx_a, |p, cx| {
2415                p.find_or_create_local_worktree("/a", false, cx)
2416            })
2417            .await
2418            .unwrap();
2419        worktree_a
2420            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2421            .await;
2422        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2423        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2424        project_a
2425            .update(&mut cx_a, |p, cx| p.share(cx))
2426            .await
2427            .unwrap();
2428
2429        // Join the worktree as client B.
2430        let project_b = Project::remote(
2431            project_id,
2432            client_b.clone(),
2433            client_b.user_store.clone(),
2434            lang_registry.clone(),
2435            fs.clone(),
2436            &mut cx_b.to_async(),
2437        )
2438        .await
2439        .unwrap();
2440
2441        let buffer_b = cx_b
2442            .background()
2443            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2444            .await
2445            .unwrap();
2446
2447        let format = project_b.update(&mut cx_b, |project, cx| {
2448            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2449        });
2450
2451        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2452            Some(vec![
2453                lsp::TextEdit {
2454                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2455                    new_text: "h".to_string(),
2456                },
2457                lsp::TextEdit {
2458                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2459                    new_text: "y".to_string(),
2460                },
2461            ])
2462        });
2463
2464        format.await.unwrap();
2465        assert_eq!(
2466            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2467            "let honey = two"
2468        );
2469    }
2470
2471    #[gpui::test(iterations = 10)]
2472    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2473        cx_a.foreground().forbid_parking();
2474        let mut lang_registry = Arc::new(LanguageRegistry::new());
2475        let fs = Arc::new(FakeFs::new(cx_a.background()));
2476        fs.insert_tree(
2477            "/root-1",
2478            json!({
2479                ".zed.toml": r#"collaborators = ["user_b"]"#,
2480                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2481            }),
2482        )
2483        .await;
2484        fs.insert_tree(
2485            "/root-2",
2486            json!({
2487                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2488            }),
2489        )
2490        .await;
2491
2492        // Set up a fake language server.
2493        let (language_server_config, mut fake_language_server) =
2494            LanguageServerConfig::fake(&cx_a).await;
2495        Arc::get_mut(&mut lang_registry)
2496            .unwrap()
2497            .add(Arc::new(Language::new(
2498                LanguageConfig {
2499                    name: "Rust".to_string(),
2500                    path_suffixes: vec!["rs".to_string()],
2501                    language_server: Some(language_server_config),
2502                    ..Default::default()
2503                },
2504                Some(tree_sitter_rust::language()),
2505            )));
2506
2507        // Connect to a server as 2 clients.
2508        let mut server = TestServer::start(cx_a.foreground()).await;
2509        let client_a = server.create_client(&mut cx_a, "user_a").await;
2510        let client_b = server.create_client(&mut cx_b, "user_b").await;
2511
2512        // Share a project as client A
2513        let project_a = cx_a.update(|cx| {
2514            Project::local(
2515                client_a.clone(),
2516                client_a.user_store.clone(),
2517                lang_registry.clone(),
2518                fs.clone(),
2519                cx,
2520            )
2521        });
2522        let (worktree_a, _) = project_a
2523            .update(&mut cx_a, |p, cx| {
2524                p.find_or_create_local_worktree("/root-1", false, cx)
2525            })
2526            .await
2527            .unwrap();
2528        worktree_a
2529            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2530            .await;
2531        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2532        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2533        project_a
2534            .update(&mut cx_a, |p, cx| p.share(cx))
2535            .await
2536            .unwrap();
2537
2538        // Join the worktree as client B.
2539        let project_b = Project::remote(
2540            project_id,
2541            client_b.clone(),
2542            client_b.user_store.clone(),
2543            lang_registry.clone(),
2544            fs.clone(),
2545            &mut cx_b.to_async(),
2546        )
2547        .await
2548        .unwrap();
2549
2550        // Open the file on client B.
2551        let buffer_b = cx_b
2552            .background()
2553            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2554            .await
2555            .unwrap();
2556
2557        // Request the definition of a symbol as the guest.
2558        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2559        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2560            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2561                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2562                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2563            )))
2564        });
2565
2566        let definitions_1 = definitions_1.await.unwrap();
2567        cx_b.read(|cx| {
2568            assert_eq!(definitions_1.len(), 1);
2569            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2570            let target_buffer = definitions_1[0].target_buffer.read(cx);
2571            assert_eq!(
2572                target_buffer.text(),
2573                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2574            );
2575            assert_eq!(
2576                definitions_1[0].target_range.to_point(target_buffer),
2577                Point::new(0, 6)..Point::new(0, 9)
2578            );
2579        });
2580
2581        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2582        // the previous call to `definition`.
2583        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2584        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2585            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2586                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2587                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2588            )))
2589        });
2590
2591        let definitions_2 = definitions_2.await.unwrap();
2592        cx_b.read(|cx| {
2593            assert_eq!(definitions_2.len(), 1);
2594            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2595            let target_buffer = definitions_2[0].target_buffer.read(cx);
2596            assert_eq!(
2597                target_buffer.text(),
2598                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2599            );
2600            assert_eq!(
2601                definitions_2[0].target_range.to_point(target_buffer),
2602                Point::new(1, 6)..Point::new(1, 11)
2603            );
2604        });
2605        assert_eq!(
2606            definitions_1[0].target_buffer,
2607            definitions_2[0].target_buffer
2608        );
2609
2610        cx_b.update(|_| {
2611            drop(definitions_1);
2612            drop(definitions_2);
2613        });
2614        project_b
2615            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2616            .await;
2617    }
2618
2619    #[gpui::test(iterations = 10)]
2620    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2621        mut cx_a: TestAppContext,
2622        mut cx_b: TestAppContext,
2623        mut rng: StdRng,
2624    ) {
2625        cx_a.foreground().forbid_parking();
2626        let mut lang_registry = Arc::new(LanguageRegistry::new());
2627        let fs = Arc::new(FakeFs::new(cx_a.background()));
2628        fs.insert_tree(
2629            "/root",
2630            json!({
2631                ".zed.toml": r#"collaborators = ["user_b"]"#,
2632                "a.rs": "const ONE: usize = b::TWO;",
2633                "b.rs": "const TWO: usize = 2",
2634            }),
2635        )
2636        .await;
2637
2638        // Set up a fake language server.
2639        let (language_server_config, mut fake_language_server) =
2640            LanguageServerConfig::fake(&cx_a).await;
2641        Arc::get_mut(&mut lang_registry)
2642            .unwrap()
2643            .add(Arc::new(Language::new(
2644                LanguageConfig {
2645                    name: "Rust".to_string(),
2646                    path_suffixes: vec!["rs".to_string()],
2647                    language_server: Some(language_server_config),
2648                    ..Default::default()
2649                },
2650                Some(tree_sitter_rust::language()),
2651            )));
2652
2653        // Connect to a server as 2 clients.
2654        let mut server = TestServer::start(cx_a.foreground()).await;
2655        let client_a = server.create_client(&mut cx_a, "user_a").await;
2656        let client_b = server.create_client(&mut cx_b, "user_b").await;
2657
2658        // Share a project as client A
2659        let project_a = cx_a.update(|cx| {
2660            Project::local(
2661                client_a.clone(),
2662                client_a.user_store.clone(),
2663                lang_registry.clone(),
2664                fs.clone(),
2665                cx,
2666            )
2667        });
2668        let (worktree_a, _) = project_a
2669            .update(&mut cx_a, |p, cx| {
2670                p.find_or_create_local_worktree("/root", false, cx)
2671            })
2672            .await
2673            .unwrap();
2674        worktree_a
2675            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2676            .await;
2677        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2678        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2679        project_a
2680            .update(&mut cx_a, |p, cx| p.share(cx))
2681            .await
2682            .unwrap();
2683
2684        // Join the worktree as client B.
2685        let project_b = Project::remote(
2686            project_id,
2687            client_b.clone(),
2688            client_b.user_store.clone(),
2689            lang_registry.clone(),
2690            fs.clone(),
2691            &mut cx_b.to_async(),
2692        )
2693        .await
2694        .unwrap();
2695
2696        let buffer_b1 = cx_b
2697            .background()
2698            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2699            .await
2700            .unwrap();
2701
2702        let definitions;
2703        let buffer_b2;
2704        if rng.gen() {
2705            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2706            buffer_b2 =
2707                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2708        } else {
2709            buffer_b2 =
2710                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2711            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2712        }
2713
2714        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2715            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2716                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2717                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2718            )))
2719        });
2720
2721        let buffer_b2 = buffer_b2.await.unwrap();
2722        let definitions = definitions.await.unwrap();
2723        assert_eq!(definitions.len(), 1);
2724        assert_eq!(definitions[0].target_buffer, buffer_b2);
2725    }
2726
2727    #[gpui::test(iterations = 10)]
2728    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2729        cx_a.foreground().forbid_parking();
2730
2731        // Connect to a server as 2 clients.
2732        let mut server = TestServer::start(cx_a.foreground()).await;
2733        let client_a = server.create_client(&mut cx_a, "user_a").await;
2734        let client_b = server.create_client(&mut cx_b, "user_b").await;
2735
2736        // Create an org that includes these 2 users.
2737        let db = &server.app_state.db;
2738        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2739        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2740            .await
2741            .unwrap();
2742        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2743            .await
2744            .unwrap();
2745
2746        // Create a channel that includes all the users.
2747        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2748        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2749            .await
2750            .unwrap();
2751        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2752            .await
2753            .unwrap();
2754        db.create_channel_message(
2755            channel_id,
2756            client_b.current_user_id(&cx_b),
2757            "hello A, it's B.",
2758            OffsetDateTime::now_utc(),
2759            1,
2760        )
2761        .await
2762        .unwrap();
2763
2764        let channels_a = cx_a
2765            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2766        channels_a
2767            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2768            .await;
2769        channels_a.read_with(&cx_a, |list, _| {
2770            assert_eq!(
2771                list.available_channels().unwrap(),
2772                &[ChannelDetails {
2773                    id: channel_id.to_proto(),
2774                    name: "test-channel".to_string()
2775                }]
2776            )
2777        });
2778        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2779            this.get_channel(channel_id.to_proto(), cx).unwrap()
2780        });
2781        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2782        channel_a
2783            .condition(&cx_a, |channel, _| {
2784                channel_messages(channel)
2785                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2786            })
2787            .await;
2788
2789        let channels_b = cx_b
2790            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2791        channels_b
2792            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2793            .await;
2794        channels_b.read_with(&cx_b, |list, _| {
2795            assert_eq!(
2796                list.available_channels().unwrap(),
2797                &[ChannelDetails {
2798                    id: channel_id.to_proto(),
2799                    name: "test-channel".to_string()
2800                }]
2801            )
2802        });
2803
2804        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2805            this.get_channel(channel_id.to_proto(), cx).unwrap()
2806        });
2807        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2808        channel_b
2809            .condition(&cx_b, |channel, _| {
2810                channel_messages(channel)
2811                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2812            })
2813            .await;
2814
2815        channel_a
2816            .update(&mut cx_a, |channel, cx| {
2817                channel
2818                    .send_message("oh, hi B.".to_string(), cx)
2819                    .unwrap()
2820                    .detach();
2821                let task = channel.send_message("sup".to_string(), cx).unwrap();
2822                assert_eq!(
2823                    channel_messages(channel),
2824                    &[
2825                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2826                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2827                        ("user_a".to_string(), "sup".to_string(), true)
2828                    ]
2829                );
2830                task
2831            })
2832            .await
2833            .unwrap();
2834
2835        channel_b
2836            .condition(&cx_b, |channel, _| {
2837                channel_messages(channel)
2838                    == [
2839                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2840                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2841                        ("user_a".to_string(), "sup".to_string(), false),
2842                    ]
2843            })
2844            .await;
2845
2846        assert_eq!(
2847            server
2848                .state()
2849                .await
2850                .channel(channel_id)
2851                .unwrap()
2852                .connection_ids
2853                .len(),
2854            2
2855        );
2856        cx_b.update(|_| drop(channel_b));
2857        server
2858            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2859            .await;
2860
2861        cx_a.update(|_| drop(channel_a));
2862        server
2863            .condition(|state| state.channel(channel_id).is_none())
2864            .await;
2865    }
2866
2867    #[gpui::test(iterations = 10)]
2868    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2869        cx_a.foreground().forbid_parking();
2870
2871        let mut server = TestServer::start(cx_a.foreground()).await;
2872        let client_a = server.create_client(&mut cx_a, "user_a").await;
2873
2874        let db = &server.app_state.db;
2875        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2876        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2877        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2878            .await
2879            .unwrap();
2880        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2881            .await
2882            .unwrap();
2883
2884        let channels_a = cx_a
2885            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2886        channels_a
2887            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2888            .await;
2889        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2890            this.get_channel(channel_id.to_proto(), cx).unwrap()
2891        });
2892
2893        // Messages aren't allowed to be too long.
2894        channel_a
2895            .update(&mut cx_a, |channel, cx| {
2896                let long_body = "this is long.\n".repeat(1024);
2897                channel.send_message(long_body, cx).unwrap()
2898            })
2899            .await
2900            .unwrap_err();
2901
2902        // Messages aren't allowed to be blank.
2903        channel_a.update(&mut cx_a, |channel, cx| {
2904            channel.send_message(String::new(), cx).unwrap_err()
2905        });
2906
2907        // Leading and trailing whitespace are trimmed.
2908        channel_a
2909            .update(&mut cx_a, |channel, cx| {
2910                channel
2911                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
2912                    .unwrap()
2913            })
2914            .await
2915            .unwrap();
2916        assert_eq!(
2917            db.get_channel_messages(channel_id, 10, None)
2918                .await
2919                .unwrap()
2920                .iter()
2921                .map(|m| &m.body)
2922                .collect::<Vec<_>>(),
2923            &["surrounded by whitespace"]
2924        );
2925    }
2926
2927    #[gpui::test(iterations = 10)]
2928    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2929        cx_a.foreground().forbid_parking();
2930
2931        // Connect to a server as 2 clients.
2932        let mut server = TestServer::start(cx_a.foreground()).await;
2933        let client_a = server.create_client(&mut cx_a, "user_a").await;
2934        let client_b = server.create_client(&mut cx_b, "user_b").await;
2935        let mut status_b = client_b.status();
2936
2937        // Create an org that includes these 2 users.
2938        let db = &server.app_state.db;
2939        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2940        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2941            .await
2942            .unwrap();
2943        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2944            .await
2945            .unwrap();
2946
2947        // Create a channel that includes all the users.
2948        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2949        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2950            .await
2951            .unwrap();
2952        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2953            .await
2954            .unwrap();
2955        db.create_channel_message(
2956            channel_id,
2957            client_b.current_user_id(&cx_b),
2958            "hello A, it's B.",
2959            OffsetDateTime::now_utc(),
2960            2,
2961        )
2962        .await
2963        .unwrap();
2964
2965        let channels_a = cx_a
2966            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2967        channels_a
2968            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2969            .await;
2970
2971        channels_a.read_with(&cx_a, |list, _| {
2972            assert_eq!(
2973                list.available_channels().unwrap(),
2974                &[ChannelDetails {
2975                    id: channel_id.to_proto(),
2976                    name: "test-channel".to_string()
2977                }]
2978            )
2979        });
2980        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2981            this.get_channel(channel_id.to_proto(), cx).unwrap()
2982        });
2983        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2984        channel_a
2985            .condition(&cx_a, |channel, _| {
2986                channel_messages(channel)
2987                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2988            })
2989            .await;
2990
2991        let channels_b = cx_b
2992            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2993        channels_b
2994            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2995            .await;
2996        channels_b.read_with(&cx_b, |list, _| {
2997            assert_eq!(
2998                list.available_channels().unwrap(),
2999                &[ChannelDetails {
3000                    id: channel_id.to_proto(),
3001                    name: "test-channel".to_string()
3002                }]
3003            )
3004        });
3005
3006        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3007            this.get_channel(channel_id.to_proto(), cx).unwrap()
3008        });
3009        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3010        channel_b
3011            .condition(&cx_b, |channel, _| {
3012                channel_messages(channel)
3013                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3014            })
3015            .await;
3016
3017        // Disconnect client B, ensuring we can still access its cached channel data.
3018        server.forbid_connections();
3019        server.disconnect_client(client_b.current_user_id(&cx_b));
3020        while !matches!(
3021            status_b.next().await,
3022            Some(client::Status::ReconnectionError { .. })
3023        ) {}
3024
3025        channels_b.read_with(&cx_b, |channels, _| {
3026            assert_eq!(
3027                channels.available_channels().unwrap(),
3028                [ChannelDetails {
3029                    id: channel_id.to_proto(),
3030                    name: "test-channel".to_string()
3031                }]
3032            )
3033        });
3034        channel_b.read_with(&cx_b, |channel, _| {
3035            assert_eq!(
3036                channel_messages(channel),
3037                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3038            )
3039        });
3040
3041        // Send a message from client B while it is disconnected.
3042        channel_b
3043            .update(&mut cx_b, |channel, cx| {
3044                let task = channel
3045                    .send_message("can you see this?".to_string(), cx)
3046                    .unwrap();
3047                assert_eq!(
3048                    channel_messages(channel),
3049                    &[
3050                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3051                        ("user_b".to_string(), "can you see this?".to_string(), true)
3052                    ]
3053                );
3054                task
3055            })
3056            .await
3057            .unwrap_err();
3058
3059        // Send a message from client A while B is disconnected.
3060        channel_a
3061            .update(&mut cx_a, |channel, cx| {
3062                channel
3063                    .send_message("oh, hi B.".to_string(), cx)
3064                    .unwrap()
3065                    .detach();
3066                let task = channel.send_message("sup".to_string(), cx).unwrap();
3067                assert_eq!(
3068                    channel_messages(channel),
3069                    &[
3070                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3071                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3072                        ("user_a".to_string(), "sup".to_string(), true)
3073                    ]
3074                );
3075                task
3076            })
3077            .await
3078            .unwrap();
3079
3080        // Give client B a chance to reconnect.
3081        server.allow_connections();
3082        cx_b.foreground().advance_clock(Duration::from_secs(10));
3083
3084        // Verify that B sees the new messages upon reconnection, as well as the message client B
3085        // sent while offline.
3086        channel_b
3087            .condition(&cx_b, |channel, _| {
3088                channel_messages(channel)
3089                    == [
3090                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3091                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3092                        ("user_a".to_string(), "sup".to_string(), false),
3093                        ("user_b".to_string(), "can you see this?".to_string(), false),
3094                    ]
3095            })
3096            .await;
3097
3098        // Ensure client A and B can communicate normally after reconnection.
3099        channel_a
3100            .update(&mut cx_a, |channel, cx| {
3101                channel.send_message("you online?".to_string(), cx).unwrap()
3102            })
3103            .await
3104            .unwrap();
3105        channel_b
3106            .condition(&cx_b, |channel, _| {
3107                channel_messages(channel)
3108                    == [
3109                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3110                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3111                        ("user_a".to_string(), "sup".to_string(), false),
3112                        ("user_b".to_string(), "can you see this?".to_string(), false),
3113                        ("user_a".to_string(), "you online?".to_string(), false),
3114                    ]
3115            })
3116            .await;
3117
3118        channel_b
3119            .update(&mut cx_b, |channel, cx| {
3120                channel.send_message("yep".to_string(), cx).unwrap()
3121            })
3122            .await
3123            .unwrap();
3124        channel_a
3125            .condition(&cx_a, |channel, _| {
3126                channel_messages(channel)
3127                    == [
3128                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3129                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3130                        ("user_a".to_string(), "sup".to_string(), false),
3131                        ("user_b".to_string(), "can you see this?".to_string(), false),
3132                        ("user_a".to_string(), "you online?".to_string(), false),
3133                        ("user_b".to_string(), "yep".to_string(), false),
3134                    ]
3135            })
3136            .await;
3137    }
3138
3139    #[gpui::test(iterations = 10)]
3140    async fn test_contacts(
3141        mut cx_a: TestAppContext,
3142        mut cx_b: TestAppContext,
3143        mut cx_c: TestAppContext,
3144    ) {
3145        cx_a.foreground().forbid_parking();
3146        let lang_registry = Arc::new(LanguageRegistry::new());
3147        let fs = Arc::new(FakeFs::new(cx_a.background()));
3148
3149        // Connect to a server as 3 clients.
3150        let mut server = TestServer::start(cx_a.foreground()).await;
3151        let client_a = server.create_client(&mut cx_a, "user_a").await;
3152        let client_b = server.create_client(&mut cx_b, "user_b").await;
3153        let client_c = server.create_client(&mut cx_c, "user_c").await;
3154
3155        // Share a worktree as client A.
3156        fs.insert_tree(
3157            "/a",
3158            json!({
3159                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3160            }),
3161        )
3162        .await;
3163
3164        let project_a = cx_a.update(|cx| {
3165            Project::local(
3166                client_a.clone(),
3167                client_a.user_store.clone(),
3168                lang_registry.clone(),
3169                fs.clone(),
3170                cx,
3171            )
3172        });
3173        let (worktree_a, _) = project_a
3174            .update(&mut cx_a, |p, cx| {
3175                p.find_or_create_local_worktree("/a", false, cx)
3176            })
3177            .await
3178            .unwrap();
3179        worktree_a
3180            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3181            .await;
3182
3183        client_a
3184            .user_store
3185            .condition(&cx_a, |user_store, _| {
3186                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3187            })
3188            .await;
3189        client_b
3190            .user_store
3191            .condition(&cx_b, |user_store, _| {
3192                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3193            })
3194            .await;
3195        client_c
3196            .user_store
3197            .condition(&cx_c, |user_store, _| {
3198                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3199            })
3200            .await;
3201
3202        let project_id = project_a
3203            .update(&mut cx_a, |project, _| project.next_remote_id())
3204            .await;
3205        project_a
3206            .update(&mut cx_a, |project, cx| project.share(cx))
3207            .await
3208            .unwrap();
3209
3210        let _project_b = Project::remote(
3211            project_id,
3212            client_b.clone(),
3213            client_b.user_store.clone(),
3214            lang_registry.clone(),
3215            fs.clone(),
3216            &mut cx_b.to_async(),
3217        )
3218        .await
3219        .unwrap();
3220
3221        client_a
3222            .user_store
3223            .condition(&cx_a, |user_store, _| {
3224                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3225            })
3226            .await;
3227        client_b
3228            .user_store
3229            .condition(&cx_b, |user_store, _| {
3230                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3231            })
3232            .await;
3233        client_c
3234            .user_store
3235            .condition(&cx_c, |user_store, _| {
3236                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3237            })
3238            .await;
3239
3240        project_a
3241            .condition(&cx_a, |project, _| {
3242                project.collaborators().contains_key(&client_b.peer_id)
3243            })
3244            .await;
3245
3246        cx_a.update(move |_| drop(project_a));
3247        client_a
3248            .user_store
3249            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3250            .await;
3251        client_b
3252            .user_store
3253            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3254            .await;
3255        client_c
3256            .user_store
3257            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3258            .await;
3259
3260        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3261            user_store
3262                .contacts()
3263                .iter()
3264                .map(|contact| {
3265                    let worktrees = contact
3266                        .projects
3267                        .iter()
3268                        .map(|p| {
3269                            (
3270                                p.worktree_root_names[0].as_str(),
3271                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3272                            )
3273                        })
3274                        .collect();
3275                    (contact.user.github_login.as_str(), worktrees)
3276                })
3277                .collect()
3278        }
3279    }
3280
    /// An in-process server together with the handles tests use to drive it.
    struct TestServer {
        /// RPC peer shared with `server`; reset when the `TestServer` is dropped.
        peer: Arc<Peer>,
        /// Application state produced by `build_app_state`.
        app_state: Arc<AppState>,
        /// The server under test.
        server: Arc<Server>,
        /// Foreground executor; `condition` brackets each wait with
        /// `start_waiting` / `finish_waiting` on it.
        foreground: Rc<executor::Foreground>,
        /// Receiving half of the channel whose sender is handed to `Server::new`.
        notifications: mpsc::Receiver<()>,
        /// Kill switch for each user's in-memory connection, keyed by user id.
        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
        /// While `true`, attempts to establish a client connection fail.
        forbid_connections: Arc<AtomicBool>,
        /// Never read; held so the test database lives as long as the server.
        /// NOTE(review): presumably `TestDb` cleans up on drop — confirm.
        _test_db: TestDb,
    }
3291
3292    impl TestServer {
3293        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3294            let test_db = TestDb::new();
3295            let app_state = Self::build_app_state(&test_db).await;
3296            let peer = Peer::new();
3297            let notifications = mpsc::channel(128);
3298            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3299            Self {
3300                peer,
3301                app_state,
3302                server,
3303                foreground,
3304                notifications: notifications.1,
3305                connection_killers: Default::default(),
3306                forbid_connections: Default::default(),
3307                _test_db: test_db,
3308            }
3309        }
3310
3311        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3312            let http = FakeHttpClient::with_404_response();
3313            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3314            let client_name = name.to_string();
3315            let mut client = Client::new(http.clone());
3316            let server = self.server.clone();
3317            let connection_killers = self.connection_killers.clone();
3318            let forbid_connections = self.forbid_connections.clone();
3319            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3320
3321            Arc::get_mut(&mut client)
3322                .unwrap()
3323                .override_authenticate(move |cx| {
3324                    cx.spawn(|_| async move {
3325                        let access_token = "the-token".to_string();
3326                        Ok(Credentials {
3327                            user_id: user_id.0 as u64,
3328                            access_token,
3329                        })
3330                    })
3331                })
3332                .override_establish_connection(move |credentials, cx| {
3333                    assert_eq!(credentials.user_id, user_id.0 as u64);
3334                    assert_eq!(credentials.access_token, "the-token");
3335
3336                    let server = server.clone();
3337                    let connection_killers = connection_killers.clone();
3338                    let forbid_connections = forbid_connections.clone();
3339                    let client_name = client_name.clone();
3340                    let connection_id_tx = connection_id_tx.clone();
3341                    cx.spawn(move |cx| async move {
3342                        if forbid_connections.load(SeqCst) {
3343                            Err(EstablishConnectionError::other(anyhow!(
3344                                "server is forbidding connections"
3345                            )))
3346                        } else {
3347                            let (client_conn, server_conn, kill_conn) =
3348                                Connection::in_memory(cx.background());
3349                            connection_killers.lock().insert(user_id, kill_conn);
3350                            cx.background()
3351                                .spawn(server.handle_connection(
3352                                    server_conn,
3353                                    client_name,
3354                                    user_id,
3355                                    Some(connection_id_tx),
3356                                ))
3357                                .detach();
3358                            Ok(client_conn)
3359                        }
3360                    })
3361                });
3362
3363            client
3364                .authenticate_and_connect(&cx.to_async())
3365                .await
3366                .unwrap();
3367
3368            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3369            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3370            let mut authed_user =
3371                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3372            while authed_user.next().await.unwrap().is_none() {}
3373
3374            TestClient {
3375                client,
3376                peer_id,
3377                user_store,
3378            }
3379        }
3380
3381        fn disconnect_client(&self, user_id: UserId) {
3382            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3383                let _ = kill_conn.try_send(Some(()));
3384            }
3385        }
3386
3387        fn forbid_connections(&self) {
3388            self.forbid_connections.store(true, SeqCst);
3389        }
3390
3391        fn allow_connections(&self) {
3392            self.forbid_connections.store(false, SeqCst);
3393        }
3394
3395        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3396            let mut config = Config::default();
3397            config.session_secret = "a".repeat(32);
3398            config.database_url = test_db.url.clone();
3399            let github_client = github::AppClient::test();
3400            Arc::new(AppState {
3401                db: test_db.db().clone(),
3402                handlebars: Default::default(),
3403                auth_client: auth::build_client("", ""),
3404                repo_client: github::RepoClient::test(&github_client),
3405                github_client,
3406                config,
3407            })
3408        }
3409
3410        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3411            self.server.store.read()
3412        }
3413
3414        async fn condition<F>(&mut self, mut predicate: F)
3415        where
3416            F: FnMut(&Store) -> bool,
3417        {
3418            async_std::future::timeout(Duration::from_millis(500), async {
3419                while !(predicate)(&*self.server.store.read()) {
3420                    self.foreground.start_waiting();
3421                    self.notifications.next().await;
3422                    self.foreground.finish_waiting();
3423                }
3424            })
3425            .await
3426            .expect("condition timed out");
3427        }
3428    }
3429
    /// Resets the shared peer whenever a test server goes away.
    impl Drop for TestServer {
        fn drop(&mut self) {
            // NOTE(review): presumably tears down the peer's remaining
            // connections so nothing outlives the test — confirm in `Peer::reset`.
            self.peer.reset();
        }
    }
3435
    /// A connected client as returned by `TestServer::create_client`.
    struct TestClient {
        /// The underlying client; also reachable through `Deref`.
        client: Arc<Client>,
        /// Peer id built from the connection id the server reported for this client.
        pub peer_id: PeerId,
        /// User store created on top of `client`.
        pub user_store: ModelHandle<UserStore>,
    }
3441
    /// Lets a `TestClient` be used wherever an `Arc<Client>` is expected
    /// (e.g. `client_a.clone()` yields the inner `Arc<Client>`).
    impl Deref for TestClient {
        type Target = Arc<Client>;

        fn deref(&self) -> &Self::Target {
            &self.client
        }
    }
3449
3450    impl TestClient {
3451        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3452            UserId::from_proto(
3453                self.user_store
3454                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3455            )
3456        }
3457    }
3458
3459    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3460        channel
3461            .messages()
3462            .cursor::<()>()
3463            .map(|m| {
3464                (
3465                    m.sender.github_login.clone(),
3466                    m.body.clone(),
3467                    m.is_pending(),
3468                )
3469            })
3470            .collect()
3471    }
3472
    /// A stateless placeholder view; it renders an empty element.
    struct EmptyView;
3474
    /// `EmptyView` emits no meaningful events, so its event type is `()`.
    impl gpui::Entity for EmptyView {
        type Event = ();
    }
3478
    /// Minimal `View` implementation for `EmptyView`.
    impl gpui::View for EmptyView {
        /// Name reported for this view.
        fn ui_name() -> &'static str {
            "empty view"
        }

        /// Renders nothing: a boxed `gpui::elements::Empty`.
        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
            gpui::Element::boxed(gpui::elements::Empty)
        }
    }
3488}