rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_code_actions)
  89            .add_handler(Server::apply_code_action)
  90            .add_handler(Server::get_channels)
  91            .add_handler(Server::get_users)
  92            .add_handler(Server::join_channel)
  93            .add_handler(Server::leave_channel)
  94            .add_handler(Server::send_channel_message)
  95            .add_handler(Server::get_channel_messages);
  96
  97        Arc::new(server)
  98    }
  99
 100    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 101    where
 102        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 103        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 104        M: EnvelopedMessage,
 105    {
 106        let prev_handler = self.handlers.insert(
 107            TypeId::of::<M>(),
 108            Box::new(move |server, envelope| {
 109                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 110                (handler)(server, *envelope).boxed()
 111            }),
 112        );
 113        if prev_handler.is_some() {
 114            panic!("registered a handler for the same message twice");
 115        }
 116        self
 117    }
 118
 119    pub fn handle_connection(
 120        self: &Arc<Self>,
 121        connection: Connection,
 122        addr: String,
 123        user_id: UserId,
 124        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 125    ) -> impl Future<Output = ()> {
 126        let mut this = self.clone();
 127        async move {
 128            let (connection_id, handle_io, mut incoming_rx) =
 129                this.peer.add_connection(connection).await;
 130
 131            if let Some(send_connection_id) = send_connection_id.as_mut() {
 132                let _ = send_connection_id.send(connection_id).await;
 133            }
 134
 135            this.state_mut().add_connection(connection_id, user_id);
 136            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 137                log::error!("error updating contacts for {:?}: {}", user_id, err);
 138            }
 139
 140            let handle_io = handle_io.fuse();
 141            futures::pin_mut!(handle_io);
 142            loop {
 143                let next_message = incoming_rx.next().fuse();
 144                futures::pin_mut!(next_message);
 145                futures::select_biased! {
 146                    result = handle_io => {
 147                        if let Err(err) = result {
 148                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 149                        }
 150                        break;
 151                    }
 152                    message = next_message => {
 153                        if let Some(message) = message {
 154                            let start_time = Instant::now();
 155                            let type_name = message.payload_type_name();
 156                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 157                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 158                                if let Err(err) = (handler)(this.clone(), message).await {
 159                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 160                                } else {
 161                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 162                                }
 163
 164                                if let Some(mut notifications) = this.notifications.clone() {
 165                                    let _ = notifications.send(()).await;
 166                                }
 167                            } else {
 168                                log::warn!("unhandled message: {}", type_name);
 169                            }
 170                        } else {
 171                            log::info!("rpc connection closed {:?}", addr);
 172                            break;
 173                        }
 174                    }
 175                }
 176            }
 177
 178            if let Err(err) = this.sign_out(connection_id).await {
 179                log::error!("error signing out connection {:?} - {:?}", addr, err);
 180            }
 181        }
 182    }
 183
 184    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 185        self.peer.disconnect(connection_id);
 186        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 187
 188        for (project_id, project) in removed_connection.hosted_projects {
 189            if let Some(share) = project.share {
 190                broadcast(
 191                    connection_id,
 192                    share.guests.keys().copied().collect(),
 193                    |conn_id| {
 194                        self.peer
 195                            .send(conn_id, proto::UnshareProject { project_id })
 196                    },
 197                )?;
 198            }
 199        }
 200
 201        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 202            broadcast(connection_id, peer_ids, |conn_id| {
 203                self.peer.send(
 204                    conn_id,
 205                    proto::RemoveProjectCollaborator {
 206                        project_id,
 207                        peer_id: connection_id.0,
 208                    },
 209                )
 210            })?;
 211        }
 212
 213        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {})?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer.respond(
 232            request.receipt(),
 233            proto::RegisterProjectResponse { project_id },
 234        )?;
 235        Ok(())
 236    }
 237
 238    async fn unregister_project(
 239        mut self: Arc<Server>,
 240        request: TypedEnvelope<proto::UnregisterProject>,
 241    ) -> tide::Result<()> {
 242        let project = self
 243            .state_mut()
 244            .unregister_project(request.payload.project_id, request.sender_id)
 245            .ok_or_else(|| anyhow!("no such project"))?;
 246        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 247        Ok(())
 248    }
 249
 250    async fn share_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::ShareProject>,
 253    ) -> tide::Result<()> {
 254        self.state_mut()
 255            .share_project(request.payload.project_id, request.sender_id);
 256        self.peer.respond(request.receipt(), proto::Ack {})?;
 257        Ok(())
 258    }
 259
 260    async fn unshare_project(
 261        mut self: Arc<Server>,
 262        request: TypedEnvelope<proto::UnshareProject>,
 263    ) -> tide::Result<()> {
 264        let project_id = request.payload.project_id;
 265        let project = self
 266            .state_mut()
 267            .unshare_project(project_id, request.sender_id)?;
 268
 269        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 270            self.peer
 271                .send(conn_id, proto::UnshareProject { project_id })
 272        })?;
 273        self.update_contacts_for_users(&project.authorized_user_ids)?;
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                            weak: worktree.weak,
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })?;
 348                self.peer.respond(request.receipt(), response)?;
 349                self.update_contacts_for_users(&contact_user_ids)?;
 350            }
 351            Err(error) => {
 352                self.peer.respond_with_error(
 353                    request.receipt(),
 354                    proto::Error {
 355                        message: error.to_string(),
 356                    },
 357                )?;
 358            }
 359        }
 360
 361        Ok(())
 362    }
 363
 364    async fn leave_project(
 365        mut self: Arc<Server>,
 366        request: TypedEnvelope<proto::LeaveProject>,
 367    ) -> tide::Result<()> {
 368        let sender_id = request.sender_id;
 369        let project_id = request.payload.project_id;
 370        let worktree = self.state_mut().leave_project(sender_id, project_id);
 371        if let Some(worktree) = worktree {
 372            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 373                self.peer.send(
 374                    conn_id,
 375                    proto::RemoveProjectCollaborator {
 376                        project_id,
 377                        peer_id: sender_id.0,
 378                    },
 379                )
 380            })?;
 381            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 382        }
 383        Ok(())
 384    }
 385
 386    async fn register_worktree(
 387        mut self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterWorktree>,
 389    ) -> tide::Result<()> {
 390        let receipt = request.receipt();
 391        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 392
 393        let mut contact_user_ids = HashSet::default();
 394        contact_user_ids.insert(host_user_id);
 395        for github_login in request.payload.authorized_logins {
 396            match self.app_state.db.create_user(&github_login, false).await {
 397                Ok(contact_user_id) => {
 398                    contact_user_ids.insert(contact_user_id);
 399                }
 400                Err(err) => {
 401                    let message = err.to_string();
 402                    self.peer
 403                        .respond_with_error(receipt, proto::Error { message })?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417                weak: false,
 418            },
 419        );
 420
 421        if ok {
 422            self.peer.respond(receipt, proto::Ack {})?;
 423            self.update_contacts_for_users(&contact_user_ids)?;
 424        } else {
 425            self.peer.respond_with_error(
 426                receipt,
 427                proto::Error {
 428                    message: NO_SUCH_PROJECT.to_string(),
 429                },
 430            )?;
 431        }
 432
 433        Ok(())
 434    }
 435
 436    async fn unregister_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::UnregisterWorktree>,
 439    ) -> tide::Result<()> {
 440        let project_id = request.payload.project_id;
 441        let worktree_id = request.payload.worktree_id;
 442        let (worktree, guest_connection_ids) =
 443            self.state_mut()
 444                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })?;
 454        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 455        Ok(())
 456    }
 457
 458    async fn share_worktree(
 459        mut self: Arc<Server>,
 460        mut request: TypedEnvelope<proto::ShareWorktree>,
 461    ) -> tide::Result<()> {
 462        let worktree = request
 463            .payload
 464            .worktree
 465            .as_mut()
 466            .ok_or_else(|| anyhow!("missing worktree"))?;
 467        let entries = worktree
 468            .entries
 469            .iter()
 470            .map(|entry| (entry.id, entry.clone()))
 471            .collect();
 472        let diagnostic_summaries = worktree
 473            .diagnostic_summaries
 474            .iter()
 475            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 476            .collect();
 477
 478        let shared_worktree = self.state_mut().share_worktree(
 479            request.payload.project_id,
 480            worktree.id,
 481            request.sender_id,
 482            entries,
 483            diagnostic_summaries,
 484        );
 485        if let Some(shared_worktree) = shared_worktree {
 486            broadcast(
 487                request.sender_id,
 488                shared_worktree.connection_ids,
 489                |connection_id| {
 490                    self.peer.forward_send(
 491                        request.sender_id,
 492                        connection_id,
 493                        request.payload.clone(),
 494                    )
 495                },
 496            )?;
 497            self.peer.respond(request.receipt(), proto::Ack {})?;
 498            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499        } else {
 500            self.peer.respond_with_error(
 501                request.receipt(),
 502                proto::Error {
 503                    message: "no such worktree".to_string(),
 504                },
 505            )?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })?;
 555        Ok(())
 556    }
 557
 558    async fn disk_based_diagnostics_updating(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 561    ) -> tide::Result<()> {
 562        let receiver_ids = self
 563            .state()
 564            .project_connection_ids(request.payload.project_id, request.sender_id)
 565            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn disk_based_diagnostics_updated(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 576    ) -> tide::Result<()> {
 577        let receiver_ids = self
 578            .state()
 579            .project_connection_ids(request.payload.project_id, request.sender_id)
 580            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 581        broadcast(request.sender_id, receiver_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        })?;
 585        Ok(())
 586    }
 587
 588    async fn get_definition(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDefinition>,
 591    ) -> tide::Result<()> {
 592        let receipt = request.receipt();
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)
 596            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 597            .host_connection_id;
 598        let response = self
 599            .peer
 600            .forward_request(request.sender_id, host_connection_id, request.payload)
 601            .await?;
 602        self.peer.respond(receipt, response)?;
 603        Ok(())
 604    }
 605
 606    async fn open_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBuffer>,
 609    ) -> tide::Result<()> {
 610        let receipt = request.receipt();
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)
 614            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 615            .host_connection_id;
 616        let response = self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?;
 620        self.peer.respond(receipt, response)?;
 621        Ok(())
 622    }
 623
 624    async fn close_buffer(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::CloseBuffer>,
 627    ) -> tide::Result<()> {
 628        let host_connection_id = self
 629            .state()
 630            .read_project(request.payload.project_id, request.sender_id)
 631            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 632            .host_connection_id;
 633        self.peer
 634            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 635        Ok(())
 636    }
 637
 638    async fn save_buffer(
 639        self: Arc<Server>,
 640        request: TypedEnvelope<proto::SaveBuffer>,
 641    ) -> tide::Result<()> {
 642        let host;
 643        let guests;
 644        {
 645            let state = self.state();
 646            let project = state
 647                .read_project(request.payload.project_id, request.sender_id)
 648                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 649            host = project.host_connection_id;
 650            guests = project.guest_connection_ids()
 651        }
 652
 653        let sender = request.sender_id;
 654        let receipt = request.receipt();
 655        let response = self
 656            .peer
 657            .forward_request(sender, host, request.payload.clone())
 658            .await?;
 659
 660        broadcast(host, guests, |conn_id| {
 661            let response = response.clone();
 662            if conn_id == sender {
 663                self.peer.respond(receipt, response)
 664            } else {
 665                self.peer.forward_send(host, conn_id, response)
 666            }
 667        })?;
 668
 669        Ok(())
 670    }
 671
 672    async fn format_buffer(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::FormatBuffer>,
 675    ) -> tide::Result<()> {
 676        let host;
 677        {
 678            let state = self.state();
 679            let project = state
 680                .read_project(request.payload.project_id, request.sender_id)
 681                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 682            host = project.host_connection_id;
 683        }
 684
 685        let sender = request.sender_id;
 686        let receipt = request.receipt();
 687        let response = self
 688            .peer
 689            .forward_request(sender, host, request.payload.clone())
 690            .await?;
 691        self.peer.respond(receipt, response)?;
 692
 693        Ok(())
 694    }
 695
 696    async fn get_completions(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetCompletions>,
 699    ) -> tide::Result<()> {
 700        let host;
 701        {
 702            let state = self.state();
 703            let project = state
 704                .read_project(request.payload.project_id, request.sender_id)
 705                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 706            host = project.host_connection_id;
 707        }
 708
 709        let sender = request.sender_id;
 710        let receipt = request.receipt();
 711        let response = self
 712            .peer
 713            .forward_request(sender, host, request.payload.clone())
 714            .await?;
 715        self.peer.respond(receipt, response)?;
 716        Ok(())
 717    }
 718
 719    async fn apply_additional_edits_for_completion(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 722    ) -> tide::Result<()> {
 723        let host;
 724        {
 725            let state = self.state();
 726            let project = state
 727                .read_project(request.payload.project_id, request.sender_id)
 728                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 729            host = project.host_connection_id;
 730        }
 731
 732        let sender = request.sender_id;
 733        let receipt = request.receipt();
 734        let response = self
 735            .peer
 736            .forward_request(sender, host, request.payload.clone())
 737            .await?;
 738        self.peer.respond(receipt, response)?;
 739        Ok(())
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<()> {
 746        let host;
 747        {
 748            let state = self.state();
 749            let project = state
 750                .read_project(request.payload.project_id, request.sender_id)
 751                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 752            host = project.host_connection_id;
 753        }
 754
 755        let sender = request.sender_id;
 756        let receipt = request.receipt();
 757        let response = self
 758            .peer
 759            .forward_request(sender, host, request.payload.clone())
 760            .await?;
 761        self.peer.respond(receipt, response)?;
 762        Ok(())
 763    }
 764
 765    async fn apply_code_action(
 766        self: Arc<Server>,
 767        request: TypedEnvelope<proto::ApplyCodeAction>,
 768    ) -> tide::Result<()> {
 769        let host;
 770        {
 771            let state = self.state();
 772            let project = state
 773                .read_project(request.payload.project_id, request.sender_id)
 774                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775            host = project.host_connection_id;
 776        }
 777
 778        let sender = request.sender_id;
 779        let receipt = request.receipt();
 780        let response = self
 781            .peer
 782            .forward_request(sender, host, request.payload.clone())
 783            .await?;
 784        self.peer.respond(receipt, response)?;
 785        Ok(())
 786    }
 787
 788    async fn update_buffer(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateBuffer>,
 791    ) -> tide::Result<()> {
 792        let receiver_ids = self
 793            .state()
 794            .project_connection_ids(request.payload.project_id, request.sender_id)
 795            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 796        broadcast(request.sender_id, receiver_ids, |connection_id| {
 797            self.peer
 798                .forward_send(request.sender_id, connection_id, request.payload.clone())
 799        })?;
 800        self.peer.respond(request.receipt(), proto::Ack {})?;
 801        Ok(())
 802    }
 803
 804    async fn update_buffer_file(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::UpdateBufferFile>,
 807    ) -> tide::Result<()> {
 808        let receiver_ids = self
 809            .state()
 810            .project_connection_ids(request.payload.project_id, request.sender_id)
 811            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        })?;
 816        Ok(())
 817    }
 818
 819    async fn buffer_reloaded(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferReloaded>,
 822    ) -> tide::Result<()> {
 823        let receiver_ids = self
 824            .state()
 825            .project_connection_ids(request.payload.project_id, request.sender_id)
 826            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        })?;
 831        Ok(())
 832    }
 833
 834    async fn buffer_saved(
 835        self: Arc<Server>,
 836        request: TypedEnvelope<proto::BufferSaved>,
 837    ) -> tide::Result<()> {
 838        let receiver_ids = self
 839            .state()
 840            .project_connection_ids(request.payload.project_id, request.sender_id)
 841            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        })?;
 846        Ok(())
 847    }
 848
 849    async fn get_channels(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::GetChannels>,
 852    ) -> tide::Result<()> {
 853        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 854        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 855        self.peer.respond(
 856            request.receipt(),
 857            proto::GetChannelsResponse {
 858                channels: channels
 859                    .into_iter()
 860                    .map(|chan| proto::Channel {
 861                        id: chan.id.to_proto(),
 862                        name: chan.name,
 863                    })
 864                    .collect(),
 865            },
 866        )?;
 867        Ok(())
 868    }
 869
 870    async fn get_users(
 871        self: Arc<Server>,
 872        request: TypedEnvelope<proto::GetUsers>,
 873    ) -> tide::Result<()> {
 874        let receipt = request.receipt();
 875        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 876        let users = self
 877            .app_state
 878            .db
 879            .get_users_by_ids(user_ids)
 880            .await?
 881            .into_iter()
 882            .map(|user| proto::User {
 883                id: user.id.to_proto(),
 884                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 885                github_login: user.github_login,
 886            })
 887            .collect();
 888        self.peer
 889            .respond(receipt, proto::GetUsersResponse { users })?;
 890        Ok(())
 891    }
 892
 893    fn update_contacts_for_users<'a>(
 894        self: &Arc<Server>,
 895        user_ids: impl IntoIterator<Item = &'a UserId>,
 896    ) -> anyhow::Result<()> {
 897        let mut result = Ok(());
 898        let state = self.state();
 899        for user_id in user_ids {
 900            let contacts = state.contacts_for_user(*user_id);
 901            for connection_id in state.connection_ids_for_user(*user_id) {
 902                if let Err(error) = self.peer.send(
 903                    connection_id,
 904                    proto::UpdateContacts {
 905                        contacts: contacts.clone(),
 906                    },
 907                ) {
 908                    result = Err(error);
 909                }
 910            }
 911        }
 912        result
 913    }
 914
 915    async fn join_channel(
 916        mut self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> tide::Result<()> {
 919        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 920        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 921        if !self
 922            .app_state
 923            .db
 924            .can_user_access_channel(user_id, channel_id)
 925            .await?
 926        {
 927            Err(anyhow!("access denied"))?;
 928        }
 929
 930        self.state_mut().join_channel(request.sender_id, channel_id);
 931        let messages = self
 932            .app_state
 933            .db
 934            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 935            .await?
 936            .into_iter()
 937            .map(|msg| proto::ChannelMessage {
 938                id: msg.id.to_proto(),
 939                body: msg.body,
 940                timestamp: msg.sent_at.unix_timestamp() as u64,
 941                sender_id: msg.sender_id.to_proto(),
 942                nonce: Some(msg.nonce.as_u128().into()),
 943            })
 944            .collect::<Vec<_>>();
 945        self.peer.respond(
 946            request.receipt(),
 947            proto::JoinChannelResponse {
 948                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 949                messages,
 950            },
 951        )?;
 952        Ok(())
 953    }
 954
 955    async fn leave_channel(
 956        mut self: Arc<Self>,
 957        request: TypedEnvelope<proto::LeaveChannel>,
 958    ) -> tide::Result<()> {
 959        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 960        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 961        if !self
 962            .app_state
 963            .db
 964            .can_user_access_channel(user_id, channel_id)
 965            .await?
 966        {
 967            Err(anyhow!("access denied"))?;
 968        }
 969
 970        self.state_mut()
 971            .leave_channel(request.sender_id, channel_id);
 972
 973        Ok(())
 974    }
 975
 976    async fn send_channel_message(
 977        self: Arc<Self>,
 978        request: TypedEnvelope<proto::SendChannelMessage>,
 979    ) -> tide::Result<()> {
 980        let receipt = request.receipt();
 981        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 982        let user_id;
 983        let connection_ids;
 984        {
 985            let state = self.state();
 986            user_id = state.user_id_for_connection(request.sender_id)?;
 987            if let Some(ids) = state.channel_connection_ids(channel_id) {
 988                connection_ids = ids;
 989            } else {
 990                return Ok(());
 991            }
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            self.peer.respond_with_error(
 998                receipt,
 999                proto::Error {
1000                    message: "message is too long".to_string(),
1001                },
1002            )?;
1003            return Ok(());
1004        }
1005        if body.is_empty() {
1006            self.peer.respond_with_error(
1007                receipt,
1008                proto::Error {
1009                    message: "message can't be blank".to_string(),
1010                },
1011            )?;
1012            return Ok(());
1013        }
1014
1015        let timestamp = OffsetDateTime::now_utc();
1016        let nonce = if let Some(nonce) = request.payload.nonce {
1017            nonce
1018        } else {
1019            self.peer.respond_with_error(
1020                receipt,
1021                proto::Error {
1022                    message: "nonce can't be blank".to_string(),
1023                },
1024            )?;
1025            return Ok(());
1026        };
1027
1028        let message_id = self
1029            .app_state
1030            .db
1031            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1032            .await?
1033            .to_proto();
1034        let message = proto::ChannelMessage {
1035            sender_id: user_id.to_proto(),
1036            id: message_id,
1037            body,
1038            timestamp: timestamp.unix_timestamp() as u64,
1039            nonce: Some(nonce),
1040        };
1041        broadcast(request.sender_id, connection_ids, |conn_id| {
1042            self.peer.send(
1043                conn_id,
1044                proto::ChannelMessageSent {
1045                    channel_id: channel_id.to_proto(),
1046                    message: Some(message.clone()),
1047                },
1048            )
1049        })?;
1050        self.peer.respond(
1051            receipt,
1052            proto::SendChannelMessageResponse {
1053                message: Some(message),
1054            },
1055        )?;
1056        Ok(())
1057    }
1058
1059    async fn get_channel_messages(
1060        self: Arc<Self>,
1061        request: TypedEnvelope<proto::GetChannelMessages>,
1062    ) -> tide::Result<()> {
1063        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1064        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1065        if !self
1066            .app_state
1067            .db
1068            .can_user_access_channel(user_id, channel_id)
1069            .await?
1070        {
1071            Err(anyhow!("access denied"))?;
1072        }
1073
1074        let messages = self
1075            .app_state
1076            .db
1077            .get_channel_messages(
1078                channel_id,
1079                MESSAGE_COUNT_PER_PAGE,
1080                Some(MessageId::from_proto(request.payload.before_message_id)),
1081            )
1082            .await?
1083            .into_iter()
1084            .map(|msg| proto::ChannelMessage {
1085                id: msg.id.to_proto(),
1086                body: msg.body,
1087                timestamp: msg.sent_at.unix_timestamp() as u64,
1088                sender_id: msg.sender_id.to_proto(),
1089                nonce: Some(msg.nonce.as_u128().into()),
1090            })
1091            .collect::<Vec<_>>();
1092        self.peer.respond(
1093            request.receipt(),
1094            proto::GetChannelMessagesResponse {
1095                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1096                messages,
1097            },
1098        )?;
1099        Ok(())
1100    }
1101
1102    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1103        self.store.read()
1104    }
1105
1106    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1107        self.store.write()
1108    }
1109}
1110
1111fn broadcast<F>(
1112    sender_id: ConnectionId,
1113    receiver_ids: Vec<ConnectionId>,
1114    mut f: F,
1115) -> anyhow::Result<()>
1116where
1117    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1118{
1119    let mut result = Ok(());
1120    for receiver_id in receiver_ids {
1121        if receiver_id != sender_id {
1122            if let Err(error) = f(receiver_id) {
1123                if result.is_ok() {
1124                    result = Err(error);
1125                }
1126            }
1127        }
1128    }
1129    result
1130}
1131
1132pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1133    let server = Server::new(app.state().clone(), rpc.clone(), None);
1134    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1135        let server = server.clone();
1136        async move {
1137            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1138
1139            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1140            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1141            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1142            let client_protocol_version: Option<u32> = request
1143                .header("X-Zed-Protocol-Version")
1144                .and_then(|v| v.as_str().parse().ok());
1145
1146            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1147                return Ok(Response::new(StatusCode::UpgradeRequired));
1148            }
1149
1150            let header = match request.header("Sec-Websocket-Key") {
1151                Some(h) => h.as_str(),
1152                None => return Err(anyhow!("expected sec-websocket-key"))?,
1153            };
1154
1155            let user_id = process_auth_header(&request).await?;
1156
1157            let mut response = Response::new(StatusCode::SwitchingProtocols);
1158            response.insert_header(UPGRADE, "websocket");
1159            response.insert_header(CONNECTION, "Upgrade");
1160            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1161            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1162            response.insert_header("Sec-Websocket-Version", "13");
1163
1164            let http_res: &mut tide::http::Response = response.as_mut();
1165            let upgrade_receiver = http_res.recv_upgrade().await;
1166            let addr = request.remote().unwrap_or("unknown").to_string();
1167            task::spawn(async move {
1168                if let Some(stream) = upgrade_receiver.await {
1169                    server
1170                        .handle_connection(
1171                            Connection::new(
1172                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1173                            ),
1174                            addr,
1175                            user_id,
1176                            None,
1177                        )
1178                        .await;
1179                }
1180            });
1181
1182            Ok(response)
1183        }
1184    });
1185}
1186
1187fn header_contains_ignore_case<T>(
1188    request: &tide::Request<T>,
1189    header_name: HeaderName,
1190    value: &str,
1191) -> bool {
1192    request
1193        .header(header_name)
1194        .map(|h| {
1195            h.as_str()
1196                .split(',')
1197                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1198        })
1199        .unwrap_or(false)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::*;
1205    use crate::{
1206        auth,
1207        db::{tests::TestDb, UserId},
1208        github, AppState, Config,
1209    };
1210    use ::rpc::Peer;
1211    use async_std::task;
1212    use gpui::{executor, ModelHandle, TestAppContext};
1213    use parking_lot::Mutex;
1214    use postage::{mpsc, watch};
1215    use rand::prelude::*;
1216    use rpc::PeerId;
1217    use serde_json::json;
1218    use sqlx::types::time::OffsetDateTime;
1219    use std::{
1220        ops::Deref,
1221        path::Path,
1222        rc::Rc,
1223        sync::{
1224            atomic::{AtomicBool, Ordering::SeqCst},
1225            Arc,
1226        },
1227        time::Duration,
1228    };
1229    use zed::{
1230        client::{
1231            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1232            EstablishConnectionError, UserStore,
1233        },
1234        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1235        fs::{FakeFs, Fs as _},
1236        language::{
1237            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1238            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1239        },
1240        lsp,
1241        project::{DiagnosticSummary, Project, ProjectPath},
1242    };
1243
1244    #[cfg(test)]
1245    #[ctor::ctor]
1246    fn init_logger() {
1247        // std::env::set_var("RUST_LOG", "info");
1248        env_logger::init();
1249    }
1250
1251    #[gpui::test]
1252    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1253        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1254        let lang_registry = Arc::new(LanguageRegistry::new());
1255        let fs = Arc::new(FakeFs::new(cx_a.background()));
1256        cx_a.foreground().forbid_parking();
1257
1258        // Connect to a server as 2 clients.
1259        let mut server = TestServer::start(cx_a.foreground()).await;
1260        let client_a = server.create_client(&mut cx_a, "user_a").await;
1261        let client_b = server.create_client(&mut cx_b, "user_b").await;
1262
1263        // Share a project as client A
1264        fs.insert_tree(
1265            "/a",
1266            json!({
1267                ".zed.toml": r#"collaborators = ["user_b"]"#,
1268                "a.txt": "a-contents",
1269                "b.txt": "b-contents",
1270            }),
1271        )
1272        .await;
1273        let project_a = cx_a.update(|cx| {
1274            Project::local(
1275                client_a.clone(),
1276                client_a.user_store.clone(),
1277                lang_registry.clone(),
1278                fs.clone(),
1279                cx,
1280            )
1281        });
1282        let (worktree_a, _) = project_a
1283            .update(&mut cx_a, |p, cx| {
1284                p.find_or_create_local_worktree("/a", false, cx)
1285            })
1286            .await
1287            .unwrap();
1288        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1289        worktree_a
1290            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1291            .await;
1292        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1293        project_a
1294            .update(&mut cx_a, |p, cx| p.share(cx))
1295            .await
1296            .unwrap();
1297
1298        // Join that project as client B
1299        let project_b = Project::remote(
1300            project_id,
1301            client_b.clone(),
1302            client_b.user_store.clone(),
1303            lang_registry.clone(),
1304            fs.clone(),
1305            &mut cx_b.to_async(),
1306        )
1307        .await
1308        .unwrap();
1309
1310        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1311            assert_eq!(
1312                project
1313                    .collaborators()
1314                    .get(&client_a.peer_id)
1315                    .unwrap()
1316                    .user
1317                    .github_login,
1318                "user_a"
1319            );
1320            project.replica_id()
1321        });
1322        project_a
1323            .condition(&cx_a, |tree, _| {
1324                tree.collaborators()
1325                    .get(&client_b.peer_id)
1326                    .map_or(false, |collaborator| {
1327                        collaborator.replica_id == replica_id_b
1328                            && collaborator.user.github_login == "user_b"
1329                    })
1330            })
1331            .await;
1332
1333        // Open the same file as client B and client A.
1334        let buffer_b = project_b
1335            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1336            .await
1337            .unwrap();
1338        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1339        buffer_b.read_with(&cx_b, |buf, cx| {
1340            assert_eq!(buf.read(cx).text(), "b-contents")
1341        });
1342        project_a.read_with(&cx_a, |project, cx| {
1343            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1344        });
1345        let buffer_a = project_a
1346            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1347            .await
1348            .unwrap();
1349
1350        let editor_b = cx_b.add_view(window_b, |cx| {
1351            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), cx)
1352        });
1353
1354        // TODO
1355        // // Create a selection set as client B and see that selection set as client A.
1356        // buffer_a
1357        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1358        //     .await;
1359
1360        // Edit the buffer as client B and see that edit as client A.
1361        editor_b.update(&mut cx_b, |editor, cx| {
1362            editor.handle_input(&Input("ok, ".into()), cx)
1363        });
1364        buffer_a
1365            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1366            .await;
1367
1368        // TODO
1369        // // Remove the selection set as client B, see those selections disappear as client A.
1370        cx_b.update(move |_| drop(editor_b));
1371        // buffer_a
1372        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1373        //     .await;
1374
1375        // Close the buffer as client A, see that the buffer is closed.
1376        cx_a.update(move |_| drop(buffer_a));
1377        project_a
1378            .condition(&cx_a, |project, cx| {
1379                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1380            })
1381            .await;
1382
1383        // Dropping the client B's project removes client B from client A's collaborators.
1384        cx_b.update(move |_| drop(project_b));
1385        project_a
1386            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1387            .await;
1388    }
1389
1390    #[gpui::test]
1391    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1392        let lang_registry = Arc::new(LanguageRegistry::new());
1393        let fs = Arc::new(FakeFs::new(cx_a.background()));
1394        cx_a.foreground().forbid_parking();
1395
1396        // Connect to a server as 2 clients.
1397        let mut server = TestServer::start(cx_a.foreground()).await;
1398        let client_a = server.create_client(&mut cx_a, "user_a").await;
1399        let client_b = server.create_client(&mut cx_b, "user_b").await;
1400
1401        // Share a project as client A
1402        fs.insert_tree(
1403            "/a",
1404            json!({
1405                ".zed.toml": r#"collaborators = ["user_b"]"#,
1406                "a.txt": "a-contents",
1407                "b.txt": "b-contents",
1408            }),
1409        )
1410        .await;
1411        let project_a = cx_a.update(|cx| {
1412            Project::local(
1413                client_a.clone(),
1414                client_a.user_store.clone(),
1415                lang_registry.clone(),
1416                fs.clone(),
1417                cx,
1418            )
1419        });
1420        let (worktree_a, _) = project_a
1421            .update(&mut cx_a, |p, cx| {
1422                p.find_or_create_local_worktree("/a", false, cx)
1423            })
1424            .await
1425            .unwrap();
1426        worktree_a
1427            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1428            .await;
1429        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1430        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1431        project_a
1432            .update(&mut cx_a, |p, cx| p.share(cx))
1433            .await
1434            .unwrap();
1435        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1436
1437        // Join that project as client B
1438        let project_b = Project::remote(
1439            project_id,
1440            client_b.clone(),
1441            client_b.user_store.clone(),
1442            lang_registry.clone(),
1443            fs.clone(),
1444            &mut cx_b.to_async(),
1445        )
1446        .await
1447        .unwrap();
1448        project_b
1449            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1450            .await
1451            .unwrap();
1452
1453        // Unshare the project as client A
1454        project_a
1455            .update(&mut cx_a, |project, cx| project.unshare(cx))
1456            .await
1457            .unwrap();
1458        project_b
1459            .condition(&mut cx_b, |project, _| project.is_read_only())
1460            .await;
1461        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1462        drop(project_b);
1463
1464        // Share the project again and ensure guests can still join.
1465        project_a
1466            .update(&mut cx_a, |project, cx| project.share(cx))
1467            .await
1468            .unwrap();
1469        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1470
1471        let project_c = Project::remote(
1472            project_id,
1473            client_b.clone(),
1474            client_b.user_store.clone(),
1475            lang_registry.clone(),
1476            fs.clone(),
1477            &mut cx_b.to_async(),
1478        )
1479        .await
1480        .unwrap();
1481        project_c
1482            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1483            .await
1484            .unwrap();
1485    }
1486
1487    #[gpui::test]
1488    async fn test_propagate_saves_and_fs_changes(
1489        mut cx_a: TestAppContext,
1490        mut cx_b: TestAppContext,
1491        mut cx_c: TestAppContext,
1492    ) {
1493        let lang_registry = Arc::new(LanguageRegistry::new());
1494        let fs = Arc::new(FakeFs::new(cx_a.background()));
1495        cx_a.foreground().forbid_parking();
1496
1497        // Connect to a server as 3 clients.
1498        let mut server = TestServer::start(cx_a.foreground()).await;
1499        let client_a = server.create_client(&mut cx_a, "user_a").await;
1500        let client_b = server.create_client(&mut cx_b, "user_b").await;
1501        let client_c = server.create_client(&mut cx_c, "user_c").await;
1502
1503        // Share a worktree as client A.
1504        fs.insert_tree(
1505            "/a",
1506            json!({
1507                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1508                "file1": "",
1509                "file2": ""
1510            }),
1511        )
1512        .await;
1513        let project_a = cx_a.update(|cx| {
1514            Project::local(
1515                client_a.clone(),
1516                client_a.user_store.clone(),
1517                lang_registry.clone(),
1518                fs.clone(),
1519                cx,
1520            )
1521        });
1522        let (worktree_a, _) = project_a
1523            .update(&mut cx_a, |p, cx| {
1524                p.find_or_create_local_worktree("/a", false, cx)
1525            })
1526            .await
1527            .unwrap();
1528        worktree_a
1529            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1530            .await;
1531        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1532        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1533        project_a
1534            .update(&mut cx_a, |p, cx| p.share(cx))
1535            .await
1536            .unwrap();
1537
1538        // Join that worktree as clients B and C.
1539        let project_b = Project::remote(
1540            project_id,
1541            client_b.clone(),
1542            client_b.user_store.clone(),
1543            lang_registry.clone(),
1544            fs.clone(),
1545            &mut cx_b.to_async(),
1546        )
1547        .await
1548        .unwrap();
1549        let project_c = Project::remote(
1550            project_id,
1551            client_c.clone(),
1552            client_c.user_store.clone(),
1553            lang_registry.clone(),
1554            fs.clone(),
1555            &mut cx_c.to_async(),
1556        )
1557        .await
1558        .unwrap();
1559        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1560        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1561
1562        // Open and edit a buffer as both guests B and C.
1563        let buffer_b = project_b
1564            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1565            .await
1566            .unwrap();
1567        let buffer_c = project_c
1568            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1569            .await
1570            .unwrap();
1571        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1572        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1573
1574        // Open and edit that buffer as the host.
1575        let buffer_a = project_a
1576            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1577            .await
1578            .unwrap();
1579
1580        buffer_a
1581            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1582            .await;
1583        buffer_a.update(&mut cx_a, |buf, cx| {
1584            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1585        });
1586
1587        // Wait for edits to propagate
1588        buffer_a
1589            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1590            .await;
1591        buffer_b
1592            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1593            .await;
1594        buffer_c
1595            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1596            .await;
1597
1598        // Edit the buffer as the host and concurrently save as guest B.
1599        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1600        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1601        save_b.await.unwrap();
1602        assert_eq!(
1603            fs.load("/a/file1".as_ref()).await.unwrap(),
1604            "hi-a, i-am-c, i-am-b, i-am-a"
1605        );
1606        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1607        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1608        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1609
1610        // Make changes on host's file system, see those changes on guest worktrees.
1611        fs.rename(
1612            "/a/file1".as_ref(),
1613            "/a/file1-renamed".as_ref(),
1614            Default::default(),
1615        )
1616        .await
1617        .unwrap();
1618        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1619            .await
1620            .unwrap();
1621        fs.insert_file(Path::new("/a/file4"), "4".into())
1622            .await
1623            .unwrap();
1624
1625        worktree_a
1626            .condition(&cx_a, |tree, _| tree.file_count() == 4)
1627            .await;
1628        worktree_b
1629            .condition(&cx_b, |tree, _| tree.file_count() == 4)
1630            .await;
1631        worktree_c
1632            .condition(&cx_c, |tree, _| tree.file_count() == 4)
1633            .await;
1634        worktree_a.read_with(&cx_a, |tree, _| {
1635            assert_eq!(
1636                tree.paths()
1637                    .map(|p| p.to_string_lossy())
1638                    .collect::<Vec<_>>(),
1639                &[".zed.toml", "file1-renamed", "file3", "file4"]
1640            )
1641        });
1642        worktree_b.read_with(&cx_b, |tree, _| {
1643            assert_eq!(
1644                tree.paths()
1645                    .map(|p| p.to_string_lossy())
1646                    .collect::<Vec<_>>(),
1647                &[".zed.toml", "file1-renamed", "file3", "file4"]
1648            )
1649        });
1650        worktree_c.read_with(&cx_c, |tree, _| {
1651            assert_eq!(
1652                tree.paths()
1653                    .map(|p| p.to_string_lossy())
1654                    .collect::<Vec<_>>(),
1655                &[".zed.toml", "file1-renamed", "file3", "file4"]
1656            )
1657        });
1658
1659        // Ensure buffer files are updated as well.
1660        buffer_a
1661            .condition(&cx_a, |buf, _| {
1662                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1663            })
1664            .await;
1665        buffer_b
1666            .condition(&cx_b, |buf, _| {
1667                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1668            })
1669            .await;
1670        buffer_c
1671            .condition(&cx_c, |buf, _| {
1672                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1673            })
1674            .await;
1675    }
1676
1677    #[gpui::test]
1678    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1679        cx_a.foreground().forbid_parking();
1680        let lang_registry = Arc::new(LanguageRegistry::new());
1681        let fs = Arc::new(FakeFs::new(cx_a.background()));
1682
1683        // Connect to a server as 2 clients.
1684        let mut server = TestServer::start(cx_a.foreground()).await;
1685        let client_a = server.create_client(&mut cx_a, "user_a").await;
1686        let client_b = server.create_client(&mut cx_b, "user_b").await;
1687
1688        // Share a project as client A
1689        fs.insert_tree(
1690            "/dir",
1691            json!({
1692                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1693                "a.txt": "a-contents",
1694            }),
1695        )
1696        .await;
1697
1698        let project_a = cx_a.update(|cx| {
1699            Project::local(
1700                client_a.clone(),
1701                client_a.user_store.clone(),
1702                lang_registry.clone(),
1703                fs.clone(),
1704                cx,
1705            )
1706        });
1707        let (worktree_a, _) = project_a
1708            .update(&mut cx_a, |p, cx| {
1709                p.find_or_create_local_worktree("/dir", false, cx)
1710            })
1711            .await
1712            .unwrap();
1713        worktree_a
1714            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1715            .await;
1716        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1717        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1718        project_a
1719            .update(&mut cx_a, |p, cx| p.share(cx))
1720            .await
1721            .unwrap();
1722
1723        // Join that project as client B
1724        let project_b = Project::remote(
1725            project_id,
1726            client_b.clone(),
1727            client_b.user_store.clone(),
1728            lang_registry.clone(),
1729            fs.clone(),
1730            &mut cx_b.to_async(),
1731        )
1732        .await
1733        .unwrap();
1734        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1735
1736        // Open a buffer as client B
1737        let buffer_b = project_b
1738            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1739            .await
1740            .unwrap();
1741        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1742
1743        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1744        buffer_b.read_with(&cx_b, |buf, _| {
1745            assert!(buf.is_dirty());
1746            assert!(!buf.has_conflict());
1747        });
1748
1749        buffer_b
1750            .update(&mut cx_b, |buf, cx| buf.save(cx))
1751            .await
1752            .unwrap();
1753        worktree_b
1754            .condition(&cx_b, |_, cx| {
1755                buffer_b.read(cx).file().unwrap().mtime() != mtime
1756            })
1757            .await;
1758        buffer_b.read_with(&cx_b, |buf, _| {
1759            assert!(!buf.is_dirty());
1760            assert!(!buf.has_conflict());
1761        });
1762
1763        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1764        buffer_b.read_with(&cx_b, |buf, _| {
1765            assert!(buf.is_dirty());
1766            assert!(!buf.has_conflict());
1767        });
1768    }
1769
1770    #[gpui::test]
1771    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1772        cx_a.foreground().forbid_parking();
1773        let lang_registry = Arc::new(LanguageRegistry::new());
1774        let fs = Arc::new(FakeFs::new(cx_a.background()));
1775
1776        // Connect to a server as 2 clients.
1777        let mut server = TestServer::start(cx_a.foreground()).await;
1778        let client_a = server.create_client(&mut cx_a, "user_a").await;
1779        let client_b = server.create_client(&mut cx_b, "user_b").await;
1780
1781        // Share a project as client A
1782        fs.insert_tree(
1783            "/dir",
1784            json!({
1785                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1786                "a.txt": "a-contents",
1787            }),
1788        )
1789        .await;
1790
1791        let project_a = cx_a.update(|cx| {
1792            Project::local(
1793                client_a.clone(),
1794                client_a.user_store.clone(),
1795                lang_registry.clone(),
1796                fs.clone(),
1797                cx,
1798            )
1799        });
1800        let (worktree_a, _) = project_a
1801            .update(&mut cx_a, |p, cx| {
1802                p.find_or_create_local_worktree("/dir", false, cx)
1803            })
1804            .await
1805            .unwrap();
1806        worktree_a
1807            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1808            .await;
1809        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1810        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1811        project_a
1812            .update(&mut cx_a, |p, cx| p.share(cx))
1813            .await
1814            .unwrap();
1815
1816        // Join that project as client B
1817        let project_b = Project::remote(
1818            project_id,
1819            client_b.clone(),
1820            client_b.user_store.clone(),
1821            lang_registry.clone(),
1822            fs.clone(),
1823            &mut cx_b.to_async(),
1824        )
1825        .await
1826        .unwrap();
1827        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1828
1829        // Open a buffer as client B
1830        let buffer_b = project_b
1831            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1832            .await
1833            .unwrap();
1834        buffer_b.read_with(&cx_b, |buf, _| {
1835            assert!(!buf.is_dirty());
1836            assert!(!buf.has_conflict());
1837        });
1838
1839        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1840            .await
1841            .unwrap();
1842        buffer_b
1843            .condition(&cx_b, |buf, _| {
1844                buf.text() == "new contents" && !buf.is_dirty()
1845            })
1846            .await;
1847        buffer_b.read_with(&cx_b, |buf, _| {
1848            assert!(!buf.has_conflict());
1849        });
1850    }
1851
1852    #[gpui::test(iterations = 100)]
1853    async fn test_editing_while_guest_opens_buffer(
1854        mut cx_a: TestAppContext,
1855        mut cx_b: TestAppContext,
1856    ) {
1857        cx_a.foreground().forbid_parking();
1858        let lang_registry = Arc::new(LanguageRegistry::new());
1859        let fs = Arc::new(FakeFs::new(cx_a.background()));
1860
1861        // Connect to a server as 2 clients.
1862        let mut server = TestServer::start(cx_a.foreground()).await;
1863        let client_a = server.create_client(&mut cx_a, "user_a").await;
1864        let client_b = server.create_client(&mut cx_b, "user_b").await;
1865
1866        // Share a project as client A
1867        fs.insert_tree(
1868            "/dir",
1869            json!({
1870                ".zed.toml": r#"collaborators = ["user_b"]"#,
1871                "a.txt": "a-contents",
1872            }),
1873        )
1874        .await;
1875        let project_a = cx_a.update(|cx| {
1876            Project::local(
1877                client_a.clone(),
1878                client_a.user_store.clone(),
1879                lang_registry.clone(),
1880                fs.clone(),
1881                cx,
1882            )
1883        });
1884        let (worktree_a, _) = project_a
1885            .update(&mut cx_a, |p, cx| {
1886                p.find_or_create_local_worktree("/dir", false, cx)
1887            })
1888            .await
1889            .unwrap();
1890        worktree_a
1891            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1892            .await;
1893        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1894        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1895        project_a
1896            .update(&mut cx_a, |p, cx| p.share(cx))
1897            .await
1898            .unwrap();
1899
1900        // Join that project as client B
1901        let project_b = Project::remote(
1902            project_id,
1903            client_b.clone(),
1904            client_b.user_store.clone(),
1905            lang_registry.clone(),
1906            fs.clone(),
1907            &mut cx_b.to_async(),
1908        )
1909        .await
1910        .unwrap();
1911
1912        // Open a buffer as client A
1913        let buffer_a = project_a
1914            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1915            .await
1916            .unwrap();
1917
1918        // Start opening the same buffer as client B
1919        let buffer_b = cx_b
1920            .background()
1921            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1922        task::yield_now().await;
1923
1924        // Edit the buffer as client A while client B is still opening it.
1925        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1926
1927        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1928        let buffer_b = buffer_b.await.unwrap();
1929        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1930    }
1931
1932    #[gpui::test]
1933    async fn test_leaving_worktree_while_opening_buffer(
1934        mut cx_a: TestAppContext,
1935        mut cx_b: TestAppContext,
1936    ) {
1937        cx_a.foreground().forbid_parking();
1938        let lang_registry = Arc::new(LanguageRegistry::new());
1939        let fs = Arc::new(FakeFs::new(cx_a.background()));
1940
1941        // Connect to a server as 2 clients.
1942        let mut server = TestServer::start(cx_a.foreground()).await;
1943        let client_a = server.create_client(&mut cx_a, "user_a").await;
1944        let client_b = server.create_client(&mut cx_b, "user_b").await;
1945
1946        // Share a project as client A
1947        fs.insert_tree(
1948            "/dir",
1949            json!({
1950                ".zed.toml": r#"collaborators = ["user_b"]"#,
1951                "a.txt": "a-contents",
1952            }),
1953        )
1954        .await;
1955        let project_a = cx_a.update(|cx| {
1956            Project::local(
1957                client_a.clone(),
1958                client_a.user_store.clone(),
1959                lang_registry.clone(),
1960                fs.clone(),
1961                cx,
1962            )
1963        });
1964        let (worktree_a, _) = project_a
1965            .update(&mut cx_a, |p, cx| {
1966                p.find_or_create_local_worktree("/dir", false, cx)
1967            })
1968            .await
1969            .unwrap();
1970        worktree_a
1971            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1972            .await;
1973        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1974        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1975        project_a
1976            .update(&mut cx_a, |p, cx| p.share(cx))
1977            .await
1978            .unwrap();
1979
1980        // Join that project as client B
1981        let project_b = Project::remote(
1982            project_id,
1983            client_b.clone(),
1984            client_b.user_store.clone(),
1985            lang_registry.clone(),
1986            fs.clone(),
1987            &mut cx_b.to_async(),
1988        )
1989        .await
1990        .unwrap();
1991
1992        // See that a guest has joined as client A.
1993        project_a
1994            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1995            .await;
1996
1997        // Begin opening a buffer as client B, but leave the project before the open completes.
1998        let buffer_b = cx_b
1999            .background()
2000            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2001        cx_b.update(|_| drop(project_b));
2002        drop(buffer_b);
2003
2004        // See that the guest has left.
2005        project_a
2006            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2007            .await;
2008    }
2009
2010    #[gpui::test]
2011    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2012        cx_a.foreground().forbid_parking();
2013        let lang_registry = Arc::new(LanguageRegistry::new());
2014        let fs = Arc::new(FakeFs::new(cx_a.background()));
2015
2016        // Connect to a server as 2 clients.
2017        let mut server = TestServer::start(cx_a.foreground()).await;
2018        let client_a = server.create_client(&mut cx_a, "user_a").await;
2019        let client_b = server.create_client(&mut cx_b, "user_b").await;
2020
2021        // Share a project as client A
2022        fs.insert_tree(
2023            "/a",
2024            json!({
2025                ".zed.toml": r#"collaborators = ["user_b"]"#,
2026                "a.txt": "a-contents",
2027                "b.txt": "b-contents",
2028            }),
2029        )
2030        .await;
2031        let project_a = cx_a.update(|cx| {
2032            Project::local(
2033                client_a.clone(),
2034                client_a.user_store.clone(),
2035                lang_registry.clone(),
2036                fs.clone(),
2037                cx,
2038            )
2039        });
2040        let (worktree_a, _) = project_a
2041            .update(&mut cx_a, |p, cx| {
2042                p.find_or_create_local_worktree("/a", false, cx)
2043            })
2044            .await
2045            .unwrap();
2046        worktree_a
2047            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2048            .await;
2049        let project_id = project_a
2050            .update(&mut cx_a, |project, _| project.next_remote_id())
2051            .await;
2052        project_a
2053            .update(&mut cx_a, |project, cx| project.share(cx))
2054            .await
2055            .unwrap();
2056
2057        // Join that project as client B
2058        let _project_b = Project::remote(
2059            project_id,
2060            client_b.clone(),
2061            client_b.user_store.clone(),
2062            lang_registry.clone(),
2063            fs.clone(),
2064            &mut cx_b.to_async(),
2065        )
2066        .await
2067        .unwrap();
2068
2069        // See that a guest has joined as client A.
2070        project_a
2071            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2072            .await;
2073
2074        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2075        client_b.disconnect(&cx_b.to_async()).unwrap();
2076        project_a
2077            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2078            .await;
2079    }
2080
2081    #[gpui::test]
2082    async fn test_collaborating_with_diagnostics(
2083        mut cx_a: TestAppContext,
2084        mut cx_b: TestAppContext,
2085    ) {
2086        cx_a.foreground().forbid_parking();
2087        let mut lang_registry = Arc::new(LanguageRegistry::new());
2088        let fs = Arc::new(FakeFs::new(cx_a.background()));
2089
2090        // Set up a fake language server.
2091        let (language_server_config, mut fake_language_server) =
2092            LanguageServerConfig::fake(cx_a.background()).await;
2093        Arc::get_mut(&mut lang_registry)
2094            .unwrap()
2095            .add(Arc::new(Language::new(
2096                LanguageConfig {
2097                    name: "Rust".to_string(),
2098                    path_suffixes: vec!["rs".to_string()],
2099                    language_server: Some(language_server_config),
2100                    ..Default::default()
2101                },
2102                Some(tree_sitter_rust::language()),
2103            )));
2104
2105        // Connect to a server as 2 clients.
2106        let mut server = TestServer::start(cx_a.foreground()).await;
2107        let client_a = server.create_client(&mut cx_a, "user_a").await;
2108        let client_b = server.create_client(&mut cx_b, "user_b").await;
2109
2110        // Share a project as client A
2111        fs.insert_tree(
2112            "/a",
2113            json!({
2114                ".zed.toml": r#"collaborators = ["user_b"]"#,
2115                "a.rs": "let one = two",
2116                "other.rs": "",
2117            }),
2118        )
2119        .await;
2120        let project_a = cx_a.update(|cx| {
2121            Project::local(
2122                client_a.clone(),
2123                client_a.user_store.clone(),
2124                lang_registry.clone(),
2125                fs.clone(),
2126                cx,
2127            )
2128        });
2129        let (worktree_a, _) = project_a
2130            .update(&mut cx_a, |p, cx| {
2131                p.find_or_create_local_worktree("/a", false, cx)
2132            })
2133            .await
2134            .unwrap();
2135        worktree_a
2136            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2137            .await;
2138        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2139        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2140        project_a
2141            .update(&mut cx_a, |p, cx| p.share(cx))
2142            .await
2143            .unwrap();
2144
2145        // Cause the language server to start.
2146        let _ = cx_a
2147            .background()
2148            .spawn(project_a.update(&mut cx_a, |project, cx| {
2149                project.open_buffer(
2150                    ProjectPath {
2151                        worktree_id,
2152                        path: Path::new("other.rs").into(),
2153                    },
2154                    cx,
2155                )
2156            }))
2157            .await
2158            .unwrap();
2159
2160        // Simulate a language server reporting errors for a file.
2161        fake_language_server
2162            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2163                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2164                version: None,
2165                diagnostics: vec![lsp::Diagnostic {
2166                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2167                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2168                    message: "message 1".to_string(),
2169                    ..Default::default()
2170                }],
2171            })
2172            .await;
2173
2174        // Wait for server to see the diagnostics update.
2175        server
2176            .condition(|store| {
2177                let worktree = store
2178                    .project(project_id)
2179                    .unwrap()
2180                    .worktrees
2181                    .get(&worktree_id.to_proto())
2182                    .unwrap();
2183
2184                !worktree
2185                    .share
2186                    .as_ref()
2187                    .unwrap()
2188                    .diagnostic_summaries
2189                    .is_empty()
2190            })
2191            .await;
2192
2193        // Join the worktree as client B.
2194        let project_b = Project::remote(
2195            project_id,
2196            client_b.clone(),
2197            client_b.user_store.clone(),
2198            lang_registry.clone(),
2199            fs.clone(),
2200            &mut cx_b.to_async(),
2201        )
2202        .await
2203        .unwrap();
2204
2205        project_b.read_with(&cx_b, |project, cx| {
2206            assert_eq!(
2207                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2208                &[(
2209                    ProjectPath {
2210                        worktree_id,
2211                        path: Arc::from(Path::new("a.rs")),
2212                    },
2213                    DiagnosticSummary {
2214                        error_count: 1,
2215                        warning_count: 0,
2216                        ..Default::default()
2217                    },
2218                )]
2219            )
2220        });
2221
2222        // Simulate a language server reporting more errors for a file.
2223        fake_language_server
2224            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2225                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2226                version: None,
2227                diagnostics: vec![
2228                    lsp::Diagnostic {
2229                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2230                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2231                        message: "message 1".to_string(),
2232                        ..Default::default()
2233                    },
2234                    lsp::Diagnostic {
2235                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2236                        range: lsp::Range::new(
2237                            lsp::Position::new(0, 10),
2238                            lsp::Position::new(0, 13),
2239                        ),
2240                        message: "message 2".to_string(),
2241                        ..Default::default()
2242                    },
2243                ],
2244            })
2245            .await;
2246
2247        // Client b gets the updated summaries
2248        project_b
2249            .condition(&cx_b, |project, cx| {
2250                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2251                    == &[(
2252                        ProjectPath {
2253                            worktree_id,
2254                            path: Arc::from(Path::new("a.rs")),
2255                        },
2256                        DiagnosticSummary {
2257                            error_count: 1,
2258                            warning_count: 1,
2259                            ..Default::default()
2260                        },
2261                    )]
2262            })
2263            .await;
2264
2265        // Open the file with the errors on client B. They should be present.
2266        let buffer_b = cx_b
2267            .background()
2268            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2269            .await
2270            .unwrap();
2271
2272        buffer_b.read_with(&cx_b, |buffer, _| {
2273            assert_eq!(
2274                buffer
2275                    .snapshot()
2276                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2277                    .map(|entry| entry)
2278                    .collect::<Vec<_>>(),
2279                &[
2280                    DiagnosticEntry {
2281                        range: Point::new(0, 4)..Point::new(0, 7),
2282                        diagnostic: Diagnostic {
2283                            group_id: 0,
2284                            message: "message 1".to_string(),
2285                            severity: lsp::DiagnosticSeverity::ERROR,
2286                            is_primary: true,
2287                            ..Default::default()
2288                        }
2289                    },
2290                    DiagnosticEntry {
2291                        range: Point::new(0, 10)..Point::new(0, 13),
2292                        diagnostic: Diagnostic {
2293                            group_id: 1,
2294                            severity: lsp::DiagnosticSeverity::WARNING,
2295                            message: "message 2".to_string(),
2296                            is_primary: true,
2297                            ..Default::default()
2298                        }
2299                    }
2300                ]
2301            );
2302        });
2303    }
2304
2305    #[gpui::test]
2306    async fn test_collaborating_with_completion(
2307        mut cx_a: TestAppContext,
2308        mut cx_b: TestAppContext,
2309    ) {
2310        cx_a.foreground().forbid_parking();
2311        let mut lang_registry = Arc::new(LanguageRegistry::new());
2312        let fs = Arc::new(FakeFs::new(cx_a.background()));
2313
2314        // Set up a fake language server.
2315        let (language_server_config, mut fake_language_server) =
2316            LanguageServerConfig::fake_with_capabilities(
2317                lsp::ServerCapabilities {
2318                    completion_provider: Some(lsp::CompletionOptions {
2319                        trigger_characters: Some(vec![".".to_string()]),
2320                        ..Default::default()
2321                    }),
2322                    ..Default::default()
2323                },
2324                cx_a.background(),
2325            )
2326            .await;
2327        Arc::get_mut(&mut lang_registry)
2328            .unwrap()
2329            .add(Arc::new(Language::new(
2330                LanguageConfig {
2331                    name: "Rust".to_string(),
2332                    path_suffixes: vec!["rs".to_string()],
2333                    language_server: Some(language_server_config),
2334                    ..Default::default()
2335                },
2336                Some(tree_sitter_rust::language()),
2337            )));
2338
2339        // Connect to a server as 2 clients.
2340        let mut server = TestServer::start(cx_a.foreground()).await;
2341        let client_a = server.create_client(&mut cx_a, "user_a").await;
2342        let client_b = server.create_client(&mut cx_b, "user_b").await;
2343
2344        // Share a project as client A
2345        fs.insert_tree(
2346            "/a",
2347            json!({
2348                ".zed.toml": r#"collaborators = ["user_b"]"#,
2349                "main.rs": "fn main() { a }",
2350                "other.rs": "",
2351            }),
2352        )
2353        .await;
2354        let project_a = cx_a.update(|cx| {
2355            Project::local(
2356                client_a.clone(),
2357                client_a.user_store.clone(),
2358                lang_registry.clone(),
2359                fs.clone(),
2360                cx,
2361            )
2362        });
2363        let (worktree_a, _) = project_a
2364            .update(&mut cx_a, |p, cx| {
2365                p.find_or_create_local_worktree("/a", false, cx)
2366            })
2367            .await
2368            .unwrap();
2369        worktree_a
2370            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2371            .await;
2372        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2373        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2374        project_a
2375            .update(&mut cx_a, |p, cx| p.share(cx))
2376            .await
2377            .unwrap();
2378
2379        // Join the worktree as client B.
2380        let project_b = Project::remote(
2381            project_id,
2382            client_b.clone(),
2383            client_b.user_store.clone(),
2384            lang_registry.clone(),
2385            fs.clone(),
2386            &mut cx_b.to_async(),
2387        )
2388        .await
2389        .unwrap();
2390
2391        // Open a file in an editor as the guest.
2392        let buffer_b = project_b
2393            .update(&mut cx_b, |p, cx| {
2394                p.open_buffer((worktree_id, "main.rs"), cx)
2395            })
2396            .await
2397            .unwrap();
2398        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2399        let editor_b = cx_b.add_view(window_b, |cx| {
2400            Editor::for_buffer(
2401                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2402                Arc::new(|cx| EditorSettings::test(cx)),
2403                cx,
2404            )
2405        });
2406
2407        // Type a completion trigger character as the guest.
2408        editor_b.update(&mut cx_b, |editor, cx| {
2409            editor.select_ranges([13..13], None, cx);
2410            editor.handle_input(&Input(".".into()), cx);
2411            cx.focus(&editor_b);
2412        });
2413
2414        // Receive a completion request as the host's language server.
2415        let (request_id, params) = fake_language_server
2416            .receive_request::<lsp::request::Completion>()
2417            .await;
2418        assert_eq!(
2419            params.text_document_position.text_document.uri,
2420            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2421        );
2422        assert_eq!(
2423            params.text_document_position.position,
2424            lsp::Position::new(0, 14),
2425        );
2426
2427        // Return some completions from the host's language server.
2428        fake_language_server
2429            .respond(
2430                request_id,
2431                Some(lsp::CompletionResponse::Array(vec![
2432                    lsp::CompletionItem {
2433                        label: "first_method(…)".into(),
2434                        detail: Some("fn(&mut self, B) -> C".into()),
2435                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2436                            new_text: "first_method($1)".to_string(),
2437                            range: lsp::Range::new(
2438                                lsp::Position::new(0, 14),
2439                                lsp::Position::new(0, 14),
2440                            ),
2441                        })),
2442                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2443                        ..Default::default()
2444                    },
2445                    lsp::CompletionItem {
2446                        label: "second_method(…)".into(),
2447                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2448                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2449                            new_text: "second_method()".to_string(),
2450                            range: lsp::Range::new(
2451                                lsp::Position::new(0, 14),
2452                                lsp::Position::new(0, 14),
2453                            ),
2454                        })),
2455                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2456                        ..Default::default()
2457                    },
2458                ])),
2459            )
2460            .await;
2461
2462        // Open the buffer on the host.
2463        let buffer_a = project_a
2464            .update(&mut cx_a, |p, cx| {
2465                p.open_buffer((worktree_id, "main.rs"), cx)
2466            })
2467            .await
2468            .unwrap();
2469        buffer_a
2470            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2471            .await;
2472
2473        // Confirm a completion on the guest.
2474        editor_b.next_notification(&cx_b).await;
2475        editor_b.update(&mut cx_b, |editor, cx| {
2476            assert!(editor.showing_context_menu());
2477            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2478            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2479        });
2480
2481        buffer_a
2482            .condition(&cx_a, |buffer, _| {
2483                buffer.text() == "fn main() { a.first_method() }"
2484            })
2485            .await;
2486
2487        // Receive a request resolve the selected completion on the host's language server.
2488        let (request_id, params) = fake_language_server
2489            .receive_request::<lsp::request::ResolveCompletionItem>()
2490            .await;
2491        assert_eq!(params.label, "first_method(…)");
2492
2493        // Return a resolved completion from the host's language server.
2494        // The resolved completion has an additional text edit.
2495        fake_language_server
2496            .respond(
2497                request_id,
2498                lsp::CompletionItem {
2499                    label: "first_method(…)".into(),
2500                    detail: Some("fn(&mut self, B) -> C".into()),
2501                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2502                        new_text: "first_method($1)".to_string(),
2503                        range: lsp::Range::new(
2504                            lsp::Position::new(0, 14),
2505                            lsp::Position::new(0, 14),
2506                        ),
2507                    })),
2508                    additional_text_edits: Some(vec![lsp::TextEdit {
2509                        new_text: "use d::SomeTrait;\n".to_string(),
2510                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2511                    }]),
2512                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2513                    ..Default::default()
2514                },
2515            )
2516            .await;
2517
2518        // The additional edit is applied.
2519        buffer_b
2520            .condition(&cx_b, |buffer, _| {
2521                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2522            })
2523            .await;
2524        assert_eq!(
2525            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2526            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2527        );
2528    }
2529
2530    #[gpui::test]
2531    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2532        cx_a.foreground().forbid_parking();
2533        let mut lang_registry = Arc::new(LanguageRegistry::new());
2534        let fs = Arc::new(FakeFs::new(cx_a.background()));
2535
2536        // Set up a fake language server.
2537        let (language_server_config, mut fake_language_server) =
2538            LanguageServerConfig::fake(cx_a.background()).await;
2539        Arc::get_mut(&mut lang_registry)
2540            .unwrap()
2541            .add(Arc::new(Language::new(
2542                LanguageConfig {
2543                    name: "Rust".to_string(),
2544                    path_suffixes: vec!["rs".to_string()],
2545                    language_server: Some(language_server_config),
2546                    ..Default::default()
2547                },
2548                Some(tree_sitter_rust::language()),
2549            )));
2550
2551        // Connect to a server as 2 clients.
2552        let mut server = TestServer::start(cx_a.foreground()).await;
2553        let client_a = server.create_client(&mut cx_a, "user_a").await;
2554        let client_b = server.create_client(&mut cx_b, "user_b").await;
2555
2556        // Share a project as client A
2557        fs.insert_tree(
2558            "/a",
2559            json!({
2560                ".zed.toml": r#"collaborators = ["user_b"]"#,
2561                "a.rs": "let one = two",
2562            }),
2563        )
2564        .await;
2565        let project_a = cx_a.update(|cx| {
2566            Project::local(
2567                client_a.clone(),
2568                client_a.user_store.clone(),
2569                lang_registry.clone(),
2570                fs.clone(),
2571                cx,
2572            )
2573        });
2574        let (worktree_a, _) = project_a
2575            .update(&mut cx_a, |p, cx| {
2576                p.find_or_create_local_worktree("/a", false, cx)
2577            })
2578            .await
2579            .unwrap();
2580        worktree_a
2581            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2582            .await;
2583        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2584        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2585        project_a
2586            .update(&mut cx_a, |p, cx| p.share(cx))
2587            .await
2588            .unwrap();
2589
2590        // Join the worktree as client B.
2591        let project_b = Project::remote(
2592            project_id,
2593            client_b.clone(),
2594            client_b.user_store.clone(),
2595            lang_registry.clone(),
2596            fs.clone(),
2597            &mut cx_b.to_async(),
2598        )
2599        .await
2600        .unwrap();
2601
2602        let buffer_b = cx_b
2603            .background()
2604            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2605            .await
2606            .unwrap();
2607
2608        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2609        let (request_id, _) = fake_language_server
2610            .receive_request::<lsp::request::Formatting>()
2611            .await;
2612        fake_language_server
2613            .respond(
2614                request_id,
2615                Some(vec![
2616                    lsp::TextEdit {
2617                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2618                        new_text: "h".to_string(),
2619                    },
2620                    lsp::TextEdit {
2621                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2622                        new_text: "y".to_string(),
2623                    },
2624                ]),
2625            )
2626            .await;
2627        format.await.unwrap();
2628        assert_eq!(
2629            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2630            "let honey = two"
2631        );
2632    }
2633
2634    #[gpui::test]
2635    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2636        cx_a.foreground().forbid_parking();
2637        let mut lang_registry = Arc::new(LanguageRegistry::new());
2638        let fs = Arc::new(FakeFs::new(cx_a.background()));
2639        fs.insert_tree(
2640            "/root-1",
2641            json!({
2642                ".zed.toml": r#"collaborators = ["user_b"]"#,
2643                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2644            }),
2645        )
2646        .await;
2647        fs.insert_tree(
2648            "/root-2",
2649            json!({
2650                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2651            }),
2652        )
2653        .await;
2654
2655        // Set up a fake language server.
2656        let (language_server_config, mut fake_language_server) =
2657            LanguageServerConfig::fake(cx_a.background()).await;
2658        Arc::get_mut(&mut lang_registry)
2659            .unwrap()
2660            .add(Arc::new(Language::new(
2661                LanguageConfig {
2662                    name: "Rust".to_string(),
2663                    path_suffixes: vec!["rs".to_string()],
2664                    language_server: Some(language_server_config),
2665                    ..Default::default()
2666                },
2667                Some(tree_sitter_rust::language()),
2668            )));
2669
2670        // Connect to a server as 2 clients.
2671        let mut server = TestServer::start(cx_a.foreground()).await;
2672        let client_a = server.create_client(&mut cx_a, "user_a").await;
2673        let client_b = server.create_client(&mut cx_b, "user_b").await;
2674
2675        // Share a project as client A
2676        let project_a = cx_a.update(|cx| {
2677            Project::local(
2678                client_a.clone(),
2679                client_a.user_store.clone(),
2680                lang_registry.clone(),
2681                fs.clone(),
2682                cx,
2683            )
2684        });
2685        let (worktree_a, _) = project_a
2686            .update(&mut cx_a, |p, cx| {
2687                p.find_or_create_local_worktree("/root-1", false, cx)
2688            })
2689            .await
2690            .unwrap();
2691        worktree_a
2692            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2693            .await;
2694        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2695        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2696        project_a
2697            .update(&mut cx_a, |p, cx| p.share(cx))
2698            .await
2699            .unwrap();
2700
2701        // Join the worktree as client B.
2702        let project_b = Project::remote(
2703            project_id,
2704            client_b.clone(),
2705            client_b.user_store.clone(),
2706            lang_registry.clone(),
2707            fs.clone(),
2708            &mut cx_b.to_async(),
2709        )
2710        .await
2711        .unwrap();
2712
2713        // Open the file to be formatted on client B.
2714        let buffer_b = cx_b
2715            .background()
2716            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2717            .await
2718            .unwrap();
2719
2720        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2721        let (request_id, _) = fake_language_server
2722            .receive_request::<lsp::request::GotoDefinition>()
2723            .await;
2724        fake_language_server
2725            .respond(
2726                request_id,
2727                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2728                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2729                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2730                ))),
2731            )
2732            .await;
2733        let definitions_1 = definitions_1.await.unwrap();
2734        cx_b.read(|cx| {
2735            assert_eq!(definitions_1.len(), 1);
2736            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2737            let target_buffer = definitions_1[0].target_buffer.read(cx);
2738            assert_eq!(
2739                target_buffer.text(),
2740                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2741            );
2742            assert_eq!(
2743                definitions_1[0].target_range.to_point(target_buffer),
2744                Point::new(0, 6)..Point::new(0, 9)
2745            );
2746        });
2747
2748        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2749        // the previous call to `definition`.
2750        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2751        let (request_id, _) = fake_language_server
2752            .receive_request::<lsp::request::GotoDefinition>()
2753            .await;
2754        fake_language_server
2755            .respond(
2756                request_id,
2757                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2758                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2759                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2760                ))),
2761            )
2762            .await;
2763        let definitions_2 = definitions_2.await.unwrap();
2764        cx_b.read(|cx| {
2765            assert_eq!(definitions_2.len(), 1);
2766            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2767            let target_buffer = definitions_2[0].target_buffer.read(cx);
2768            assert_eq!(
2769                target_buffer.text(),
2770                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2771            );
2772            assert_eq!(
2773                definitions_2[0].target_range.to_point(target_buffer),
2774                Point::new(1, 6)..Point::new(1, 11)
2775            );
2776        });
2777        assert_eq!(
2778            definitions_1[0].target_buffer,
2779            definitions_2[0].target_buffer
2780        );
2781
2782        cx_b.update(|_| {
2783            drop(definitions_1);
2784            drop(definitions_2);
2785        });
2786        project_b
2787            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2788            .await;
2789    }
2790
2791    #[gpui::test]
2792    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2793        mut cx_a: TestAppContext,
2794        mut cx_b: TestAppContext,
2795        mut rng: StdRng,
2796    ) {
2797        cx_a.foreground().forbid_parking();
2798        let mut lang_registry = Arc::new(LanguageRegistry::new());
2799        let fs = Arc::new(FakeFs::new(cx_a.background()));
2800        fs.insert_tree(
2801            "/root",
2802            json!({
2803                ".zed.toml": r#"collaborators = ["user_b"]"#,
2804                "a.rs": "const ONE: usize = b::TWO;",
2805                "b.rs": "const TWO: usize = 2",
2806            }),
2807        )
2808        .await;
2809
2810        // Set up a fake language server.
2811        let (language_server_config, mut fake_language_server) =
2812            LanguageServerConfig::fake(cx_a.background()).await;
2813        Arc::get_mut(&mut lang_registry)
2814            .unwrap()
2815            .add(Arc::new(Language::new(
2816                LanguageConfig {
2817                    name: "Rust".to_string(),
2818                    path_suffixes: vec!["rs".to_string()],
2819                    language_server: Some(language_server_config),
2820                    ..Default::default()
2821                },
2822                Some(tree_sitter_rust::language()),
2823            )));
2824
2825        // Connect to a server as 2 clients.
2826        let mut server = TestServer::start(cx_a.foreground()).await;
2827        let client_a = server.create_client(&mut cx_a, "user_a").await;
2828        let client_b = server.create_client(&mut cx_b, "user_b").await;
2829
2830        // Share a project as client A
2831        let project_a = cx_a.update(|cx| {
2832            Project::local(
2833                client_a.clone(),
2834                client_a.user_store.clone(),
2835                lang_registry.clone(),
2836                fs.clone(),
2837                cx,
2838            )
2839        });
2840        let (worktree_a, _) = project_a
2841            .update(&mut cx_a, |p, cx| {
2842                p.find_or_create_local_worktree("/root", false, cx)
2843            })
2844            .await
2845            .unwrap();
2846        worktree_a
2847            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2848            .await;
2849        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2850        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2851        project_a
2852            .update(&mut cx_a, |p, cx| p.share(cx))
2853            .await
2854            .unwrap();
2855
2856        // Join the worktree as client B.
2857        let project_b = Project::remote(
2858            project_id,
2859            client_b.clone(),
2860            client_b.user_store.clone(),
2861            lang_registry.clone(),
2862            fs.clone(),
2863            &mut cx_b.to_async(),
2864        )
2865        .await
2866        .unwrap();
2867
2868        let buffer_b1 = cx_b
2869            .background()
2870            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2871            .await
2872            .unwrap();
2873
2874        let definitions;
2875        let buffer_b2;
2876        if rng.gen() {
2877            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2878            buffer_b2 =
2879                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2880        } else {
2881            buffer_b2 =
2882                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2883            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2884        }
2885
2886        let (request_id, _) = fake_language_server
2887            .receive_request::<lsp::request::GotoDefinition>()
2888            .await;
2889        fake_language_server
2890            .respond(
2891                request_id,
2892                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2893                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2894                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2895                ))),
2896            )
2897            .await;
2898
2899        let buffer_b2 = buffer_b2.await.unwrap();
2900        let definitions = definitions.await.unwrap();
2901        assert_eq!(definitions.len(), 1);
2902        assert_eq!(definitions[0].target_buffer, buffer_b2);
2903    }
2904
2905    #[gpui::test]
2906    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2907        cx_a.foreground().forbid_parking();
2908
2909        // Connect to a server as 2 clients.
2910        let mut server = TestServer::start(cx_a.foreground()).await;
2911        let client_a = server.create_client(&mut cx_a, "user_a").await;
2912        let client_b = server.create_client(&mut cx_b, "user_b").await;
2913
2914        // Create an org that includes these 2 users.
2915        let db = &server.app_state.db;
2916        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2917        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2918            .await
2919            .unwrap();
2920        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2921            .await
2922            .unwrap();
2923
2924        // Create a channel that includes all the users.
2925        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2926        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2927            .await
2928            .unwrap();
2929        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2930            .await
2931            .unwrap();
2932        db.create_channel_message(
2933            channel_id,
2934            client_b.current_user_id(&cx_b),
2935            "hello A, it's B.",
2936            OffsetDateTime::now_utc(),
2937            1,
2938        )
2939        .await
2940        .unwrap();
2941
2942        let channels_a = cx_a
2943            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2944        channels_a
2945            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2946            .await;
2947        channels_a.read_with(&cx_a, |list, _| {
2948            assert_eq!(
2949                list.available_channels().unwrap(),
2950                &[ChannelDetails {
2951                    id: channel_id.to_proto(),
2952                    name: "test-channel".to_string()
2953                }]
2954            )
2955        });
2956        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2957            this.get_channel(channel_id.to_proto(), cx).unwrap()
2958        });
2959        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2960        channel_a
2961            .condition(&cx_a, |channel, _| {
2962                channel_messages(channel)
2963                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2964            })
2965            .await;
2966
2967        let channels_b = cx_b
2968            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2969        channels_b
2970            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2971            .await;
2972        channels_b.read_with(&cx_b, |list, _| {
2973            assert_eq!(
2974                list.available_channels().unwrap(),
2975                &[ChannelDetails {
2976                    id: channel_id.to_proto(),
2977                    name: "test-channel".to_string()
2978                }]
2979            )
2980        });
2981
2982        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2983            this.get_channel(channel_id.to_proto(), cx).unwrap()
2984        });
2985        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2986        channel_b
2987            .condition(&cx_b, |channel, _| {
2988                channel_messages(channel)
2989                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2990            })
2991            .await;
2992
2993        channel_a
2994            .update(&mut cx_a, |channel, cx| {
2995                channel
2996                    .send_message("oh, hi B.".to_string(), cx)
2997                    .unwrap()
2998                    .detach();
2999                let task = channel.send_message("sup".to_string(), cx).unwrap();
3000                assert_eq!(
3001                    channel_messages(channel),
3002                    &[
3003                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3004                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3005                        ("user_a".to_string(), "sup".to_string(), true)
3006                    ]
3007                );
3008                task
3009            })
3010            .await
3011            .unwrap();
3012
3013        channel_b
3014            .condition(&cx_b, |channel, _| {
3015                channel_messages(channel)
3016                    == [
3017                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3018                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3019                        ("user_a".to_string(), "sup".to_string(), false),
3020                    ]
3021            })
3022            .await;
3023
3024        assert_eq!(
3025            server
3026                .state()
3027                .await
3028                .channel(channel_id)
3029                .unwrap()
3030                .connection_ids
3031                .len(),
3032            2
3033        );
3034        cx_b.update(|_| drop(channel_b));
3035        server
3036            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3037            .await;
3038
3039        cx_a.update(|_| drop(channel_a));
3040        server
3041            .condition(|state| state.channel(channel_id).is_none())
3042            .await;
3043    }
3044
3045    #[gpui::test]
3046    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3047        cx_a.foreground().forbid_parking();
3048
3049        let mut server = TestServer::start(cx_a.foreground()).await;
3050        let client_a = server.create_client(&mut cx_a, "user_a").await;
3051
3052        let db = &server.app_state.db;
3053        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3054        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3055        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3056            .await
3057            .unwrap();
3058        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3059            .await
3060            .unwrap();
3061
3062        let channels_a = cx_a
3063            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3064        channels_a
3065            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3066            .await;
3067        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3068            this.get_channel(channel_id.to_proto(), cx).unwrap()
3069        });
3070
3071        // Messages aren't allowed to be too long.
3072        channel_a
3073            .update(&mut cx_a, |channel, cx| {
3074                let long_body = "this is long.\n".repeat(1024);
3075                channel.send_message(long_body, cx).unwrap()
3076            })
3077            .await
3078            .unwrap_err();
3079
3080        // Messages aren't allowed to be blank.
3081        channel_a.update(&mut cx_a, |channel, cx| {
3082            channel.send_message(String::new(), cx).unwrap_err()
3083        });
3084
3085        // Leading and trailing whitespace are trimmed.
3086        channel_a
3087            .update(&mut cx_a, |channel, cx| {
3088                channel
3089                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3090                    .unwrap()
3091            })
3092            .await
3093            .unwrap();
3094        assert_eq!(
3095            db.get_channel_messages(channel_id, 10, None)
3096                .await
3097                .unwrap()
3098                .iter()
3099                .map(|m| &m.body)
3100                .collect::<Vec<_>>(),
3101            &["surrounded by whitespace"]
3102        );
3103    }
3104
3105    #[gpui::test]
3106    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3107        cx_a.foreground().forbid_parking();
3108
3109        // Connect to a server as 2 clients.
3110        let mut server = TestServer::start(cx_a.foreground()).await;
3111        let client_a = server.create_client(&mut cx_a, "user_a").await;
3112        let client_b = server.create_client(&mut cx_b, "user_b").await;
3113        let mut status_b = client_b.status();
3114
3115        // Create an org that includes these 2 users.
3116        let db = &server.app_state.db;
3117        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3118        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3119            .await
3120            .unwrap();
3121        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3122            .await
3123            .unwrap();
3124
3125        // Create a channel that includes all the users.
3126        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3127        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3128            .await
3129            .unwrap();
3130        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3131            .await
3132            .unwrap();
3133        db.create_channel_message(
3134            channel_id,
3135            client_b.current_user_id(&cx_b),
3136            "hello A, it's B.",
3137            OffsetDateTime::now_utc(),
3138            2,
3139        )
3140        .await
3141        .unwrap();
3142
3143        let channels_a = cx_a
3144            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3145        channels_a
3146            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3147            .await;
3148
3149        channels_a.read_with(&cx_a, |list, _| {
3150            assert_eq!(
3151                list.available_channels().unwrap(),
3152                &[ChannelDetails {
3153                    id: channel_id.to_proto(),
3154                    name: "test-channel".to_string()
3155                }]
3156            )
3157        });
3158        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3159            this.get_channel(channel_id.to_proto(), cx).unwrap()
3160        });
3161        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3162        channel_a
3163            .condition(&cx_a, |channel, _| {
3164                channel_messages(channel)
3165                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3166            })
3167            .await;
3168
3169        let channels_b = cx_b
3170            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3171        channels_b
3172            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3173            .await;
3174        channels_b.read_with(&cx_b, |list, _| {
3175            assert_eq!(
3176                list.available_channels().unwrap(),
3177                &[ChannelDetails {
3178                    id: channel_id.to_proto(),
3179                    name: "test-channel".to_string()
3180                }]
3181            )
3182        });
3183
3184        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3185            this.get_channel(channel_id.to_proto(), cx).unwrap()
3186        });
3187        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3188        channel_b
3189            .condition(&cx_b, |channel, _| {
3190                channel_messages(channel)
3191                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3192            })
3193            .await;
3194
3195        // Disconnect client B, ensuring we can still access its cached channel data.
3196        server.forbid_connections();
3197        server.disconnect_client(client_b.current_user_id(&cx_b));
3198        while !matches!(
3199            status_b.next().await,
3200            Some(client::Status::ReconnectionError { .. })
3201        ) {}
3202
3203        channels_b.read_with(&cx_b, |channels, _| {
3204            assert_eq!(
3205                channels.available_channels().unwrap(),
3206                [ChannelDetails {
3207                    id: channel_id.to_proto(),
3208                    name: "test-channel".to_string()
3209                }]
3210            )
3211        });
3212        channel_b.read_with(&cx_b, |channel, _| {
3213            assert_eq!(
3214                channel_messages(channel),
3215                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3216            )
3217        });
3218
3219        // Send a message from client B while it is disconnected.
3220        channel_b
3221            .update(&mut cx_b, |channel, cx| {
3222                let task = channel
3223                    .send_message("can you see this?".to_string(), cx)
3224                    .unwrap();
3225                assert_eq!(
3226                    channel_messages(channel),
3227                    &[
3228                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3229                        ("user_b".to_string(), "can you see this?".to_string(), true)
3230                    ]
3231                );
3232                task
3233            })
3234            .await
3235            .unwrap_err();
3236
3237        // Send a message from client A while B is disconnected.
3238        channel_a
3239            .update(&mut cx_a, |channel, cx| {
3240                channel
3241                    .send_message("oh, hi B.".to_string(), cx)
3242                    .unwrap()
3243                    .detach();
3244                let task = channel.send_message("sup".to_string(), cx).unwrap();
3245                assert_eq!(
3246                    channel_messages(channel),
3247                    &[
3248                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3249                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3250                        ("user_a".to_string(), "sup".to_string(), true)
3251                    ]
3252                );
3253                task
3254            })
3255            .await
3256            .unwrap();
3257
3258        // Give client B a chance to reconnect.
3259        server.allow_connections();
3260        cx_b.foreground().advance_clock(Duration::from_secs(10));
3261
3262        // Verify that B sees the new messages upon reconnection, as well as the message client B
3263        // sent while offline.
3264        channel_b
3265            .condition(&cx_b, |channel, _| {
3266                channel_messages(channel)
3267                    == [
3268                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3269                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3270                        ("user_a".to_string(), "sup".to_string(), false),
3271                        ("user_b".to_string(), "can you see this?".to_string(), false),
3272                    ]
3273            })
3274            .await;
3275
3276        // Ensure client A and B can communicate normally after reconnection.
3277        channel_a
3278            .update(&mut cx_a, |channel, cx| {
3279                channel.send_message("you online?".to_string(), cx).unwrap()
3280            })
3281            .await
3282            .unwrap();
3283        channel_b
3284            .condition(&cx_b, |channel, _| {
3285                channel_messages(channel)
3286                    == [
3287                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3288                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3289                        ("user_a".to_string(), "sup".to_string(), false),
3290                        ("user_b".to_string(), "can you see this?".to_string(), false),
3291                        ("user_a".to_string(), "you online?".to_string(), false),
3292                    ]
3293            })
3294            .await;
3295
3296        channel_b
3297            .update(&mut cx_b, |channel, cx| {
3298                channel.send_message("yep".to_string(), cx).unwrap()
3299            })
3300            .await
3301            .unwrap();
3302        channel_a
3303            .condition(&cx_a, |channel, _| {
3304                channel_messages(channel)
3305                    == [
3306                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3307                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3308                        ("user_a".to_string(), "sup".to_string(), false),
3309                        ("user_b".to_string(), "can you see this?".to_string(), false),
3310                        ("user_a".to_string(), "you online?".to_string(), false),
3311                        ("user_b".to_string(), "yep".to_string(), false),
3312                    ]
3313            })
3314            .await;
3315    }
3316
3317    #[gpui::test]
3318    async fn test_contacts(
3319        mut cx_a: TestAppContext,
3320        mut cx_b: TestAppContext,
3321        mut cx_c: TestAppContext,
3322    ) {
3323        cx_a.foreground().forbid_parking();
3324        let lang_registry = Arc::new(LanguageRegistry::new());
3325        let fs = Arc::new(FakeFs::new(cx_a.background()));
3326
3327        // Connect to a server as 3 clients.
3328        let mut server = TestServer::start(cx_a.foreground()).await;
3329        let client_a = server.create_client(&mut cx_a, "user_a").await;
3330        let client_b = server.create_client(&mut cx_b, "user_b").await;
3331        let client_c = server.create_client(&mut cx_c, "user_c").await;
3332
3333        // Share a worktree as client A.
3334        fs.insert_tree(
3335            "/a",
3336            json!({
3337                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3338            }),
3339        )
3340        .await;
3341
3342        let project_a = cx_a.update(|cx| {
3343            Project::local(
3344                client_a.clone(),
3345                client_a.user_store.clone(),
3346                lang_registry.clone(),
3347                fs.clone(),
3348                cx,
3349            )
3350        });
3351        let (worktree_a, _) = project_a
3352            .update(&mut cx_a, |p, cx| {
3353                p.find_or_create_local_worktree("/a", false, cx)
3354            })
3355            .await
3356            .unwrap();
3357        worktree_a
3358            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3359            .await;
3360
3361        client_a
3362            .user_store
3363            .condition(&cx_a, |user_store, _| {
3364                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3365            })
3366            .await;
3367        client_b
3368            .user_store
3369            .condition(&cx_b, |user_store, _| {
3370                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3371            })
3372            .await;
3373        client_c
3374            .user_store
3375            .condition(&cx_c, |user_store, _| {
3376                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3377            })
3378            .await;
3379
3380        let project_id = project_a
3381            .update(&mut cx_a, |project, _| project.next_remote_id())
3382            .await;
3383        project_a
3384            .update(&mut cx_a, |project, cx| project.share(cx))
3385            .await
3386            .unwrap();
3387
3388        let _project_b = Project::remote(
3389            project_id,
3390            client_b.clone(),
3391            client_b.user_store.clone(),
3392            lang_registry.clone(),
3393            fs.clone(),
3394            &mut cx_b.to_async(),
3395        )
3396        .await
3397        .unwrap();
3398
3399        client_a
3400            .user_store
3401            .condition(&cx_a, |user_store, _| {
3402                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3403            })
3404            .await;
3405        client_b
3406            .user_store
3407            .condition(&cx_b, |user_store, _| {
3408                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3409            })
3410            .await;
3411        client_c
3412            .user_store
3413            .condition(&cx_c, |user_store, _| {
3414                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3415            })
3416            .await;
3417
3418        project_a
3419            .condition(&cx_a, |project, _| {
3420                project.collaborators().contains_key(&client_b.peer_id)
3421            })
3422            .await;
3423
3424        cx_a.update(move |_| drop(project_a));
3425        client_a
3426            .user_store
3427            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3428            .await;
3429        client_b
3430            .user_store
3431            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3432            .await;
3433        client_c
3434            .user_store
3435            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3436            .await;
3437
3438        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3439            user_store
3440                .contacts()
3441                .iter()
3442                .map(|contact| {
3443                    let worktrees = contact
3444                        .projects
3445                        .iter()
3446                        .map(|p| {
3447                            (
3448                                p.worktree_root_names[0].as_str(),
3449                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3450                            )
3451                        })
3452                        .collect();
3453                    (contact.user.github_login.as_str(), worktrees)
3454                })
3455                .collect()
3456        }
3457    }
3458
3459    struct TestServer {
3460        peer: Arc<Peer>,
3461        app_state: Arc<AppState>,
3462        server: Arc<Server>,
3463        foreground: Rc<executor::Foreground>,
3464        notifications: mpsc::Receiver<()>,
3465        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3466        forbid_connections: Arc<AtomicBool>,
3467        _test_db: TestDb,
3468    }
3469
3470    impl TestServer {
3471        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3472            let test_db = TestDb::new();
3473            let app_state = Self::build_app_state(&test_db).await;
3474            let peer = Peer::new();
3475            let notifications = mpsc::channel(128);
3476            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3477            Self {
3478                peer,
3479                app_state,
3480                server,
3481                foreground,
3482                notifications: notifications.1,
3483                connection_killers: Default::default(),
3484                forbid_connections: Default::default(),
3485                _test_db: test_db,
3486            }
3487        }
3488
3489        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3490            let http = FakeHttpClient::with_404_response();
3491            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3492            let client_name = name.to_string();
3493            let mut client = Client::new(http.clone());
3494            let server = self.server.clone();
3495            let connection_killers = self.connection_killers.clone();
3496            let forbid_connections = self.forbid_connections.clone();
3497            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3498
3499            Arc::get_mut(&mut client)
3500                .unwrap()
3501                .override_authenticate(move |cx| {
3502                    cx.spawn(|_| async move {
3503                        let access_token = "the-token".to_string();
3504                        Ok(Credentials {
3505                            user_id: user_id.0 as u64,
3506                            access_token,
3507                        })
3508                    })
3509                })
3510                .override_establish_connection(move |credentials, cx| {
3511                    assert_eq!(credentials.user_id, user_id.0 as u64);
3512                    assert_eq!(credentials.access_token, "the-token");
3513
3514                    let server = server.clone();
3515                    let connection_killers = connection_killers.clone();
3516                    let forbid_connections = forbid_connections.clone();
3517                    let client_name = client_name.clone();
3518                    let connection_id_tx = connection_id_tx.clone();
3519                    cx.spawn(move |cx| async move {
3520                        if forbid_connections.load(SeqCst) {
3521                            Err(EstablishConnectionError::other(anyhow!(
3522                                "server is forbidding connections"
3523                            )))
3524                        } else {
3525                            let (client_conn, server_conn, kill_conn) =
3526                                Connection::in_memory(cx.background());
3527                            connection_killers.lock().insert(user_id, kill_conn);
3528                            cx.background()
3529                                .spawn(server.handle_connection(
3530                                    server_conn,
3531                                    client_name,
3532                                    user_id,
3533                                    Some(connection_id_tx),
3534                                ))
3535                                .detach();
3536                            Ok(client_conn)
3537                        }
3538                    })
3539                });
3540
3541            client
3542                .authenticate_and_connect(&cx.to_async())
3543                .await
3544                .unwrap();
3545
3546            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3547            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3548            let mut authed_user =
3549                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3550            while authed_user.next().await.unwrap().is_none() {}
3551
3552            TestClient {
3553                client,
3554                peer_id,
3555                user_store,
3556            }
3557        }
3558
3559        fn disconnect_client(&self, user_id: UserId) {
3560            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3561                let _ = kill_conn.try_send(Some(()));
3562            }
3563        }
3564
3565        fn forbid_connections(&self) {
3566            self.forbid_connections.store(true, SeqCst);
3567        }
3568
3569        fn allow_connections(&self) {
3570            self.forbid_connections.store(false, SeqCst);
3571        }
3572
3573        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3574            let mut config = Config::default();
3575            config.session_secret = "a".repeat(32);
3576            config.database_url = test_db.url.clone();
3577            let github_client = github::AppClient::test();
3578            Arc::new(AppState {
3579                db: test_db.db().clone(),
3580                handlebars: Default::default(),
3581                auth_client: auth::build_client("", ""),
3582                repo_client: github::RepoClient::test(&github_client),
3583                github_client,
3584                config,
3585            })
3586        }
3587
3588        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3589            self.server.store.read()
3590        }
3591
3592        async fn condition<F>(&mut self, mut predicate: F)
3593        where
3594            F: FnMut(&Store) -> bool,
3595        {
3596            async_std::future::timeout(Duration::from_millis(500), async {
3597                while !(predicate)(&*self.server.store.read()) {
3598                    self.foreground.start_waiting();
3599                    self.notifications.next().await;
3600                    self.foreground.finish_waiting();
3601                }
3602            })
3603            .await
3604            .expect("condition timed out");
3605        }
3606    }
3607
3608    impl Drop for TestServer {
3609        fn drop(&mut self) {
3610            self.peer.reset();
3611        }
3612    }
3613
3614    struct TestClient {
3615        client: Arc<Client>,
3616        pub peer_id: PeerId,
3617        pub user_store: ModelHandle<UserStore>,
3618    }
3619
3620    impl Deref for TestClient {
3621        type Target = Arc<Client>;
3622
3623        fn deref(&self) -> &Self::Target {
3624            &self.client
3625        }
3626    }
3627
3628    impl TestClient {
3629        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3630            UserId::from_proto(
3631                self.user_store
3632                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3633            )
3634        }
3635    }
3636
3637    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3638        channel
3639            .messages()
3640            .cursor::<()>()
3641            .map(|m| {
3642                (
3643                    m.sender.github_login.clone(),
3644                    m.body.clone(),
3645                    m.is_pending(),
3646                )
3647            })
3648            .collect()
3649    }
3650
3651    struct EmptyView;
3652
3653    impl gpui::Entity for EmptyView {
3654        type Event = ();
3655    }
3656
3657    impl gpui::View for EmptyView {
3658        fn ui_name() -> &'static str {
3659            "empty view"
3660        }
3661
3662        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3663            gpui::Element::boxed(gpui::elements::Empty)
3664        }
3665    }
3666}