rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_channels)
  89            .add_handler(Server::get_users)
  90            .add_handler(Server::join_channel)
  91            .add_handler(Server::leave_channel)
  92            .add_handler(Server::send_channel_message)
  93            .add_handler(Server::get_channel_messages);
  94
  95        Arc::new(server)
  96    }
  97
  98    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
  99    where
 100        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 101        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 102        M: EnvelopedMessage,
 103    {
 104        let prev_handler = self.handlers.insert(
 105            TypeId::of::<M>(),
 106            Box::new(move |server, envelope| {
 107                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 108                (handler)(server, *envelope).boxed()
 109            }),
 110        );
 111        if prev_handler.is_some() {
 112            panic!("registered a handler for the same message twice");
 113        }
 114        self
 115    }
 116
 117    pub fn handle_connection(
 118        self: &Arc<Self>,
 119        connection: Connection,
 120        addr: String,
 121        user_id: UserId,
 122        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 123    ) -> impl Future<Output = ()> {
 124        let mut this = self.clone();
 125        async move {
 126            let (connection_id, handle_io, mut incoming_rx) =
 127                this.peer.add_connection(connection).await;
 128
 129            if let Some(send_connection_id) = send_connection_id.as_mut() {
 130                let _ = send_connection_id.send(connection_id).await;
 131            }
 132
 133            this.state_mut().add_connection(connection_id, user_id);
 134            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 135                log::error!("error updating contacts for {:?}: {}", user_id, err);
 136            }
 137
 138            let handle_io = handle_io.fuse();
 139            futures::pin_mut!(handle_io);
 140            loop {
 141                let next_message = incoming_rx.next().fuse();
 142                futures::pin_mut!(next_message);
 143                futures::select_biased! {
 144                    result = handle_io => {
 145                        if let Err(err) = result {
 146                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 147                        }
 148                        break;
 149                    }
 150                    message = next_message => {
 151                        if let Some(message) = message {
 152                            let start_time = Instant::now();
 153                            log::info!("RPC message received: {}", message.payload_type_name());
 154                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 155                                if let Err(err) = (handler)(this.clone(), message).await {
 156                                    log::error!("error handling message: {:?}", err);
 157                                } else {
 158                                    log::info!("RPC message handled. duration:{:?}", start_time.elapsed());
 159                                }
 160
 161                                if let Some(mut notifications) = this.notifications.clone() {
 162                                    let _ = notifications.send(()).await;
 163                                }
 164                            } else {
 165                                log::warn!("unhandled message: {}", message.payload_type_name());
 166                            }
 167                        } else {
 168                            log::info!("rpc connection closed {:?}", addr);
 169                            break;
 170                        }
 171                    }
 172                }
 173            }
 174
 175            if let Err(err) = this.sign_out(connection_id).await {
 176                log::error!("error signing out connection {:?} - {:?}", addr, err);
 177            }
 178        }
 179    }
 180
 181    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 182        self.peer.disconnect(connection_id);
 183        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 184
 185        for (project_id, project) in removed_connection.hosted_projects {
 186            if let Some(share) = project.share {
 187                broadcast(
 188                    connection_id,
 189                    share.guests.keys().copied().collect(),
 190                    |conn_id| {
 191                        self.peer
 192                            .send(conn_id, proto::UnshareProject { project_id })
 193                    },
 194                )?;
 195            }
 196        }
 197
 198        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 199            broadcast(connection_id, peer_ids, |conn_id| {
 200                self.peer.send(
 201                    conn_id,
 202                    proto::RemoveProjectCollaborator {
 203                        project_id,
 204                        peer_id: connection_id.0,
 205                    },
 206                )
 207            })?;
 208        }
 209
 210        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 211        Ok(())
 212    }
 213
 214    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 215        self.peer.respond(request.receipt(), proto::Ack {})?;
 216        Ok(())
 217    }
 218
 219    async fn register_project(
 220        mut self: Arc<Server>,
 221        request: TypedEnvelope<proto::RegisterProject>,
 222    ) -> tide::Result<()> {
 223        let project_id = {
 224            let mut state = self.state_mut();
 225            let user_id = state.user_id_for_connection(request.sender_id)?;
 226            state.register_project(request.sender_id, user_id)
 227        };
 228        self.peer.respond(
 229            request.receipt(),
 230            proto::RegisterProjectResponse { project_id },
 231        )?;
 232        Ok(())
 233    }
 234
 235    async fn unregister_project(
 236        mut self: Arc<Server>,
 237        request: TypedEnvelope<proto::UnregisterProject>,
 238    ) -> tide::Result<()> {
 239        let project = self
 240            .state_mut()
 241            .unregister_project(request.payload.project_id, request.sender_id)
 242            .ok_or_else(|| anyhow!("no such project"))?;
 243        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 244        Ok(())
 245    }
 246
 247    async fn share_project(
 248        mut self: Arc<Server>,
 249        request: TypedEnvelope<proto::ShareProject>,
 250    ) -> tide::Result<()> {
 251        self.state_mut()
 252            .share_project(request.payload.project_id, request.sender_id);
 253        self.peer.respond(request.receipt(), proto::Ack {})?;
 254        Ok(())
 255    }
 256
 257    async fn unshare_project(
 258        mut self: Arc<Server>,
 259        request: TypedEnvelope<proto::UnshareProject>,
 260    ) -> tide::Result<()> {
 261        let project_id = request.payload.project_id;
 262        let project = self
 263            .state_mut()
 264            .unshare_project(project_id, request.sender_id)?;
 265
 266        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 267            self.peer
 268                .send(conn_id, proto::UnshareProject { project_id })
 269        })?;
 270        self.update_contacts_for_users(&project.authorized_user_ids)?;
 271        Ok(())
 272    }
 273
 274    async fn join_project(
 275        mut self: Arc<Server>,
 276        request: TypedEnvelope<proto::JoinProject>,
 277    ) -> tide::Result<()> {
 278        let project_id = request.payload.project_id;
 279
 280        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 281        let response_data = self
 282            .state_mut()
 283            .join_project(request.sender_id, user_id, project_id)
 284            .and_then(|joined| {
 285                let share = joined.project.share()?;
 286                let peer_count = share.guests.len();
 287                let mut collaborators = Vec::with_capacity(peer_count);
 288                collaborators.push(proto::Collaborator {
 289                    peer_id: joined.project.host_connection_id.0,
 290                    replica_id: 0,
 291                    user_id: joined.project.host_user_id.to_proto(),
 292                });
 293                let worktrees = joined
 294                    .project
 295                    .worktrees
 296                    .iter()
 297                    .filter_map(|(id, worktree)| {
 298                        worktree.share.as_ref().map(|share| proto::Worktree {
 299                            id: *id,
 300                            root_name: worktree.root_name.clone(),
 301                            entries: share.entries.values().cloned().collect(),
 302                            diagnostic_summaries: share
 303                                .diagnostic_summaries
 304                                .values()
 305                                .cloned()
 306                                .collect(),
 307                            weak: worktree.weak,
 308                        })
 309                    })
 310                    .collect();
 311                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 312                    if *peer_conn_id != request.sender_id {
 313                        collaborators.push(proto::Collaborator {
 314                            peer_id: peer_conn_id.0,
 315                            replica_id: *peer_replica_id as u32,
 316                            user_id: peer_user_id.to_proto(),
 317                        });
 318                    }
 319                }
 320                let response = proto::JoinProjectResponse {
 321                    worktrees,
 322                    replica_id: joined.replica_id as u32,
 323                    collaborators,
 324                };
 325                let connection_ids = joined.project.connection_ids();
 326                let contact_user_ids = joined.project.authorized_user_ids();
 327                Ok((response, connection_ids, contact_user_ids))
 328            });
 329
 330        match response_data {
 331            Ok((response, connection_ids, contact_user_ids)) => {
 332                broadcast(request.sender_id, connection_ids, |conn_id| {
 333                    self.peer.send(
 334                        conn_id,
 335                        proto::AddProjectCollaborator {
 336                            project_id,
 337                            collaborator: Some(proto::Collaborator {
 338                                peer_id: request.sender_id.0,
 339                                replica_id: response.replica_id,
 340                                user_id: user_id.to_proto(),
 341                            }),
 342                        },
 343                    )
 344                })?;
 345                self.peer.respond(request.receipt(), response)?;
 346                self.update_contacts_for_users(&contact_user_ids)?;
 347            }
 348            Err(error) => {
 349                self.peer.respond_with_error(
 350                    request.receipt(),
 351                    proto::Error {
 352                        message: error.to_string(),
 353                    },
 354                )?;
 355            }
 356        }
 357
 358        Ok(())
 359    }
 360
 361    async fn leave_project(
 362        mut self: Arc<Server>,
 363        request: TypedEnvelope<proto::LeaveProject>,
 364    ) -> tide::Result<()> {
 365        let sender_id = request.sender_id;
 366        let project_id = request.payload.project_id;
 367        let worktree = self.state_mut().leave_project(sender_id, project_id);
 368        if let Some(worktree) = worktree {
 369            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 370                self.peer.send(
 371                    conn_id,
 372                    proto::RemoveProjectCollaborator {
 373                        project_id,
 374                        peer_id: sender_id.0,
 375                    },
 376                )
 377            })?;
 378            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 379        }
 380        Ok(())
 381    }
 382
 383    async fn register_worktree(
 384        mut self: Arc<Server>,
 385        request: TypedEnvelope<proto::RegisterWorktree>,
 386    ) -> tide::Result<()> {
 387        let receipt = request.receipt();
 388        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 389
 390        let mut contact_user_ids = HashSet::default();
 391        contact_user_ids.insert(host_user_id);
 392        for github_login in request.payload.authorized_logins {
 393            match self.app_state.db.create_user(&github_login, false).await {
 394                Ok(contact_user_id) => {
 395                    contact_user_ids.insert(contact_user_id);
 396                }
 397                Err(err) => {
 398                    let message = err.to_string();
 399                    self.peer
 400                        .respond_with_error(receipt, proto::Error { message })?;
 401                    return Ok(());
 402                }
 403            }
 404        }
 405
 406        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 407        let ok = self.state_mut().register_worktree(
 408            request.payload.project_id,
 409            request.payload.worktree_id,
 410            Worktree {
 411                authorized_user_ids: contact_user_ids.clone(),
 412                root_name: request.payload.root_name,
 413                share: None,
 414                weak: false,
 415            },
 416        );
 417
 418        if ok {
 419            self.peer.respond(receipt, proto::Ack {})?;
 420            self.update_contacts_for_users(&contact_user_ids)?;
 421        } else {
 422            self.peer.respond_with_error(
 423                receipt,
 424                proto::Error {
 425                    message: NO_SUCH_PROJECT.to_string(),
 426                },
 427            )?;
 428        }
 429
 430        Ok(())
 431    }
 432
 433    async fn unregister_worktree(
 434        mut self: Arc<Server>,
 435        request: TypedEnvelope<proto::UnregisterWorktree>,
 436    ) -> tide::Result<()> {
 437        let project_id = request.payload.project_id;
 438        let worktree_id = request.payload.worktree_id;
 439        let (worktree, guest_connection_ids) =
 440            self.state_mut()
 441                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 442        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 443            self.peer.send(
 444                conn_id,
 445                proto::UnregisterWorktree {
 446                    project_id,
 447                    worktree_id,
 448                },
 449            )
 450        })?;
 451        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 452        Ok(())
 453    }
 454
 455    async fn share_worktree(
 456        mut self: Arc<Server>,
 457        mut request: TypedEnvelope<proto::ShareWorktree>,
 458    ) -> tide::Result<()> {
 459        let worktree = request
 460            .payload
 461            .worktree
 462            .as_mut()
 463            .ok_or_else(|| anyhow!("missing worktree"))?;
 464        let entries = worktree
 465            .entries
 466            .iter()
 467            .map(|entry| (entry.id, entry.clone()))
 468            .collect();
 469        let diagnostic_summaries = worktree
 470            .diagnostic_summaries
 471            .iter()
 472            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 473            .collect();
 474
 475        let shared_worktree = self.state_mut().share_worktree(
 476            request.payload.project_id,
 477            worktree.id,
 478            request.sender_id,
 479            entries,
 480            diagnostic_summaries,
 481        );
 482        if let Some(shared_worktree) = shared_worktree {
 483            broadcast(
 484                request.sender_id,
 485                shared_worktree.connection_ids,
 486                |connection_id| {
 487                    self.peer.forward_send(
 488                        request.sender_id,
 489                        connection_id,
 490                        request.payload.clone(),
 491                    )
 492                },
 493            )?;
 494            self.peer.respond(request.receipt(), proto::Ack {})?;
 495            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 496        } else {
 497            self.peer.respond_with_error(
 498                request.receipt(),
 499                proto::Error {
 500                    message: "no such worktree".to_string(),
 501                },
 502            )?;
 503        }
 504        Ok(())
 505    }
 506
 507    async fn update_worktree(
 508        mut self: Arc<Server>,
 509        request: TypedEnvelope<proto::UpdateWorktree>,
 510    ) -> tide::Result<()> {
 511        let connection_ids = self
 512            .state_mut()
 513            .update_worktree(
 514                request.sender_id,
 515                request.payload.project_id,
 516                request.payload.worktree_id,
 517                &request.payload.removed_entries,
 518                &request.payload.updated_entries,
 519            )
 520            .ok_or_else(|| anyhow!("no such worktree"))?;
 521
 522        broadcast(request.sender_id, connection_ids, |connection_id| {
 523            self.peer
 524                .forward_send(request.sender_id, connection_id, request.payload.clone())
 525        })?;
 526
 527        Ok(())
 528    }
 529
 530    async fn update_diagnostic_summary(
 531        mut self: Arc<Server>,
 532        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 533    ) -> tide::Result<()> {
 534        let receiver_ids = request
 535            .payload
 536            .summary
 537            .clone()
 538            .and_then(|summary| {
 539                self.state_mut().update_diagnostic_summary(
 540                    request.payload.project_id,
 541                    request.payload.worktree_id,
 542                    request.sender_id,
 543                    summary,
 544                )
 545            })
 546            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 547
 548        broadcast(request.sender_id, receiver_ids, |connection_id| {
 549            self.peer
 550                .forward_send(request.sender_id, connection_id, request.payload.clone())
 551        })?;
 552        Ok(())
 553    }
 554
 555    async fn disk_based_diagnostics_updating(
 556        self: Arc<Server>,
 557        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 558    ) -> tide::Result<()> {
 559        let receiver_ids = self
 560            .state()
 561            .project_connection_ids(request.payload.project_id, request.sender_id)
 562            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 563        broadcast(request.sender_id, receiver_ids, |connection_id| {
 564            self.peer
 565                .forward_send(request.sender_id, connection_id, request.payload.clone())
 566        })?;
 567        Ok(())
 568    }
 569
 570    async fn disk_based_diagnostics_updated(
 571        self: Arc<Server>,
 572        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 573    ) -> tide::Result<()> {
 574        let receiver_ids = self
 575            .state()
 576            .project_connection_ids(request.payload.project_id, request.sender_id)
 577            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 578        broadcast(request.sender_id, receiver_ids, |connection_id| {
 579            self.peer
 580                .forward_send(request.sender_id, connection_id, request.payload.clone())
 581        })?;
 582        Ok(())
 583    }
 584
 585    async fn get_definition(
 586        self: Arc<Server>,
 587        request: TypedEnvelope<proto::GetDefinition>,
 588    ) -> tide::Result<()> {
 589        let receipt = request.receipt();
 590        let host_connection_id = self
 591            .state()
 592            .read_project(request.payload.project_id, request.sender_id)
 593            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 594            .host_connection_id;
 595        let response = self
 596            .peer
 597            .forward_request(request.sender_id, host_connection_id, request.payload)
 598            .await?;
 599        self.peer.respond(receipt, response)?;
 600        Ok(())
 601    }
 602
 603    async fn open_buffer(
 604        self: Arc<Server>,
 605        request: TypedEnvelope<proto::OpenBuffer>,
 606    ) -> tide::Result<()> {
 607        let receipt = request.receipt();
 608        let host_connection_id = self
 609            .state()
 610            .read_project(request.payload.project_id, request.sender_id)
 611            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 612            .host_connection_id;
 613        let response = self
 614            .peer
 615            .forward_request(request.sender_id, host_connection_id, request.payload)
 616            .await?;
 617        self.peer.respond(receipt, response)?;
 618        Ok(())
 619    }
 620
 621    async fn close_buffer(
 622        self: Arc<Server>,
 623        request: TypedEnvelope<proto::CloseBuffer>,
 624    ) -> tide::Result<()> {
 625        let host_connection_id = self
 626            .state()
 627            .read_project(request.payload.project_id, request.sender_id)
 628            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 629            .host_connection_id;
 630        self.peer
 631            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 632        Ok(())
 633    }
 634
 635    async fn save_buffer(
 636        self: Arc<Server>,
 637        request: TypedEnvelope<proto::SaveBuffer>,
 638    ) -> tide::Result<()> {
 639        let host;
 640        let guests;
 641        {
 642            let state = self.state();
 643            let project = state
 644                .read_project(request.payload.project_id, request.sender_id)
 645                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 646            host = project.host_connection_id;
 647            guests = project.guest_connection_ids()
 648        }
 649
 650        let sender = request.sender_id;
 651        let receipt = request.receipt();
 652        let response = self
 653            .peer
 654            .forward_request(sender, host, request.payload.clone())
 655            .await?;
 656
 657        broadcast(host, guests, |conn_id| {
 658            let response = response.clone();
 659            if conn_id == sender {
 660                self.peer.respond(receipt, response)
 661            } else {
 662                self.peer.forward_send(host, conn_id, response)
 663            }
 664        })?;
 665
 666        Ok(())
 667    }
 668
 669    async fn format_buffer(
 670        self: Arc<Server>,
 671        request: TypedEnvelope<proto::FormatBuffer>,
 672    ) -> tide::Result<()> {
 673        let host;
 674        {
 675            let state = self.state();
 676            let project = state
 677                .read_project(request.payload.project_id, request.sender_id)
 678                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 679            host = project.host_connection_id;
 680        }
 681
 682        let sender = request.sender_id;
 683        let receipt = request.receipt();
 684        let response = self
 685            .peer
 686            .forward_request(sender, host, request.payload.clone())
 687            .await?;
 688        self.peer.respond(receipt, response)?;
 689
 690        Ok(())
 691    }
 692
 693    async fn get_completions(
 694        self: Arc<Server>,
 695        request: TypedEnvelope<proto::GetCompletions>,
 696    ) -> tide::Result<()> {
 697        let host;
 698        {
 699            let state = self.state();
 700            let project = state
 701                .read_project(request.payload.project_id, request.sender_id)
 702                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 703            host = project.host_connection_id;
 704        }
 705
 706        let sender = request.sender_id;
 707        let receipt = request.receipt();
 708        let response = self
 709            .peer
 710            .forward_request(sender, host, request.payload.clone())
 711            .await?;
 712        self.peer.respond(receipt, response)?;
 713        Ok(())
 714    }
 715
 716    async fn apply_additional_edits_for_completion(
 717        self: Arc<Server>,
 718        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 719    ) -> tide::Result<()> {
 720        let host;
 721        {
 722            let state = self.state();
 723            let project = state
 724                .read_project(request.payload.project_id, request.sender_id)
 725                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 726            host = project.host_connection_id;
 727        }
 728
 729        let sender = request.sender_id;
 730        let receipt = request.receipt();
 731        let response = self
 732            .peer
 733            .forward_request(sender, host, request.payload.clone())
 734            .await?;
 735        self.peer.respond(receipt, response)?;
 736        Ok(())
 737    }
 738
 739    async fn update_buffer(
 740        self: Arc<Server>,
 741        request: TypedEnvelope<proto::UpdateBuffer>,
 742    ) -> tide::Result<()> {
 743        let receiver_ids = self
 744            .state()
 745            .project_connection_ids(request.payload.project_id, request.sender_id)
 746            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 747        broadcast(request.sender_id, receiver_ids, |connection_id| {
 748            self.peer
 749                .forward_send(request.sender_id, connection_id, request.payload.clone())
 750        })?;
 751        self.peer.respond(request.receipt(), proto::Ack {})?;
 752        Ok(())
 753    }
 754
 755    async fn update_buffer_file(
 756        self: Arc<Server>,
 757        request: TypedEnvelope<proto::UpdateBufferFile>,
 758    ) -> tide::Result<()> {
 759        let receiver_ids = self
 760            .state()
 761            .project_connection_ids(request.payload.project_id, request.sender_id)
 762            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 763        broadcast(request.sender_id, receiver_ids, |connection_id| {
 764            self.peer
 765                .forward_send(request.sender_id, connection_id, request.payload.clone())
 766        })?;
 767        Ok(())
 768    }
 769
 770    async fn buffer_reloaded(
 771        self: Arc<Server>,
 772        request: TypedEnvelope<proto::BufferReloaded>,
 773    ) -> tide::Result<()> {
 774        let receiver_ids = self
 775            .state()
 776            .project_connection_ids(request.payload.project_id, request.sender_id)
 777            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 778        broadcast(request.sender_id, receiver_ids, |connection_id| {
 779            self.peer
 780                .forward_send(request.sender_id, connection_id, request.payload.clone())
 781        })?;
 782        Ok(())
 783    }
 784
 785    async fn buffer_saved(
 786        self: Arc<Server>,
 787        request: TypedEnvelope<proto::BufferSaved>,
 788    ) -> tide::Result<()> {
 789        let receiver_ids = self
 790            .state()
 791            .project_connection_ids(request.payload.project_id, request.sender_id)
 792            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 793        broadcast(request.sender_id, receiver_ids, |connection_id| {
 794            self.peer
 795                .forward_send(request.sender_id, connection_id, request.payload.clone())
 796        })?;
 797        Ok(())
 798    }
 799
 800    async fn get_channels(
 801        self: Arc<Server>,
 802        request: TypedEnvelope<proto::GetChannels>,
 803    ) -> tide::Result<()> {
 804        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 805        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 806        self.peer.respond(
 807            request.receipt(),
 808            proto::GetChannelsResponse {
 809                channels: channels
 810                    .into_iter()
 811                    .map(|chan| proto::Channel {
 812                        id: chan.id.to_proto(),
 813                        name: chan.name,
 814                    })
 815                    .collect(),
 816            },
 817        )?;
 818        Ok(())
 819    }
 820
 821    async fn get_users(
 822        self: Arc<Server>,
 823        request: TypedEnvelope<proto::GetUsers>,
 824    ) -> tide::Result<()> {
 825        let receipt = request.receipt();
 826        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 827        let users = self
 828            .app_state
 829            .db
 830            .get_users_by_ids(user_ids)
 831            .await?
 832            .into_iter()
 833            .map(|user| proto::User {
 834                id: user.id.to_proto(),
 835                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 836                github_login: user.github_login,
 837            })
 838            .collect();
 839        self.peer
 840            .respond(receipt, proto::GetUsersResponse { users })?;
 841        Ok(())
 842    }
 843
 844    fn update_contacts_for_users<'a>(
 845        self: &Arc<Server>,
 846        user_ids: impl IntoIterator<Item = &'a UserId>,
 847    ) -> anyhow::Result<()> {
 848        let mut result = Ok(());
 849        let state = self.state();
 850        for user_id in user_ids {
 851            let contacts = state.contacts_for_user(*user_id);
 852            for connection_id in state.connection_ids_for_user(*user_id) {
 853                if let Err(error) = self.peer.send(
 854                    connection_id,
 855                    proto::UpdateContacts {
 856                        contacts: contacts.clone(),
 857                    },
 858                ) {
 859                    result = Err(error);
 860                }
 861            }
 862        }
 863        result
 864    }
 865
 866    async fn join_channel(
 867        mut self: Arc<Self>,
 868        request: TypedEnvelope<proto::JoinChannel>,
 869    ) -> tide::Result<()> {
 870        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 871        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 872        if !self
 873            .app_state
 874            .db
 875            .can_user_access_channel(user_id, channel_id)
 876            .await?
 877        {
 878            Err(anyhow!("access denied"))?;
 879        }
 880
 881        self.state_mut().join_channel(request.sender_id, channel_id);
 882        let messages = self
 883            .app_state
 884            .db
 885            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 886            .await?
 887            .into_iter()
 888            .map(|msg| proto::ChannelMessage {
 889                id: msg.id.to_proto(),
 890                body: msg.body,
 891                timestamp: msg.sent_at.unix_timestamp() as u64,
 892                sender_id: msg.sender_id.to_proto(),
 893                nonce: Some(msg.nonce.as_u128().into()),
 894            })
 895            .collect::<Vec<_>>();
 896        self.peer.respond(
 897            request.receipt(),
 898            proto::JoinChannelResponse {
 899                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 900                messages,
 901            },
 902        )?;
 903        Ok(())
 904    }
 905
 906    async fn leave_channel(
 907        mut self: Arc<Self>,
 908        request: TypedEnvelope<proto::LeaveChannel>,
 909    ) -> tide::Result<()> {
 910        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 911        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 912        if !self
 913            .app_state
 914            .db
 915            .can_user_access_channel(user_id, channel_id)
 916            .await?
 917        {
 918            Err(anyhow!("access denied"))?;
 919        }
 920
 921        self.state_mut()
 922            .leave_channel(request.sender_id, channel_id);
 923
 924        Ok(())
 925    }
 926
 927    async fn send_channel_message(
 928        self: Arc<Self>,
 929        request: TypedEnvelope<proto::SendChannelMessage>,
 930    ) -> tide::Result<()> {
 931        let receipt = request.receipt();
 932        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 933        let user_id;
 934        let connection_ids;
 935        {
 936            let state = self.state();
 937            user_id = state.user_id_for_connection(request.sender_id)?;
 938            if let Some(ids) = state.channel_connection_ids(channel_id) {
 939                connection_ids = ids;
 940            } else {
 941                return Ok(());
 942            }
 943        }
 944
 945        // Validate the message body.
 946        let body = request.payload.body.trim().to_string();
 947        if body.len() > MAX_MESSAGE_LEN {
 948            self.peer.respond_with_error(
 949                receipt,
 950                proto::Error {
 951                    message: "message is too long".to_string(),
 952                },
 953            )?;
 954            return Ok(());
 955        }
 956        if body.is_empty() {
 957            self.peer.respond_with_error(
 958                receipt,
 959                proto::Error {
 960                    message: "message can't be blank".to_string(),
 961                },
 962            )?;
 963            return Ok(());
 964        }
 965
 966        let timestamp = OffsetDateTime::now_utc();
 967        let nonce = if let Some(nonce) = request.payload.nonce {
 968            nonce
 969        } else {
 970            self.peer.respond_with_error(
 971                receipt,
 972                proto::Error {
 973                    message: "nonce can't be blank".to_string(),
 974                },
 975            )?;
 976            return Ok(());
 977        };
 978
 979        let message_id = self
 980            .app_state
 981            .db
 982            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
 983            .await?
 984            .to_proto();
 985        let message = proto::ChannelMessage {
 986            sender_id: user_id.to_proto(),
 987            id: message_id,
 988            body,
 989            timestamp: timestamp.unix_timestamp() as u64,
 990            nonce: Some(nonce),
 991        };
 992        broadcast(request.sender_id, connection_ids, |conn_id| {
 993            self.peer.send(
 994                conn_id,
 995                proto::ChannelMessageSent {
 996                    channel_id: channel_id.to_proto(),
 997                    message: Some(message.clone()),
 998                },
 999            )
1000        })?;
1001        self.peer.respond(
1002            receipt,
1003            proto::SendChannelMessageResponse {
1004                message: Some(message),
1005            },
1006        )?;
1007        Ok(())
1008    }
1009
1010    async fn get_channel_messages(
1011        self: Arc<Self>,
1012        request: TypedEnvelope<proto::GetChannelMessages>,
1013    ) -> tide::Result<()> {
1014        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1015        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1016        if !self
1017            .app_state
1018            .db
1019            .can_user_access_channel(user_id, channel_id)
1020            .await?
1021        {
1022            Err(anyhow!("access denied"))?;
1023        }
1024
1025        let messages = self
1026            .app_state
1027            .db
1028            .get_channel_messages(
1029                channel_id,
1030                MESSAGE_COUNT_PER_PAGE,
1031                Some(MessageId::from_proto(request.payload.before_message_id)),
1032            )
1033            .await?
1034            .into_iter()
1035            .map(|msg| proto::ChannelMessage {
1036                id: msg.id.to_proto(),
1037                body: msg.body,
1038                timestamp: msg.sent_at.unix_timestamp() as u64,
1039                sender_id: msg.sender_id.to_proto(),
1040                nonce: Some(msg.nonce.as_u128().into()),
1041            })
1042            .collect::<Vec<_>>();
1043        self.peer.respond(
1044            request.receipt(),
1045            proto::GetChannelMessagesResponse {
1046                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1047                messages,
1048            },
1049        )?;
1050        Ok(())
1051    }
1052
1053    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1054        self.store.read()
1055    }
1056
1057    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1058        self.store.write()
1059    }
1060}
1061
1062fn broadcast<F>(
1063    sender_id: ConnectionId,
1064    receiver_ids: Vec<ConnectionId>,
1065    mut f: F,
1066) -> anyhow::Result<()>
1067where
1068    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1069{
1070    let mut result = Ok(());
1071    for receiver_id in receiver_ids {
1072        if receiver_id != sender_id {
1073            if let Err(error) = f(receiver_id) {
1074                if result.is_ok() {
1075                    result = Err(error);
1076                }
1077            }
1078        }
1079    }
1080    result
1081}
1082
1083pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1084    let server = Server::new(app.state().clone(), rpc.clone(), None);
1085    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1086        let server = server.clone();
1087        async move {
1088            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1089
1090            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1091            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1092            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1093            let client_protocol_version: Option<u32> = request
1094                .header("X-Zed-Protocol-Version")
1095                .and_then(|v| v.as_str().parse().ok());
1096
1097            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1098                return Ok(Response::new(StatusCode::UpgradeRequired));
1099            }
1100
1101            let header = match request.header("Sec-Websocket-Key") {
1102                Some(h) => h.as_str(),
1103                None => return Err(anyhow!("expected sec-websocket-key"))?,
1104            };
1105
1106            let user_id = process_auth_header(&request).await?;
1107
1108            let mut response = Response::new(StatusCode::SwitchingProtocols);
1109            response.insert_header(UPGRADE, "websocket");
1110            response.insert_header(CONNECTION, "Upgrade");
1111            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1112            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1113            response.insert_header("Sec-Websocket-Version", "13");
1114
1115            let http_res: &mut tide::http::Response = response.as_mut();
1116            let upgrade_receiver = http_res.recv_upgrade().await;
1117            let addr = request.remote().unwrap_or("unknown").to_string();
1118            task::spawn(async move {
1119                if let Some(stream) = upgrade_receiver.await {
1120                    server
1121                        .handle_connection(
1122                            Connection::new(
1123                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1124                            ),
1125                            addr,
1126                            user_id,
1127                            None,
1128                        )
1129                        .await;
1130                }
1131            });
1132
1133            Ok(response)
1134        }
1135    });
1136}
1137
1138fn header_contains_ignore_case<T>(
1139    request: &tide::Request<T>,
1140    header_name: HeaderName,
1141    value: &str,
1142) -> bool {
1143    request
1144        .header(header_name)
1145        .map(|h| {
1146            h.as_str()
1147                .split(',')
1148                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1149        })
1150        .unwrap_or(false)
1151}
1152
1153#[cfg(test)]
1154mod tests {
1155    use super::*;
1156    use crate::{
1157        auth,
1158        db::{tests::TestDb, UserId},
1159        github, AppState, Config,
1160    };
1161    use ::rpc::Peer;
1162    use async_std::task;
1163    use gpui::{executor, ModelHandle, TestAppContext};
1164    use parking_lot::Mutex;
1165    use postage::{mpsc, watch};
1166    use rand::prelude::*;
1167    use rpc::PeerId;
1168    use serde_json::json;
1169    use sqlx::types::time::OffsetDateTime;
1170    use std::{
1171        ops::Deref,
1172        path::Path,
1173        rc::Rc,
1174        sync::{
1175            atomic::{AtomicBool, Ordering::SeqCst},
1176            Arc,
1177        },
1178        time::Duration,
1179    };
1180    use zed::{
1181        client::{
1182            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1183            EstablishConnectionError, UserStore,
1184        },
1185        editor::{Editor, EditorSettings, Input, MultiBuffer},
1186        fs::{FakeFs, Fs as _},
1187        language::{
1188            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1189            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1190        },
1191        lsp,
1192        project::{DiagnosticSummary, Project, ProjectPath},
1193    };
1194
    // End-to-end test of sharing a project between two clients connected to one
    // test server: client A shares a local project, client B joins it remotely,
    // an edit made by B reaches A's copy of the buffer, and dropping B's project
    // removes B from A's collaborator list.
    #[gpui::test]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Client B must see client A as a collaborator; remember B's replica id
        // so it can be checked from A's side.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until client A sees client B as a collaborator with that replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host project already reports the buffer as open at this point,
        // before client A opens it explicitly below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1333
    // Checks the share / unshare / re-share cycle: after client A unshares its
    // project, client B's remote project becomes read-only and A's worktree is no
    // longer shared; after A shares again, a fresh remote project can join and
    // open a buffer.
    #[gpui::test]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest's project turns read-only once the host has unshared.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B joins again under the same project id, as a new remote project.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1430
    // Three-client test (host A, guests B and C) covering:
    //   * concurrent edits from both guests and the host converging to the same text,
    //   * a guest-initiated save that races with a host edit,
    //   * renames and file creation on the host's file system showing up in the
    //     guests' worktrees and in the `file()` of every open buffer.
    #[gpui::test]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree before sharing the project.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Both guest insertions were made at offset 0; the host is expected to
        // end up with C's text ahead of B's.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to contain the host's concurrent edit as well.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        // Guest C did not take part in the save, so wait for its buffer to become clean.
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename("/a/file1".as_ref(), "/a/file1-renamed".as_ref())
            .await
            .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected afterwards: `.zed.toml`, `file1-renamed`, `file3`, `file4`.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1616
1617    #[gpui::test]
1618    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1619        cx_a.foreground().forbid_parking();
1620        let lang_registry = Arc::new(LanguageRegistry::new());
1621        let fs = Arc::new(FakeFs::new(cx_a.background()));
1622
1623        // Connect to a server as 2 clients.
1624        let mut server = TestServer::start(cx_a.foreground()).await;
1625        let client_a = server.create_client(&mut cx_a, "user_a").await;
1626        let client_b = server.create_client(&mut cx_b, "user_b").await;
1627
1628        // Share a project as client A
1629        fs.insert_tree(
1630            "/dir",
1631            json!({
1632                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1633                "a.txt": "a-contents",
1634            }),
1635        )
1636        .await;
1637
1638        let project_a = cx_a.update(|cx| {
1639            Project::local(
1640                client_a.clone(),
1641                client_a.user_store.clone(),
1642                lang_registry.clone(),
1643                fs.clone(),
1644                cx,
1645            )
1646        });
1647        let (worktree_a, _) = project_a
1648            .update(&mut cx_a, |p, cx| {
1649                p.find_or_create_local_worktree("/dir", false, cx)
1650            })
1651            .await
1652            .unwrap();
1653        worktree_a
1654            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1655            .await;
1656        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1657        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1658        project_a
1659            .update(&mut cx_a, |p, cx| p.share(cx))
1660            .await
1661            .unwrap();
1662
1663        // Join that project as client B
1664        let project_b = Project::remote(
1665            project_id,
1666            client_b.clone(),
1667            client_b.user_store.clone(),
1668            lang_registry.clone(),
1669            fs.clone(),
1670            &mut cx_b.to_async(),
1671        )
1672        .await
1673        .unwrap();
1674        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1675
1676        // Open a buffer as client B
1677        let buffer_b = project_b
1678            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1679            .await
1680            .unwrap();
1681        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1682
1683        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1684        buffer_b.read_with(&cx_b, |buf, _| {
1685            assert!(buf.is_dirty());
1686            assert!(!buf.has_conflict());
1687        });
1688
1689        buffer_b
1690            .update(&mut cx_b, |buf, cx| buf.save(cx))
1691            .await
1692            .unwrap();
1693        worktree_b
1694            .condition(&cx_b, |_, cx| {
1695                buffer_b.read(cx).file().unwrap().mtime() != mtime
1696            })
1697            .await;
1698        buffer_b.read_with(&cx_b, |buf, _| {
1699            assert!(!buf.is_dirty());
1700            assert!(!buf.has_conflict());
1701        });
1702
1703        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1704        buffer_b.read_with(&cx_b, |buf, _| {
1705            assert!(buf.is_dirty());
1706            assert!(!buf.has_conflict());
1707        });
1708    }
1709
1710    #[gpui::test]
1711    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1712        cx_a.foreground().forbid_parking();
1713        let lang_registry = Arc::new(LanguageRegistry::new());
1714        let fs = Arc::new(FakeFs::new(cx_a.background()));
1715
1716        // Connect to a server as 2 clients.
1717        let mut server = TestServer::start(cx_a.foreground()).await;
1718        let client_a = server.create_client(&mut cx_a, "user_a").await;
1719        let client_b = server.create_client(&mut cx_b, "user_b").await;
1720
1721        // Share a project as client A
1722        fs.insert_tree(
1723            "/dir",
1724            json!({
1725                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1726                "a.txt": "a-contents",
1727            }),
1728        )
1729        .await;
1730
1731        let project_a = cx_a.update(|cx| {
1732            Project::local(
1733                client_a.clone(),
1734                client_a.user_store.clone(),
1735                lang_registry.clone(),
1736                fs.clone(),
1737                cx,
1738            )
1739        });
1740        let (worktree_a, _) = project_a
1741            .update(&mut cx_a, |p, cx| {
1742                p.find_or_create_local_worktree("/dir", false, cx)
1743            })
1744            .await
1745            .unwrap();
1746        worktree_a
1747            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1748            .await;
1749        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1750        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1751        project_a
1752            .update(&mut cx_a, |p, cx| p.share(cx))
1753            .await
1754            .unwrap();
1755
1756        // Join that project as client B
1757        let project_b = Project::remote(
1758            project_id,
1759            client_b.clone(),
1760            client_b.user_store.clone(),
1761            lang_registry.clone(),
1762            fs.clone(),
1763            &mut cx_b.to_async(),
1764        )
1765        .await
1766        .unwrap();
1767        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1768
1769        // Open a buffer as client B
1770        let buffer_b = project_b
1771            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1772            .await
1773            .unwrap();
1774        buffer_b.read_with(&cx_b, |buf, _| {
1775            assert!(!buf.is_dirty());
1776            assert!(!buf.has_conflict());
1777        });
1778
1779        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1780            .await
1781            .unwrap();
1782        buffer_b
1783            .condition(&cx_b, |buf, _| {
1784                buf.text() == "new contents" && !buf.is_dirty()
1785            })
1786            .await;
1787        buffer_b.read_with(&cx_b, |buf, _| {
1788            assert!(!buf.has_conflict());
1789        });
1790    }
1791
1792    #[gpui::test(iterations = 100)]
1793    async fn test_editing_while_guest_opens_buffer(
1794        mut cx_a: TestAppContext,
1795        mut cx_b: TestAppContext,
1796    ) {
1797        cx_a.foreground().forbid_parking();
1798        let lang_registry = Arc::new(LanguageRegistry::new());
1799        let fs = Arc::new(FakeFs::new(cx_a.background()));
1800
1801        // Connect to a server as 2 clients.
1802        let mut server = TestServer::start(cx_a.foreground()).await;
1803        let client_a = server.create_client(&mut cx_a, "user_a").await;
1804        let client_b = server.create_client(&mut cx_b, "user_b").await;
1805
1806        // Share a project as client A
1807        fs.insert_tree(
1808            "/dir",
1809            json!({
1810                ".zed.toml": r#"collaborators = ["user_b"]"#,
1811                "a.txt": "a-contents",
1812            }),
1813        )
1814        .await;
1815        let project_a = cx_a.update(|cx| {
1816            Project::local(
1817                client_a.clone(),
1818                client_a.user_store.clone(),
1819                lang_registry.clone(),
1820                fs.clone(),
1821                cx,
1822            )
1823        });
1824        let (worktree_a, _) = project_a
1825            .update(&mut cx_a, |p, cx| {
1826                p.find_or_create_local_worktree("/dir", false, cx)
1827            })
1828            .await
1829            .unwrap();
1830        worktree_a
1831            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1832            .await;
1833        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1834        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1835        project_a
1836            .update(&mut cx_a, |p, cx| p.share(cx))
1837            .await
1838            .unwrap();
1839
1840        // Join that project as client B
1841        let project_b = Project::remote(
1842            project_id,
1843            client_b.clone(),
1844            client_b.user_store.clone(),
1845            lang_registry.clone(),
1846            fs.clone(),
1847            &mut cx_b.to_async(),
1848        )
1849        .await
1850        .unwrap();
1851
1852        // Open a buffer as client A
1853        let buffer_a = project_a
1854            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1855            .await
1856            .unwrap();
1857
1858        // Start opening the same buffer as client B
1859        let buffer_b = cx_b
1860            .background()
1861            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1862        task::yield_now().await;
1863
1864        // Edit the buffer as client A while client B is still opening it.
1865        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1866
1867        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1868        let buffer_b = buffer_b.await.unwrap();
1869        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1870    }
1871
1872    #[gpui::test]
1873    async fn test_leaving_worktree_while_opening_buffer(
1874        mut cx_a: TestAppContext,
1875        mut cx_b: TestAppContext,
1876    ) {
1877        cx_a.foreground().forbid_parking();
1878        let lang_registry = Arc::new(LanguageRegistry::new());
1879        let fs = Arc::new(FakeFs::new(cx_a.background()));
1880
1881        // Connect to a server as 2 clients.
1882        let mut server = TestServer::start(cx_a.foreground()).await;
1883        let client_a = server.create_client(&mut cx_a, "user_a").await;
1884        let client_b = server.create_client(&mut cx_b, "user_b").await;
1885
1886        // Share a project as client A
1887        fs.insert_tree(
1888            "/dir",
1889            json!({
1890                ".zed.toml": r#"collaborators = ["user_b"]"#,
1891                "a.txt": "a-contents",
1892            }),
1893        )
1894        .await;
1895        let project_a = cx_a.update(|cx| {
1896            Project::local(
1897                client_a.clone(),
1898                client_a.user_store.clone(),
1899                lang_registry.clone(),
1900                fs.clone(),
1901                cx,
1902            )
1903        });
1904        let (worktree_a, _) = project_a
1905            .update(&mut cx_a, |p, cx| {
1906                p.find_or_create_local_worktree("/dir", false, cx)
1907            })
1908            .await
1909            .unwrap();
1910        worktree_a
1911            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1912            .await;
1913        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1914        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1915        project_a
1916            .update(&mut cx_a, |p, cx| p.share(cx))
1917            .await
1918            .unwrap();
1919
1920        // Join that project as client B
1921        let project_b = Project::remote(
1922            project_id,
1923            client_b.clone(),
1924            client_b.user_store.clone(),
1925            lang_registry.clone(),
1926            fs.clone(),
1927            &mut cx_b.to_async(),
1928        )
1929        .await
1930        .unwrap();
1931
1932        // See that a guest has joined as client A.
1933        project_a
1934            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1935            .await;
1936
1937        // Begin opening a buffer as client B, but leave the project before the open completes.
1938        let buffer_b = cx_b
1939            .background()
1940            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1941        cx_b.update(|_| drop(project_b));
1942        drop(buffer_b);
1943
1944        // See that the guest has left.
1945        project_a
1946            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
1947            .await;
1948    }
1949
1950    #[gpui::test]
1951    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1952        cx_a.foreground().forbid_parking();
1953        let lang_registry = Arc::new(LanguageRegistry::new());
1954        let fs = Arc::new(FakeFs::new(cx_a.background()));
1955
1956        // Connect to a server as 2 clients.
1957        let mut server = TestServer::start(cx_a.foreground()).await;
1958        let client_a = server.create_client(&mut cx_a, "user_a").await;
1959        let client_b = server.create_client(&mut cx_b, "user_b").await;
1960
1961        // Share a project as client A
1962        fs.insert_tree(
1963            "/a",
1964            json!({
1965                ".zed.toml": r#"collaborators = ["user_b"]"#,
1966                "a.txt": "a-contents",
1967                "b.txt": "b-contents",
1968            }),
1969        )
1970        .await;
1971        let project_a = cx_a.update(|cx| {
1972            Project::local(
1973                client_a.clone(),
1974                client_a.user_store.clone(),
1975                lang_registry.clone(),
1976                fs.clone(),
1977                cx,
1978            )
1979        });
1980        let (worktree_a, _) = project_a
1981            .update(&mut cx_a, |p, cx| {
1982                p.find_or_create_local_worktree("/a", false, cx)
1983            })
1984            .await
1985            .unwrap();
1986        worktree_a
1987            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1988            .await;
1989        let project_id = project_a
1990            .update(&mut cx_a, |project, _| project.next_remote_id())
1991            .await;
1992        project_a
1993            .update(&mut cx_a, |project, cx| project.share(cx))
1994            .await
1995            .unwrap();
1996
1997        // Join that project as client B
1998        let _project_b = Project::remote(
1999            project_id,
2000            client_b.clone(),
2001            client_b.user_store.clone(),
2002            lang_registry.clone(),
2003            fs.clone(),
2004            &mut cx_b.to_async(),
2005        )
2006        .await
2007        .unwrap();
2008
2009        // See that a guest has joined as client A.
2010        project_a
2011            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2012            .await;
2013
2014        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2015        client_b.disconnect(&cx_b.to_async()).unwrap();
2016        project_a
2017            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2018            .await;
2019    }
2020
2021    #[gpui::test]
2022    async fn test_collaborating_with_diagnostics(
2023        mut cx_a: TestAppContext,
2024        mut cx_b: TestAppContext,
2025    ) {
2026        cx_a.foreground().forbid_parking();
2027        let mut lang_registry = Arc::new(LanguageRegistry::new());
2028        let fs = Arc::new(FakeFs::new(cx_a.background()));
2029
2030        // Set up a fake language server.
2031        let (language_server_config, mut fake_language_server) =
2032            LanguageServerConfig::fake(cx_a.background()).await;
2033        Arc::get_mut(&mut lang_registry)
2034            .unwrap()
2035            .add(Arc::new(Language::new(
2036                LanguageConfig {
2037                    name: "Rust".to_string(),
2038                    path_suffixes: vec!["rs".to_string()],
2039                    language_server: Some(language_server_config),
2040                    ..Default::default()
2041                },
2042                Some(tree_sitter_rust::language()),
2043            )));
2044
2045        // Connect to a server as 2 clients.
2046        let mut server = TestServer::start(cx_a.foreground()).await;
2047        let client_a = server.create_client(&mut cx_a, "user_a").await;
2048        let client_b = server.create_client(&mut cx_b, "user_b").await;
2049
2050        // Share a project as client A
2051        fs.insert_tree(
2052            "/a",
2053            json!({
2054                ".zed.toml": r#"collaborators = ["user_b"]"#,
2055                "a.rs": "let one = two",
2056                "other.rs": "",
2057            }),
2058        )
2059        .await;
2060        let project_a = cx_a.update(|cx| {
2061            Project::local(
2062                client_a.clone(),
2063                client_a.user_store.clone(),
2064                lang_registry.clone(),
2065                fs.clone(),
2066                cx,
2067            )
2068        });
2069        let (worktree_a, _) = project_a
2070            .update(&mut cx_a, |p, cx| {
2071                p.find_or_create_local_worktree("/a", false, cx)
2072            })
2073            .await
2074            .unwrap();
2075        worktree_a
2076            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2077            .await;
2078        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2079        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2080        project_a
2081            .update(&mut cx_a, |p, cx| p.share(cx))
2082            .await
2083            .unwrap();
2084
2085        // Cause the language server to start.
2086        let _ = cx_a
2087            .background()
2088            .spawn(project_a.update(&mut cx_a, |project, cx| {
2089                project.open_buffer(
2090                    ProjectPath {
2091                        worktree_id,
2092                        path: Path::new("other.rs").into(),
2093                    },
2094                    cx,
2095                )
2096            }))
2097            .await
2098            .unwrap();
2099
2100        // Simulate a language server reporting errors for a file.
2101        fake_language_server
2102            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2103                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2104                version: None,
2105                diagnostics: vec![lsp::Diagnostic {
2106                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2107                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2108                    message: "message 1".to_string(),
2109                    ..Default::default()
2110                }],
2111            })
2112            .await;
2113
2114        // Wait for server to see the diagnostics update.
2115        server
2116            .condition(|store| {
2117                let worktree = store
2118                    .project(project_id)
2119                    .unwrap()
2120                    .worktrees
2121                    .get(&worktree_id.to_proto())
2122                    .unwrap();
2123
2124                !worktree
2125                    .share
2126                    .as_ref()
2127                    .unwrap()
2128                    .diagnostic_summaries
2129                    .is_empty()
2130            })
2131            .await;
2132
2133        // Join the worktree as client B.
2134        let project_b = Project::remote(
2135            project_id,
2136            client_b.clone(),
2137            client_b.user_store.clone(),
2138            lang_registry.clone(),
2139            fs.clone(),
2140            &mut cx_b.to_async(),
2141        )
2142        .await
2143        .unwrap();
2144
2145        project_b.read_with(&cx_b, |project, cx| {
2146            assert_eq!(
2147                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2148                &[(
2149                    ProjectPath {
2150                        worktree_id,
2151                        path: Arc::from(Path::new("a.rs")),
2152                    },
2153                    DiagnosticSummary {
2154                        error_count: 1,
2155                        warning_count: 0,
2156                        ..Default::default()
2157                    },
2158                )]
2159            )
2160        });
2161
2162        // Simulate a language server reporting more errors for a file.
2163        fake_language_server
2164            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2165                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2166                version: None,
2167                diagnostics: vec![
2168                    lsp::Diagnostic {
2169                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2170                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2171                        message: "message 1".to_string(),
2172                        ..Default::default()
2173                    },
2174                    lsp::Diagnostic {
2175                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2176                        range: lsp::Range::new(
2177                            lsp::Position::new(0, 10),
2178                            lsp::Position::new(0, 13),
2179                        ),
2180                        message: "message 2".to_string(),
2181                        ..Default::default()
2182                    },
2183                ],
2184            })
2185            .await;
2186
2187        // Client b gets the updated summaries
2188        project_b
2189            .condition(&cx_b, |project, cx| {
2190                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2191                    == &[(
2192                        ProjectPath {
2193                            worktree_id,
2194                            path: Arc::from(Path::new("a.rs")),
2195                        },
2196                        DiagnosticSummary {
2197                            error_count: 1,
2198                            warning_count: 1,
2199                            ..Default::default()
2200                        },
2201                    )]
2202            })
2203            .await;
2204
2205        // Open the file with the errors on client B. They should be present.
2206        let buffer_b = cx_b
2207            .background()
2208            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2209            .await
2210            .unwrap();
2211
2212        buffer_b.read_with(&cx_b, |buffer, _| {
2213            assert_eq!(
2214                buffer
2215                    .snapshot()
2216                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2217                    .map(|entry| entry)
2218                    .collect::<Vec<_>>(),
2219                &[
2220                    DiagnosticEntry {
2221                        range: Point::new(0, 4)..Point::new(0, 7),
2222                        diagnostic: Diagnostic {
2223                            group_id: 0,
2224                            message: "message 1".to_string(),
2225                            severity: lsp::DiagnosticSeverity::ERROR,
2226                            is_primary: true,
2227                            ..Default::default()
2228                        }
2229                    },
2230                    DiagnosticEntry {
2231                        range: Point::new(0, 10)..Point::new(0, 13),
2232                        diagnostic: Diagnostic {
2233                            group_id: 1,
2234                            severity: lsp::DiagnosticSeverity::WARNING,
2235                            message: "message 2".to_string(),
2236                            is_primary: true,
2237                            ..Default::default()
2238                        }
2239                    }
2240                ]
2241            );
2242        });
2243    }
2244
2245    #[gpui::test]
2246    async fn test_collaborating_with_completion(
2247        mut cx_a: TestAppContext,
2248        mut cx_b: TestAppContext,
2249    ) {
2250        cx_a.foreground().forbid_parking();
2251        let mut lang_registry = Arc::new(LanguageRegistry::new());
2252        let fs = Arc::new(FakeFs::new(cx_a.background()));
2253
2254        // Set up a fake language server.
2255        let (language_server_config, mut fake_language_server) =
2256            LanguageServerConfig::fake_with_capabilities(
2257                lsp::ServerCapabilities {
2258                    completion_provider: Some(lsp::CompletionOptions {
2259                        trigger_characters: Some(vec![".".to_string()]),
2260                        ..Default::default()
2261                    }),
2262                    ..Default::default()
2263                },
2264                cx_a.background(),
2265            )
2266            .await;
2267        Arc::get_mut(&mut lang_registry)
2268            .unwrap()
2269            .add(Arc::new(Language::new(
2270                LanguageConfig {
2271                    name: "Rust".to_string(),
2272                    path_suffixes: vec!["rs".to_string()],
2273                    language_server: Some(language_server_config),
2274                    ..Default::default()
2275                },
2276                Some(tree_sitter_rust::language()),
2277            )));
2278
2279        // Connect to a server as 2 clients.
2280        let mut server = TestServer::start(cx_a.foreground()).await;
2281        let client_a = server.create_client(&mut cx_a, "user_a").await;
2282        let client_b = server.create_client(&mut cx_b, "user_b").await;
2283
2284        // Share a project as client A
2285        fs.insert_tree(
2286            "/a",
2287            json!({
2288                ".zed.toml": r#"collaborators = ["user_b"]"#,
2289                "main.rs": "fn main() { a }",
2290                "other.rs": "",
2291            }),
2292        )
2293        .await;
2294        let project_a = cx_a.update(|cx| {
2295            Project::local(
2296                client_a.clone(),
2297                client_a.user_store.clone(),
2298                lang_registry.clone(),
2299                fs.clone(),
2300                cx,
2301            )
2302        });
2303        let (worktree_a, _) = project_a
2304            .update(&mut cx_a, |p, cx| {
2305                p.find_or_create_local_worktree("/a", false, cx)
2306            })
2307            .await
2308            .unwrap();
2309        worktree_a
2310            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2311            .await;
2312        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2313        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2314        project_a
2315            .update(&mut cx_a, |p, cx| p.share(cx))
2316            .await
2317            .unwrap();
2318
2319        // Join the worktree as client B.
2320        let project_b = Project::remote(
2321            project_id,
2322            client_b.clone(),
2323            client_b.user_store.clone(),
2324            lang_registry.clone(),
2325            fs.clone(),
2326            &mut cx_b.to_async(),
2327        )
2328        .await
2329        .unwrap();
2330
2331        // Open a file in an editor as the guest.
2332        let buffer_b = project_b
2333            .update(&mut cx_b, |p, cx| {
2334                p.open_buffer((worktree_id, "main.rs"), cx)
2335            })
2336            .await
2337            .unwrap();
2338        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2339        let editor_b = cx_b.add_view(window_b, |cx| {
2340            Editor::for_buffer(
2341                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2342                Arc::new(|cx| EditorSettings::test(cx)),
2343                cx,
2344            )
2345        });
2346
2347        // Type a completion trigger character as the guest.
2348        editor_b.update(&mut cx_b, |editor, cx| {
2349            editor.select_ranges([13..13], None, cx);
2350            editor.handle_input(&Input(".".into()), cx);
2351            cx.focus(&editor_b);
2352        });
2353
2354        // Receive a completion request as the host's language server.
2355        let (request_id, params) = fake_language_server
2356            .receive_request::<lsp::request::Completion>()
2357            .await;
2358        assert_eq!(
2359            params.text_document_position.text_document.uri,
2360            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2361        );
2362        assert_eq!(
2363            params.text_document_position.position,
2364            lsp::Position::new(0, 14),
2365        );
2366
2367        // Return some completions from the host's language server.
2368        fake_language_server
2369            .respond(
2370                request_id,
2371                Some(lsp::CompletionResponse::Array(vec![
2372                    lsp::CompletionItem {
2373                        label: "first_method(…)".into(),
2374                        detail: Some("fn(&mut self, B) -> C".into()),
2375                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2376                            new_text: "first_method($1)".to_string(),
2377                            range: lsp::Range::new(
2378                                lsp::Position::new(0, 14),
2379                                lsp::Position::new(0, 14),
2380                            ),
2381                        })),
2382                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2383                        ..Default::default()
2384                    },
2385                    lsp::CompletionItem {
2386                        label: "second_method(…)".into(),
2387                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2388                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2389                            new_text: "second_method()".to_string(),
2390                            range: lsp::Range::new(
2391                                lsp::Position::new(0, 14),
2392                                lsp::Position::new(0, 14),
2393                            ),
2394                        })),
2395                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2396                        ..Default::default()
2397                    },
2398                ])),
2399            )
2400            .await;
2401
2402        // Open the buffer on the host.
2403        let buffer_a = project_a
2404            .update(&mut cx_a, |p, cx| {
2405                p.open_buffer((worktree_id, "main.rs"), cx)
2406            })
2407            .await
2408            .unwrap();
2409        buffer_a
2410            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2411            .await;
2412
2413        // Confirm a completion on the guest.
2414        editor_b.next_notification(&cx_b).await;
2415        editor_b.update(&mut cx_b, |editor, cx| {
2416            assert!(editor.has_completions());
2417            editor.confirm_completion(Some(0), cx);
2418            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2419        });
2420
2421        buffer_a
2422            .condition(&cx_a, |buffer, _| {
2423                buffer.text() == "fn main() { a.first_method() }"
2424            })
2425            .await;
2426
2427        // Receive a request resolve the selected completion on the host's language server.
2428        let (request_id, params) = fake_language_server
2429            .receive_request::<lsp::request::ResolveCompletionItem>()
2430            .await;
2431        assert_eq!(params.label, "first_method(…)");
2432
2433        // Return a resolved completion from the host's language server.
2434        // The resolved completion has an additional text edit.
2435        fake_language_server
2436            .respond(
2437                request_id,
2438                lsp::CompletionItem {
2439                    label: "first_method(…)".into(),
2440                    detail: Some("fn(&mut self, B) -> C".into()),
2441                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2442                        new_text: "first_method($1)".to_string(),
2443                        range: lsp::Range::new(
2444                            lsp::Position::new(0, 14),
2445                            lsp::Position::new(0, 14),
2446                        ),
2447                    })),
2448                    additional_text_edits: Some(vec![lsp::TextEdit {
2449                        new_text: "use d::SomeTrait;\n".to_string(),
2450                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2451                    }]),
2452                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2453                    ..Default::default()
2454                },
2455            )
2456            .await;
2457
2458        // The additional edit is applied.
2459        buffer_b
2460            .condition(&cx_b, |buffer, _| {
2461                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2462            })
2463            .await;
2464        assert_eq!(
2465            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2466            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2467        );
2468    }
2469
2470    #[gpui::test]
2471    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2472        cx_a.foreground().forbid_parking();
2473        let mut lang_registry = Arc::new(LanguageRegistry::new());
2474        let fs = Arc::new(FakeFs::new(cx_a.background()));
2475
2476        // Set up a fake language server.
2477        let (language_server_config, mut fake_language_server) =
2478            LanguageServerConfig::fake(cx_a.background()).await;
2479        Arc::get_mut(&mut lang_registry)
2480            .unwrap()
2481            .add(Arc::new(Language::new(
2482                LanguageConfig {
2483                    name: "Rust".to_string(),
2484                    path_suffixes: vec!["rs".to_string()],
2485                    language_server: Some(language_server_config),
2486                    ..Default::default()
2487                },
2488                Some(tree_sitter_rust::language()),
2489            )));
2490
2491        // Connect to a server as 2 clients.
2492        let mut server = TestServer::start(cx_a.foreground()).await;
2493        let client_a = server.create_client(&mut cx_a, "user_a").await;
2494        let client_b = server.create_client(&mut cx_b, "user_b").await;
2495
2496        // Share a project as client A
2497        fs.insert_tree(
2498            "/a",
2499            json!({
2500                ".zed.toml": r#"collaborators = ["user_b"]"#,
2501                "a.rs": "let one = two",
2502            }),
2503        )
2504        .await;
2505        let project_a = cx_a.update(|cx| {
2506            Project::local(
2507                client_a.clone(),
2508                client_a.user_store.clone(),
2509                lang_registry.clone(),
2510                fs.clone(),
2511                cx,
2512            )
2513        });
2514        let (worktree_a, _) = project_a
2515            .update(&mut cx_a, |p, cx| {
2516                p.find_or_create_local_worktree("/a", false, cx)
2517            })
2518            .await
2519            .unwrap();
2520        worktree_a
2521            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2522            .await;
2523        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2524        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2525        project_a
2526            .update(&mut cx_a, |p, cx| p.share(cx))
2527            .await
2528            .unwrap();
2529
2530        // Join the worktree as client B.
2531        let project_b = Project::remote(
2532            project_id,
2533            client_b.clone(),
2534            client_b.user_store.clone(),
2535            lang_registry.clone(),
2536            fs.clone(),
2537            &mut cx_b.to_async(),
2538        )
2539        .await
2540        .unwrap();
2541
2542        let buffer_b = cx_b
2543            .background()
2544            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2545            .await
2546            .unwrap();
2547
2548        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2549        let (request_id, _) = fake_language_server
2550            .receive_request::<lsp::request::Formatting>()
2551            .await;
2552        fake_language_server
2553            .respond(
2554                request_id,
2555                Some(vec![
2556                    lsp::TextEdit {
2557                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2558                        new_text: "h".to_string(),
2559                    },
2560                    lsp::TextEdit {
2561                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2562                        new_text: "y".to_string(),
2563                    },
2564                ]),
2565            )
2566            .await;
2567        format.await.unwrap();
2568        assert_eq!(
2569            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2570            "let honey = two"
2571        );
2572    }
2573
2574    #[gpui::test]
2575    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2576        cx_a.foreground().forbid_parking();
2577        let mut lang_registry = Arc::new(LanguageRegistry::new());
2578        let fs = Arc::new(FakeFs::new(cx_a.background()));
2579        fs.insert_tree(
2580            "/root-1",
2581            json!({
2582                ".zed.toml": r#"collaborators = ["user_b"]"#,
2583                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2584            }),
2585        )
2586        .await;
2587        fs.insert_tree(
2588            "/root-2",
2589            json!({
2590                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2591            }),
2592        )
2593        .await;
2594
2595        // Set up a fake language server.
2596        let (language_server_config, mut fake_language_server) =
2597            LanguageServerConfig::fake(cx_a.background()).await;
2598        Arc::get_mut(&mut lang_registry)
2599            .unwrap()
2600            .add(Arc::new(Language::new(
2601                LanguageConfig {
2602                    name: "Rust".to_string(),
2603                    path_suffixes: vec!["rs".to_string()],
2604                    language_server: Some(language_server_config),
2605                    ..Default::default()
2606                },
2607                Some(tree_sitter_rust::language()),
2608            )));
2609
2610        // Connect to a server as 2 clients.
2611        let mut server = TestServer::start(cx_a.foreground()).await;
2612        let client_a = server.create_client(&mut cx_a, "user_a").await;
2613        let client_b = server.create_client(&mut cx_b, "user_b").await;
2614
2615        // Share a project as client A
2616        let project_a = cx_a.update(|cx| {
2617            Project::local(
2618                client_a.clone(),
2619                client_a.user_store.clone(),
2620                lang_registry.clone(),
2621                fs.clone(),
2622                cx,
2623            )
2624        });
2625        let (worktree_a, _) = project_a
2626            .update(&mut cx_a, |p, cx| {
2627                p.find_or_create_local_worktree("/root-1", false, cx)
2628            })
2629            .await
2630            .unwrap();
2631        worktree_a
2632            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2633            .await;
2634        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2635        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2636        project_a
2637            .update(&mut cx_a, |p, cx| p.share(cx))
2638            .await
2639            .unwrap();
2640
2641        // Join the worktree as client B.
2642        let project_b = Project::remote(
2643            project_id,
2644            client_b.clone(),
2645            client_b.user_store.clone(),
2646            lang_registry.clone(),
2647            fs.clone(),
2648            &mut cx_b.to_async(),
2649        )
2650        .await
2651        .unwrap();
2652
2653        // Open the file to be formatted on client B.
2654        let buffer_b = cx_b
2655            .background()
2656            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2657            .await
2658            .unwrap();
2659
2660        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2661        let (request_id, _) = fake_language_server
2662            .receive_request::<lsp::request::GotoDefinition>()
2663            .await;
2664        fake_language_server
2665            .respond(
2666                request_id,
2667                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2668                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2669                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2670                ))),
2671            )
2672            .await;
2673        let definitions_1 = definitions_1.await.unwrap();
2674        cx_b.read(|cx| {
2675            assert_eq!(definitions_1.len(), 1);
2676            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2677            let target_buffer = definitions_1[0].target_buffer.read(cx);
2678            assert_eq!(
2679                target_buffer.text(),
2680                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2681            );
2682            assert_eq!(
2683                definitions_1[0].target_range.to_point(target_buffer),
2684                Point::new(0, 6)..Point::new(0, 9)
2685            );
2686        });
2687
2688        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2689        // the previous call to `definition`.
2690        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2691        let (request_id, _) = fake_language_server
2692            .receive_request::<lsp::request::GotoDefinition>()
2693            .await;
2694        fake_language_server
2695            .respond(
2696                request_id,
2697                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2698                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2699                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2700                ))),
2701            )
2702            .await;
2703        let definitions_2 = definitions_2.await.unwrap();
2704        cx_b.read(|cx| {
2705            assert_eq!(definitions_2.len(), 1);
2706            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2707            let target_buffer = definitions_2[0].target_buffer.read(cx);
2708            assert_eq!(
2709                target_buffer.text(),
2710                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2711            );
2712            assert_eq!(
2713                definitions_2[0].target_range.to_point(target_buffer),
2714                Point::new(1, 6)..Point::new(1, 11)
2715            );
2716        });
2717        assert_eq!(
2718            definitions_1[0].target_buffer,
2719            definitions_2[0].target_buffer
2720        );
2721
2722        cx_b.update(|_| {
2723            drop(definitions_1);
2724            drop(definitions_2);
2725        });
2726        project_b
2727            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2728            .await;
2729    }
2730
2731    #[gpui::test]
2732    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2733        mut cx_a: TestAppContext,
2734        mut cx_b: TestAppContext,
2735        mut rng: StdRng,
2736    ) {
2737        cx_a.foreground().forbid_parking();
2738        let mut lang_registry = Arc::new(LanguageRegistry::new());
2739        let fs = Arc::new(FakeFs::new(cx_a.background()));
2740        fs.insert_tree(
2741            "/root",
2742            json!({
2743                ".zed.toml": r#"collaborators = ["user_b"]"#,
2744                "a.rs": "const ONE: usize = b::TWO;",
2745                "b.rs": "const TWO: usize = 2",
2746            }),
2747        )
2748        .await;
2749
2750        // Set up a fake language server.
2751        let (language_server_config, mut fake_language_server) =
2752            LanguageServerConfig::fake(cx_a.background()).await;
2753        Arc::get_mut(&mut lang_registry)
2754            .unwrap()
2755            .add(Arc::new(Language::new(
2756                LanguageConfig {
2757                    name: "Rust".to_string(),
2758                    path_suffixes: vec!["rs".to_string()],
2759                    language_server: Some(language_server_config),
2760                    ..Default::default()
2761                },
2762                Some(tree_sitter_rust::language()),
2763            )));
2764
2765        // Connect to a server as 2 clients.
2766        let mut server = TestServer::start(cx_a.foreground()).await;
2767        let client_a = server.create_client(&mut cx_a, "user_a").await;
2768        let client_b = server.create_client(&mut cx_b, "user_b").await;
2769
2770        // Share a project as client A
2771        let project_a = cx_a.update(|cx| {
2772            Project::local(
2773                client_a.clone(),
2774                client_a.user_store.clone(),
2775                lang_registry.clone(),
2776                fs.clone(),
2777                cx,
2778            )
2779        });
2780        let (worktree_a, _) = project_a
2781            .update(&mut cx_a, |p, cx| {
2782                p.find_or_create_local_worktree("/root", false, cx)
2783            })
2784            .await
2785            .unwrap();
2786        worktree_a
2787            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2788            .await;
2789        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2790        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2791        project_a
2792            .update(&mut cx_a, |p, cx| p.share(cx))
2793            .await
2794            .unwrap();
2795
2796        // Join the worktree as client B.
2797        let project_b = Project::remote(
2798            project_id,
2799            client_b.clone(),
2800            client_b.user_store.clone(),
2801            lang_registry.clone(),
2802            fs.clone(),
2803            &mut cx_b.to_async(),
2804        )
2805        .await
2806        .unwrap();
2807
2808        let buffer_b1 = cx_b
2809            .background()
2810            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2811            .await
2812            .unwrap();
2813
2814        let definitions;
2815        let buffer_b2;
2816        if rng.gen() {
2817            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2818            buffer_b2 =
2819                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2820        } else {
2821            buffer_b2 =
2822                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2823            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2824        }
2825
2826        let (request_id, _) = fake_language_server
2827            .receive_request::<lsp::request::GotoDefinition>()
2828            .await;
2829        fake_language_server
2830            .respond(
2831                request_id,
2832                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2833                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2834                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2835                ))),
2836            )
2837            .await;
2838
2839        let buffer_b2 = buffer_b2.await.unwrap();
2840        let definitions = definitions.await.unwrap();
2841        assert_eq!(definitions.len(), 1);
2842        assert_eq!(definitions[0].target_buffer, buffer_b2);
2843    }
2844
2845    #[gpui::test]
2846    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2847        cx_a.foreground().forbid_parking();
2848
2849        // Connect to a server as 2 clients.
2850        let mut server = TestServer::start(cx_a.foreground()).await;
2851        let client_a = server.create_client(&mut cx_a, "user_a").await;
2852        let client_b = server.create_client(&mut cx_b, "user_b").await;
2853
2854        // Create an org that includes these 2 users.
2855        let db = &server.app_state.db;
2856        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2857        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2858            .await
2859            .unwrap();
2860        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2861            .await
2862            .unwrap();
2863
2864        // Create a channel that includes all the users.
2865        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2866        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2867            .await
2868            .unwrap();
2869        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2870            .await
2871            .unwrap();
2872        db.create_channel_message(
2873            channel_id,
2874            client_b.current_user_id(&cx_b),
2875            "hello A, it's B.",
2876            OffsetDateTime::now_utc(),
2877            1,
2878        )
2879        .await
2880        .unwrap();
2881
2882        let channels_a = cx_a
2883            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2884        channels_a
2885            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2886            .await;
2887        channels_a.read_with(&cx_a, |list, _| {
2888            assert_eq!(
2889                list.available_channels().unwrap(),
2890                &[ChannelDetails {
2891                    id: channel_id.to_proto(),
2892                    name: "test-channel".to_string()
2893                }]
2894            )
2895        });
2896        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2897            this.get_channel(channel_id.to_proto(), cx).unwrap()
2898        });
2899        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2900        channel_a
2901            .condition(&cx_a, |channel, _| {
2902                channel_messages(channel)
2903                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2904            })
2905            .await;
2906
2907        let channels_b = cx_b
2908            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2909        channels_b
2910            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2911            .await;
2912        channels_b.read_with(&cx_b, |list, _| {
2913            assert_eq!(
2914                list.available_channels().unwrap(),
2915                &[ChannelDetails {
2916                    id: channel_id.to_proto(),
2917                    name: "test-channel".to_string()
2918                }]
2919            )
2920        });
2921
2922        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2923            this.get_channel(channel_id.to_proto(), cx).unwrap()
2924        });
2925        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2926        channel_b
2927            .condition(&cx_b, |channel, _| {
2928                channel_messages(channel)
2929                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2930            })
2931            .await;
2932
2933        channel_a
2934            .update(&mut cx_a, |channel, cx| {
2935                channel
2936                    .send_message("oh, hi B.".to_string(), cx)
2937                    .unwrap()
2938                    .detach();
2939                let task = channel.send_message("sup".to_string(), cx).unwrap();
2940                assert_eq!(
2941                    channel_messages(channel),
2942                    &[
2943                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2944                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2945                        ("user_a".to_string(), "sup".to_string(), true)
2946                    ]
2947                );
2948                task
2949            })
2950            .await
2951            .unwrap();
2952
2953        channel_b
2954            .condition(&cx_b, |channel, _| {
2955                channel_messages(channel)
2956                    == [
2957                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2958                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2959                        ("user_a".to_string(), "sup".to_string(), false),
2960                    ]
2961            })
2962            .await;
2963
2964        assert_eq!(
2965            server
2966                .state()
2967                .await
2968                .channel(channel_id)
2969                .unwrap()
2970                .connection_ids
2971                .len(),
2972            2
2973        );
2974        cx_b.update(|_| drop(channel_b));
2975        server
2976            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
2977            .await;
2978
2979        cx_a.update(|_| drop(channel_a));
2980        server
2981            .condition(|state| state.channel(channel_id).is_none())
2982            .await;
2983    }
2984
2985    #[gpui::test]
2986    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
2987        cx_a.foreground().forbid_parking();
2988
2989        let mut server = TestServer::start(cx_a.foreground()).await;
2990        let client_a = server.create_client(&mut cx_a, "user_a").await;
2991
2992        let db = &server.app_state.db;
2993        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2994        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2995        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2996            .await
2997            .unwrap();
2998        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2999            .await
3000            .unwrap();
3001
3002        let channels_a = cx_a
3003            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3004        channels_a
3005            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3006            .await;
3007        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3008            this.get_channel(channel_id.to_proto(), cx).unwrap()
3009        });
3010
3011        // Messages aren't allowed to be too long.
3012        channel_a
3013            .update(&mut cx_a, |channel, cx| {
3014                let long_body = "this is long.\n".repeat(1024);
3015                channel.send_message(long_body, cx).unwrap()
3016            })
3017            .await
3018            .unwrap_err();
3019
3020        // Messages aren't allowed to be blank.
3021        channel_a.update(&mut cx_a, |channel, cx| {
3022            channel.send_message(String::new(), cx).unwrap_err()
3023        });
3024
3025        // Leading and trailing whitespace are trimmed.
3026        channel_a
3027            .update(&mut cx_a, |channel, cx| {
3028                channel
3029                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3030                    .unwrap()
3031            })
3032            .await
3033            .unwrap();
3034        assert_eq!(
3035            db.get_channel_messages(channel_id, 10, None)
3036                .await
3037                .unwrap()
3038                .iter()
3039                .map(|m| &m.body)
3040                .collect::<Vec<_>>(),
3041            &["surrounded by whitespace"]
3042        );
3043    }
3044
3045    #[gpui::test]
3046    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3047        cx_a.foreground().forbid_parking();
3048
3049        // Connect to a server as 2 clients.
3050        let mut server = TestServer::start(cx_a.foreground()).await;
3051        let client_a = server.create_client(&mut cx_a, "user_a").await;
3052        let client_b = server.create_client(&mut cx_b, "user_b").await;
3053        let mut status_b = client_b.status();
3054
3055        // Create an org that includes these 2 users.
3056        let db = &server.app_state.db;
3057        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3058        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3059            .await
3060            .unwrap();
3061        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3062            .await
3063            .unwrap();
3064
3065        // Create a channel that includes all the users.
3066        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3067        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3068            .await
3069            .unwrap();
3070        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3071            .await
3072            .unwrap();
3073        db.create_channel_message(
3074            channel_id,
3075            client_b.current_user_id(&cx_b),
3076            "hello A, it's B.",
3077            OffsetDateTime::now_utc(),
3078            2,
3079        )
3080        .await
3081        .unwrap();
3082
3083        let channels_a = cx_a
3084            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3085        channels_a
3086            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3087            .await;
3088
3089        channels_a.read_with(&cx_a, |list, _| {
3090            assert_eq!(
3091                list.available_channels().unwrap(),
3092                &[ChannelDetails {
3093                    id: channel_id.to_proto(),
3094                    name: "test-channel".to_string()
3095                }]
3096            )
3097        });
3098        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3099            this.get_channel(channel_id.to_proto(), cx).unwrap()
3100        });
3101        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3102        channel_a
3103            .condition(&cx_a, |channel, _| {
3104                channel_messages(channel)
3105                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3106            })
3107            .await;
3108
3109        let channels_b = cx_b
3110            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3111        channels_b
3112            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3113            .await;
3114        channels_b.read_with(&cx_b, |list, _| {
3115            assert_eq!(
3116                list.available_channels().unwrap(),
3117                &[ChannelDetails {
3118                    id: channel_id.to_proto(),
3119                    name: "test-channel".to_string()
3120                }]
3121            )
3122        });
3123
3124        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3125            this.get_channel(channel_id.to_proto(), cx).unwrap()
3126        });
3127        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3128        channel_b
3129            .condition(&cx_b, |channel, _| {
3130                channel_messages(channel)
3131                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3132            })
3133            .await;
3134
3135        // Disconnect client B, ensuring we can still access its cached channel data.
3136        server.forbid_connections();
3137        server.disconnect_client(client_b.current_user_id(&cx_b));
3138        while !matches!(
3139            status_b.next().await,
3140            Some(client::Status::ReconnectionError { .. })
3141        ) {}
3142
3143        channels_b.read_with(&cx_b, |channels, _| {
3144            assert_eq!(
3145                channels.available_channels().unwrap(),
3146                [ChannelDetails {
3147                    id: channel_id.to_proto(),
3148                    name: "test-channel".to_string()
3149                }]
3150            )
3151        });
3152        channel_b.read_with(&cx_b, |channel, _| {
3153            assert_eq!(
3154                channel_messages(channel),
3155                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3156            )
3157        });
3158
3159        // Send a message from client B while it is disconnected.
3160        channel_b
3161            .update(&mut cx_b, |channel, cx| {
3162                let task = channel
3163                    .send_message("can you see this?".to_string(), cx)
3164                    .unwrap();
3165                assert_eq!(
3166                    channel_messages(channel),
3167                    &[
3168                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3169                        ("user_b".to_string(), "can you see this?".to_string(), true)
3170                    ]
3171                );
3172                task
3173            })
3174            .await
3175            .unwrap_err();
3176
3177        // Send a message from client A while B is disconnected.
3178        channel_a
3179            .update(&mut cx_a, |channel, cx| {
3180                channel
3181                    .send_message("oh, hi B.".to_string(), cx)
3182                    .unwrap()
3183                    .detach();
3184                let task = channel.send_message("sup".to_string(), cx).unwrap();
3185                assert_eq!(
3186                    channel_messages(channel),
3187                    &[
3188                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3189                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3190                        ("user_a".to_string(), "sup".to_string(), true)
3191                    ]
3192                );
3193                task
3194            })
3195            .await
3196            .unwrap();
3197
3198        // Give client B a chance to reconnect.
3199        server.allow_connections();
3200        cx_b.foreground().advance_clock(Duration::from_secs(10));
3201
3202        // Verify that B sees the new messages upon reconnection, as well as the message client B
3203        // sent while offline.
3204        channel_b
3205            .condition(&cx_b, |channel, _| {
3206                channel_messages(channel)
3207                    == [
3208                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3209                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3210                        ("user_a".to_string(), "sup".to_string(), false),
3211                        ("user_b".to_string(), "can you see this?".to_string(), false),
3212                    ]
3213            })
3214            .await;
3215
3216        // Ensure client A and B can communicate normally after reconnection.
3217        channel_a
3218            .update(&mut cx_a, |channel, cx| {
3219                channel.send_message("you online?".to_string(), cx).unwrap()
3220            })
3221            .await
3222            .unwrap();
3223        channel_b
3224            .condition(&cx_b, |channel, _| {
3225                channel_messages(channel)
3226                    == [
3227                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3228                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3229                        ("user_a".to_string(), "sup".to_string(), false),
3230                        ("user_b".to_string(), "can you see this?".to_string(), false),
3231                        ("user_a".to_string(), "you online?".to_string(), false),
3232                    ]
3233            })
3234            .await;
3235
3236        channel_b
3237            .update(&mut cx_b, |channel, cx| {
3238                channel.send_message("yep".to_string(), cx).unwrap()
3239            })
3240            .await
3241            .unwrap();
3242        channel_a
3243            .condition(&cx_a, |channel, _| {
3244                channel_messages(channel)
3245                    == [
3246                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3247                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3248                        ("user_a".to_string(), "sup".to_string(), false),
3249                        ("user_b".to_string(), "can you see this?".to_string(), false),
3250                        ("user_a".to_string(), "you online?".to_string(), false),
3251                        ("user_b".to_string(), "yep".to_string(), false),
3252                    ]
3253            })
3254            .await;
3255    }
3256
3257    #[gpui::test]
3258    async fn test_contacts(
3259        mut cx_a: TestAppContext,
3260        mut cx_b: TestAppContext,
3261        mut cx_c: TestAppContext,
3262    ) {
3263        cx_a.foreground().forbid_parking();
3264        let lang_registry = Arc::new(LanguageRegistry::new());
3265        let fs = Arc::new(FakeFs::new(cx_a.background()));
3266
3267        // Connect to a server as 3 clients.
3268        let mut server = TestServer::start(cx_a.foreground()).await;
3269        let client_a = server.create_client(&mut cx_a, "user_a").await;
3270        let client_b = server.create_client(&mut cx_b, "user_b").await;
3271        let client_c = server.create_client(&mut cx_c, "user_c").await;
3272
3273        // Share a worktree as client A.
3274        fs.insert_tree(
3275            "/a",
3276            json!({
3277                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3278            }),
3279        )
3280        .await;
3281
3282        let project_a = cx_a.update(|cx| {
3283            Project::local(
3284                client_a.clone(),
3285                client_a.user_store.clone(),
3286                lang_registry.clone(),
3287                fs.clone(),
3288                cx,
3289            )
3290        });
3291        let (worktree_a, _) = project_a
3292            .update(&mut cx_a, |p, cx| {
3293                p.find_or_create_local_worktree("/a", false, cx)
3294            })
3295            .await
3296            .unwrap();
3297        worktree_a
3298            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3299            .await;
3300
3301        client_a
3302            .user_store
3303            .condition(&cx_a, |user_store, _| {
3304                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3305            })
3306            .await;
3307        client_b
3308            .user_store
3309            .condition(&cx_b, |user_store, _| {
3310                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3311            })
3312            .await;
3313        client_c
3314            .user_store
3315            .condition(&cx_c, |user_store, _| {
3316                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3317            })
3318            .await;
3319
3320        let project_id = project_a
3321            .update(&mut cx_a, |project, _| project.next_remote_id())
3322            .await;
3323        project_a
3324            .update(&mut cx_a, |project, cx| project.share(cx))
3325            .await
3326            .unwrap();
3327
3328        let _project_b = Project::remote(
3329            project_id,
3330            client_b.clone(),
3331            client_b.user_store.clone(),
3332            lang_registry.clone(),
3333            fs.clone(),
3334            &mut cx_b.to_async(),
3335        )
3336        .await
3337        .unwrap();
3338
3339        client_a
3340            .user_store
3341            .condition(&cx_a, |user_store, _| {
3342                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3343            })
3344            .await;
3345        client_b
3346            .user_store
3347            .condition(&cx_b, |user_store, _| {
3348                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3349            })
3350            .await;
3351        client_c
3352            .user_store
3353            .condition(&cx_c, |user_store, _| {
3354                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3355            })
3356            .await;
3357
3358        project_a
3359            .condition(&cx_a, |project, _| {
3360                project.collaborators().contains_key(&client_b.peer_id)
3361            })
3362            .await;
3363
3364        cx_a.update(move |_| drop(project_a));
3365        client_a
3366            .user_store
3367            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3368            .await;
3369        client_b
3370            .user_store
3371            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3372            .await;
3373        client_c
3374            .user_store
3375            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3376            .await;
3377
3378        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3379            user_store
3380                .contacts()
3381                .iter()
3382                .map(|contact| {
3383                    let worktrees = contact
3384                        .projects
3385                        .iter()
3386                        .map(|p| {
3387                            (
3388                                p.worktree_root_names[0].as_str(),
3389                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3390                            )
3391                        })
3392                        .collect();
3393                    (contact.user.github_login.as_str(), worktrees)
3394                })
3395                .collect()
3396        }
3397    }
3398
3399    struct TestServer {
3400        peer: Arc<Peer>,
3401        app_state: Arc<AppState>,
3402        server: Arc<Server>,
3403        foreground: Rc<executor::Foreground>,
3404        notifications: mpsc::Receiver<()>,
3405        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3406        forbid_connections: Arc<AtomicBool>,
3407        _test_db: TestDb,
3408    }
3409
3410    impl TestServer {
3411        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3412            let test_db = TestDb::new();
3413            let app_state = Self::build_app_state(&test_db).await;
3414            let peer = Peer::new();
3415            let notifications = mpsc::channel(128);
3416            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3417            Self {
3418                peer,
3419                app_state,
3420                server,
3421                foreground,
3422                notifications: notifications.1,
3423                connection_killers: Default::default(),
3424                forbid_connections: Default::default(),
3425                _test_db: test_db,
3426            }
3427        }
3428
3429        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3430            let http = FakeHttpClient::with_404_response();
3431            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3432            let client_name = name.to_string();
3433            let mut client = Client::new(http.clone());
3434            let server = self.server.clone();
3435            let connection_killers = self.connection_killers.clone();
3436            let forbid_connections = self.forbid_connections.clone();
3437            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3438
3439            Arc::get_mut(&mut client)
3440                .unwrap()
3441                .override_authenticate(move |cx| {
3442                    cx.spawn(|_| async move {
3443                        let access_token = "the-token".to_string();
3444                        Ok(Credentials {
3445                            user_id: user_id.0 as u64,
3446                            access_token,
3447                        })
3448                    })
3449                })
3450                .override_establish_connection(move |credentials, cx| {
3451                    assert_eq!(credentials.user_id, user_id.0 as u64);
3452                    assert_eq!(credentials.access_token, "the-token");
3453
3454                    let server = server.clone();
3455                    let connection_killers = connection_killers.clone();
3456                    let forbid_connections = forbid_connections.clone();
3457                    let client_name = client_name.clone();
3458                    let connection_id_tx = connection_id_tx.clone();
3459                    cx.spawn(move |cx| async move {
3460                        if forbid_connections.load(SeqCst) {
3461                            Err(EstablishConnectionError::other(anyhow!(
3462                                "server is forbidding connections"
3463                            )))
3464                        } else {
3465                            let (client_conn, server_conn, kill_conn) =
3466                                Connection::in_memory(cx.background());
3467                            connection_killers.lock().insert(user_id, kill_conn);
3468                            cx.background()
3469                                .spawn(server.handle_connection(
3470                                    server_conn,
3471                                    client_name,
3472                                    user_id,
3473                                    Some(connection_id_tx),
3474                                ))
3475                                .detach();
3476                            Ok(client_conn)
3477                        }
3478                    })
3479                });
3480
3481            client
3482                .authenticate_and_connect(&cx.to_async())
3483                .await
3484                .unwrap();
3485
3486            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3487            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3488            let mut authed_user =
3489                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3490            while authed_user.next().await.unwrap().is_none() {}
3491
3492            TestClient {
3493                client,
3494                peer_id,
3495                user_store,
3496            }
3497        }
3498
3499        fn disconnect_client(&self, user_id: UserId) {
3500            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3501                let _ = kill_conn.try_send(Some(()));
3502            }
3503        }
3504
3505        fn forbid_connections(&self) {
3506            self.forbid_connections.store(true, SeqCst);
3507        }
3508
3509        fn allow_connections(&self) {
3510            self.forbid_connections.store(false, SeqCst);
3511        }
3512
3513        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3514            let mut config = Config::default();
3515            config.session_secret = "a".repeat(32);
3516            config.database_url = test_db.url.clone();
3517            let github_client = github::AppClient::test();
3518            Arc::new(AppState {
3519                db: test_db.db().clone(),
3520                handlebars: Default::default(),
3521                auth_client: auth::build_client("", ""),
3522                repo_client: github::RepoClient::test(&github_client),
3523                github_client,
3524                config,
3525            })
3526        }
3527
3528        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3529            self.server.store.read()
3530        }
3531
3532        async fn condition<F>(&mut self, mut predicate: F)
3533        where
3534            F: FnMut(&Store) -> bool,
3535        {
3536            async_std::future::timeout(Duration::from_millis(500), async {
3537                while !(predicate)(&*self.server.store.read()) {
3538                    self.foreground.start_waiting();
3539                    self.notifications.next().await;
3540                    self.foreground.finish_waiting();
3541                }
3542            })
3543            .await
3544            .expect("condition timed out");
3545        }
3546    }
3547
3548    impl Drop for TestServer {
3549        fn drop(&mut self) {
3550            self.peer.reset();
3551        }
3552    }
3553
3554    struct TestClient {
3555        client: Arc<Client>,
3556        pub peer_id: PeerId,
3557        pub user_store: ModelHandle<UserStore>,
3558    }
3559
3560    impl Deref for TestClient {
3561        type Target = Arc<Client>;
3562
3563        fn deref(&self) -> &Self::Target {
3564            &self.client
3565        }
3566    }
3567
3568    impl TestClient {
3569        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3570            UserId::from_proto(
3571                self.user_store
3572                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3573            )
3574        }
3575    }
3576
3577    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3578        channel
3579            .messages()
3580            .cursor::<()>()
3581            .map(|m| {
3582                (
3583                    m.sender.github_login.clone(),
3584                    m.body.clone(),
3585                    m.is_pending(),
3586                )
3587            })
3588            .collect()
3589    }
3590
3591    struct EmptyView;
3592
3593    impl gpui::Entity for EmptyView {
3594        type Event = ();
3595    }
3596
3597    impl gpui::View for EmptyView {
3598        fn ui_name() -> &'static str {
3599            "empty view"
3600        }
3601
3602        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3603            gpui::Element::boxed(gpui::elements::Empty)
3604        }
3605    }
3606}