rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffer)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_code_actions)
  89            .add_handler(Server::apply_code_action)
  90            .add_handler(Server::get_channels)
  91            .add_handler(Server::get_users)
  92            .add_handler(Server::join_channel)
  93            .add_handler(Server::leave_channel)
  94            .add_handler(Server::send_channel_message)
  95            .add_handler(Server::get_channel_messages);
  96
  97        Arc::new(server)
  98    }
  99
 100    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 101    where
 102        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 103        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 104        M: EnvelopedMessage,
 105    {
 106        let prev_handler = self.handlers.insert(
 107            TypeId::of::<M>(),
 108            Box::new(move |server, envelope| {
 109                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 110                (handler)(server, *envelope).boxed()
 111            }),
 112        );
 113        if prev_handler.is_some() {
 114            panic!("registered a handler for the same message twice");
 115        }
 116        self
 117    }
 118
 119    pub fn handle_connection(
 120        self: &Arc<Self>,
 121        connection: Connection,
 122        addr: String,
 123        user_id: UserId,
 124        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 125    ) -> impl Future<Output = ()> {
 126        let mut this = self.clone();
 127        async move {
 128            let (connection_id, handle_io, mut incoming_rx) =
 129                this.peer.add_connection(connection).await;
 130
 131            if let Some(send_connection_id) = send_connection_id.as_mut() {
 132                let _ = send_connection_id.send(connection_id).await;
 133            }
 134
 135            this.state_mut().add_connection(connection_id, user_id);
 136            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 137                log::error!("error updating contacts for {:?}: {}", user_id, err);
 138            }
 139
 140            let handle_io = handle_io.fuse();
 141            futures::pin_mut!(handle_io);
 142            loop {
 143                let next_message = incoming_rx.next().fuse();
 144                futures::pin_mut!(next_message);
 145                futures::select_biased! {
 146                    result = handle_io => {
 147                        if let Err(err) = result {
 148                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 149                        }
 150                        break;
 151                    }
 152                    message = next_message => {
 153                        if let Some(message) = message {
 154                            let start_time = Instant::now();
 155                            let type_name = message.payload_type_name();
 156                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 157                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 158                                if let Err(err) = (handler)(this.clone(), message).await {
 159                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 160                                } else {
 161                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 162                                }
 163
 164                                if let Some(mut notifications) = this.notifications.clone() {
 165                                    let _ = notifications.send(()).await;
 166                                }
 167                            } else {
 168                                log::warn!("unhandled message: {}", type_name);
 169                            }
 170                        } else {
 171                            log::info!("rpc connection closed {:?}", addr);
 172                            break;
 173                        }
 174                    }
 175                }
 176            }
 177
 178            if let Err(err) = this.sign_out(connection_id).await {
 179                log::error!("error signing out connection {:?} - {:?}", addr, err);
 180            }
 181        }
 182    }
 183
 184    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 185        self.peer.disconnect(connection_id);
 186        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 187
 188        for (project_id, project) in removed_connection.hosted_projects {
 189            if let Some(share) = project.share {
 190                broadcast(
 191                    connection_id,
 192                    share.guests.keys().copied().collect(),
 193                    |conn_id| {
 194                        self.peer
 195                            .send(conn_id, proto::UnshareProject { project_id })
 196                    },
 197                )?;
 198            }
 199        }
 200
 201        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 202            broadcast(connection_id, peer_ids, |conn_id| {
 203                self.peer.send(
 204                    conn_id,
 205                    proto::RemoveProjectCollaborator {
 206                        project_id,
 207                        peer_id: connection_id.0,
 208                    },
 209                )
 210            })?;
 211        }
 212
 213        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {})?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer.respond(
 232            request.receipt(),
 233            proto::RegisterProjectResponse { project_id },
 234        )?;
 235        Ok(())
 236    }
 237
 238    async fn unregister_project(
 239        mut self: Arc<Server>,
 240        request: TypedEnvelope<proto::UnregisterProject>,
 241    ) -> tide::Result<()> {
 242        let project = self
 243            .state_mut()
 244            .unregister_project(request.payload.project_id, request.sender_id)
 245            .ok_or_else(|| anyhow!("no such project"))?;
 246        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 247        Ok(())
 248    }
 249
 250    async fn share_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::ShareProject>,
 253    ) -> tide::Result<()> {
 254        self.state_mut()
 255            .share_project(request.payload.project_id, request.sender_id);
 256        self.peer.respond(request.receipt(), proto::Ack {})?;
 257        Ok(())
 258    }
 259
 260    async fn unshare_project(
 261        mut self: Arc<Server>,
 262        request: TypedEnvelope<proto::UnshareProject>,
 263    ) -> tide::Result<()> {
 264        let project_id = request.payload.project_id;
 265        let project = self
 266            .state_mut()
 267            .unshare_project(project_id, request.sender_id)?;
 268
 269        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 270            self.peer
 271                .send(conn_id, proto::UnshareProject { project_id })
 272        })?;
 273        self.update_contacts_for_users(&project.authorized_user_ids)?;
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                            weak: worktree.weak,
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })?;
 348                self.peer.respond(request.receipt(), response)?;
 349                self.update_contacts_for_users(&contact_user_ids)?;
 350            }
 351            Err(error) => {
 352                self.peer.respond_with_error(
 353                    request.receipt(),
 354                    proto::Error {
 355                        message: error.to_string(),
 356                    },
 357                )?;
 358            }
 359        }
 360
 361        Ok(())
 362    }
 363
 364    async fn leave_project(
 365        mut self: Arc<Server>,
 366        request: TypedEnvelope<proto::LeaveProject>,
 367    ) -> tide::Result<()> {
 368        let sender_id = request.sender_id;
 369        let project_id = request.payload.project_id;
 370        let worktree = self.state_mut().leave_project(sender_id, project_id);
 371        if let Some(worktree) = worktree {
 372            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 373                self.peer.send(
 374                    conn_id,
 375                    proto::RemoveProjectCollaborator {
 376                        project_id,
 377                        peer_id: sender_id.0,
 378                    },
 379                )
 380            })?;
 381            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 382        }
 383        Ok(())
 384    }
 385
 386    async fn register_worktree(
 387        mut self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterWorktree>,
 389    ) -> tide::Result<()> {
 390        let receipt = request.receipt();
 391        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 392
 393        let mut contact_user_ids = HashSet::default();
 394        contact_user_ids.insert(host_user_id);
 395        for github_login in request.payload.authorized_logins {
 396            match self.app_state.db.create_user(&github_login, false).await {
 397                Ok(contact_user_id) => {
 398                    contact_user_ids.insert(contact_user_id);
 399                }
 400                Err(err) => {
 401                    let message = err.to_string();
 402                    self.peer
 403                        .respond_with_error(receipt, proto::Error { message })?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417                weak: false,
 418            },
 419        );
 420
 421        if ok {
 422            self.peer.respond(receipt, proto::Ack {})?;
 423            self.update_contacts_for_users(&contact_user_ids)?;
 424        } else {
 425            self.peer.respond_with_error(
 426                receipt,
 427                proto::Error {
 428                    message: NO_SUCH_PROJECT.to_string(),
 429                },
 430            )?;
 431        }
 432
 433        Ok(())
 434    }
 435
 436    async fn unregister_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::UnregisterWorktree>,
 439    ) -> tide::Result<()> {
 440        let project_id = request.payload.project_id;
 441        let worktree_id = request.payload.worktree_id;
 442        let (worktree, guest_connection_ids) =
 443            self.state_mut()
 444                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })?;
 454        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 455        Ok(())
 456    }
 457
 458    async fn share_worktree(
 459        mut self: Arc<Server>,
 460        mut request: TypedEnvelope<proto::ShareWorktree>,
 461    ) -> tide::Result<()> {
 462        let worktree = request
 463            .payload
 464            .worktree
 465            .as_mut()
 466            .ok_or_else(|| anyhow!("missing worktree"))?;
 467        let entries = worktree
 468            .entries
 469            .iter()
 470            .map(|entry| (entry.id, entry.clone()))
 471            .collect();
 472        let diagnostic_summaries = worktree
 473            .diagnostic_summaries
 474            .iter()
 475            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 476            .collect();
 477
 478        let shared_worktree = self.state_mut().share_worktree(
 479            request.payload.project_id,
 480            worktree.id,
 481            request.sender_id,
 482            entries,
 483            diagnostic_summaries,
 484        );
 485        if let Some(shared_worktree) = shared_worktree {
 486            broadcast(
 487                request.sender_id,
 488                shared_worktree.connection_ids,
 489                |connection_id| {
 490                    self.peer.forward_send(
 491                        request.sender_id,
 492                        connection_id,
 493                        request.payload.clone(),
 494                    )
 495                },
 496            )?;
 497            self.peer.respond(request.receipt(), proto::Ack {})?;
 498            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499        } else {
 500            self.peer.respond_with_error(
 501                request.receipt(),
 502                proto::Error {
 503                    message: "no such worktree".to_string(),
 504                },
 505            )?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })?;
 555        Ok(())
 556    }
 557
 558    async fn disk_based_diagnostics_updating(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 561    ) -> tide::Result<()> {
 562        let receiver_ids = self
 563            .state()
 564            .project_connection_ids(request.payload.project_id, request.sender_id)
 565            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn disk_based_diagnostics_updated(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 576    ) -> tide::Result<()> {
 577        let receiver_ids = self
 578            .state()
 579            .project_connection_ids(request.payload.project_id, request.sender_id)
 580            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 581        broadcast(request.sender_id, receiver_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        })?;
 585        Ok(())
 586    }
 587
 588    async fn get_definition(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDefinition>,
 591    ) -> tide::Result<()> {
 592        let receipt = request.receipt();
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)
 596            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 597            .host_connection_id;
 598        let response = self
 599            .peer
 600            .forward_request(request.sender_id, host_connection_id, request.payload)
 601            .await?;
 602        self.peer.respond(receipt, response)?;
 603        Ok(())
 604    }
 605
 606    async fn open_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBuffer>,
 609    ) -> tide::Result<()> {
 610        let receipt = request.receipt();
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)
 614            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 615            .host_connection_id;
 616        let response = self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?;
 620        self.peer.respond(receipt, response)?;
 621        Ok(())
 622    }
 623
 624    async fn close_buffer(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::CloseBuffer>,
 627    ) -> tide::Result<()> {
 628        let host_connection_id = self
 629            .state()
 630            .read_project(request.payload.project_id, request.sender_id)
 631            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 632            .host_connection_id;
 633        self.peer
 634            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 635        Ok(())
 636    }
 637
 638    async fn save_buffer(
 639        self: Arc<Server>,
 640        request: TypedEnvelope<proto::SaveBuffer>,
 641    ) -> tide::Result<()> {
 642        let host;
 643        let guests;
 644        {
 645            let state = self.state();
 646            let project = state
 647                .read_project(request.payload.project_id, request.sender_id)
 648                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 649            host = project.host_connection_id;
 650            guests = project.guest_connection_ids()
 651        }
 652
 653        let sender = request.sender_id;
 654        let receipt = request.receipt();
 655        let response = self
 656            .peer
 657            .forward_request(sender, host, request.payload.clone())
 658            .await?;
 659
 660        broadcast(host, guests, |conn_id| {
 661            let response = response.clone();
 662            if conn_id == sender {
 663                self.peer.respond(receipt, response)
 664            } else {
 665                self.peer.forward_send(host, conn_id, response)
 666            }
 667        })?;
 668
 669        Ok(())
 670    }
 671
 672    async fn format_buffer(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::FormatBuffer>,
 675    ) -> tide::Result<()> {
 676        let host;
 677        {
 678            let state = self.state();
 679            let project = state
 680                .read_project(request.payload.project_id, request.sender_id)
 681                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 682            host = project.host_connection_id;
 683        }
 684
 685        let sender = request.sender_id;
 686        let receipt = request.receipt();
 687        let response = self
 688            .peer
 689            .forward_request(sender, host, request.payload.clone())
 690            .await?;
 691        self.peer.respond(receipt, response)?;
 692
 693        Ok(())
 694    }
 695
 696    async fn get_completions(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetCompletions>,
 699    ) -> tide::Result<()> {
 700        let host;
 701        {
 702            let state = self.state();
 703            let project = state
 704                .read_project(request.payload.project_id, request.sender_id)
 705                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 706            host = project.host_connection_id;
 707        }
 708
 709        let sender = request.sender_id;
 710        let receipt = request.receipt();
 711        let response = self
 712            .peer
 713            .forward_request(sender, host, request.payload.clone())
 714            .await?;
 715        self.peer.respond(receipt, response)?;
 716        Ok(())
 717    }
 718
 719    async fn apply_additional_edits_for_completion(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 722    ) -> tide::Result<()> {
 723        let host;
 724        {
 725            let state = self.state();
 726            let project = state
 727                .read_project(request.payload.project_id, request.sender_id)
 728                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 729            host = project.host_connection_id;
 730        }
 731
 732        let sender = request.sender_id;
 733        let receipt = request.receipt();
 734        let response = self
 735            .peer
 736            .forward_request(sender, host, request.payload.clone())
 737            .await?;
 738        self.peer.respond(receipt, response)?;
 739        Ok(())
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<()> {
 746        let host;
 747        {
 748            let state = self.state();
 749            let project = state
 750                .read_project(request.payload.project_id, request.sender_id)
 751                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 752            host = project.host_connection_id;
 753        }
 754
 755        let sender = request.sender_id;
 756        let receipt = request.receipt();
 757        let response = self
 758            .peer
 759            .forward_request(sender, host, request.payload.clone())
 760            .await?;
 761        self.peer.respond(receipt, response)?;
 762        Ok(())
 763    }
 764
 765    async fn apply_code_action(
 766        self: Arc<Server>,
 767        request: TypedEnvelope<proto::ApplyCodeAction>,
 768    ) -> tide::Result<()> {
 769        let host;
 770        {
 771            let state = self.state();
 772            let project = state
 773                .read_project(request.payload.project_id, request.sender_id)
 774                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775            host = project.host_connection_id;
 776        }
 777
 778        let sender = request.sender_id;
 779        let receipt = request.receipt();
 780        let response = self
 781            .peer
 782            .forward_request(sender, host, request.payload.clone())
 783            .await?;
 784        self.peer.respond(receipt, response)?;
 785        Ok(())
 786    }
 787
 788    async fn update_buffer(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateBuffer>,
 791    ) -> tide::Result<()> {
 792        let receiver_ids = self
 793            .state()
 794            .project_connection_ids(request.payload.project_id, request.sender_id)
 795            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 796        broadcast(request.sender_id, receiver_ids, |connection_id| {
 797            self.peer
 798                .forward_send(request.sender_id, connection_id, request.payload.clone())
 799        })?;
 800        self.peer.respond(request.receipt(), proto::Ack {})?;
 801        Ok(())
 802    }
 803
 804    async fn update_buffer_file(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::UpdateBufferFile>,
 807    ) -> tide::Result<()> {
 808        let receiver_ids = self
 809            .state()
 810            .project_connection_ids(request.payload.project_id, request.sender_id)
 811            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        })?;
 816        Ok(())
 817    }
 818
 819    async fn buffer_reloaded(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferReloaded>,
 822    ) -> tide::Result<()> {
 823        let receiver_ids = self
 824            .state()
 825            .project_connection_ids(request.payload.project_id, request.sender_id)
 826            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        })?;
 831        Ok(())
 832    }
 833
 834    async fn buffer_saved(
 835        self: Arc<Server>,
 836        request: TypedEnvelope<proto::BufferSaved>,
 837    ) -> tide::Result<()> {
 838        let receiver_ids = self
 839            .state()
 840            .project_connection_ids(request.payload.project_id, request.sender_id)
 841            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        })?;
 846        Ok(())
 847    }
 848
 849    async fn get_channels(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::GetChannels>,
 852    ) -> tide::Result<()> {
 853        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 854        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 855        self.peer.respond(
 856            request.receipt(),
 857            proto::GetChannelsResponse {
 858                channels: channels
 859                    .into_iter()
 860                    .map(|chan| proto::Channel {
 861                        id: chan.id.to_proto(),
 862                        name: chan.name,
 863                    })
 864                    .collect(),
 865            },
 866        )?;
 867        Ok(())
 868    }
 869
 870    async fn get_users(
 871        self: Arc<Server>,
 872        request: TypedEnvelope<proto::GetUsers>,
 873    ) -> tide::Result<()> {
 874        let receipt = request.receipt();
 875        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 876        let users = self
 877            .app_state
 878            .db
 879            .get_users_by_ids(user_ids)
 880            .await?
 881            .into_iter()
 882            .map(|user| proto::User {
 883                id: user.id.to_proto(),
 884                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 885                github_login: user.github_login,
 886            })
 887            .collect();
 888        self.peer
 889            .respond(receipt, proto::GetUsersResponse { users })?;
 890        Ok(())
 891    }
 892
 893    fn update_contacts_for_users<'a>(
 894        self: &Arc<Server>,
 895        user_ids: impl IntoIterator<Item = &'a UserId>,
 896    ) -> anyhow::Result<()> {
 897        let mut result = Ok(());
 898        let state = self.state();
 899        for user_id in user_ids {
 900            let contacts = state.contacts_for_user(*user_id);
 901            for connection_id in state.connection_ids_for_user(*user_id) {
 902                if let Err(error) = self.peer.send(
 903                    connection_id,
 904                    proto::UpdateContacts {
 905                        contacts: contacts.clone(),
 906                    },
 907                ) {
 908                    result = Err(error);
 909                }
 910            }
 911        }
 912        result
 913    }
 914
 915    async fn join_channel(
 916        mut self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> tide::Result<()> {
 919        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 920        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 921        if !self
 922            .app_state
 923            .db
 924            .can_user_access_channel(user_id, channel_id)
 925            .await?
 926        {
 927            Err(anyhow!("access denied"))?;
 928        }
 929
 930        self.state_mut().join_channel(request.sender_id, channel_id);
 931        let messages = self
 932            .app_state
 933            .db
 934            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 935            .await?
 936            .into_iter()
 937            .map(|msg| proto::ChannelMessage {
 938                id: msg.id.to_proto(),
 939                body: msg.body,
 940                timestamp: msg.sent_at.unix_timestamp() as u64,
 941                sender_id: msg.sender_id.to_proto(),
 942                nonce: Some(msg.nonce.as_u128().into()),
 943            })
 944            .collect::<Vec<_>>();
 945        self.peer.respond(
 946            request.receipt(),
 947            proto::JoinChannelResponse {
 948                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 949                messages,
 950            },
 951        )?;
 952        Ok(())
 953    }
 954
 955    async fn leave_channel(
 956        mut self: Arc<Self>,
 957        request: TypedEnvelope<proto::LeaveChannel>,
 958    ) -> tide::Result<()> {
 959        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 960        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 961        if !self
 962            .app_state
 963            .db
 964            .can_user_access_channel(user_id, channel_id)
 965            .await?
 966        {
 967            Err(anyhow!("access denied"))?;
 968        }
 969
 970        self.state_mut()
 971            .leave_channel(request.sender_id, channel_id);
 972
 973        Ok(())
 974    }
 975
 976    async fn send_channel_message(
 977        self: Arc<Self>,
 978        request: TypedEnvelope<proto::SendChannelMessage>,
 979    ) -> tide::Result<()> {
 980        let receipt = request.receipt();
 981        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 982        let user_id;
 983        let connection_ids;
 984        {
 985            let state = self.state();
 986            user_id = state.user_id_for_connection(request.sender_id)?;
 987            if let Some(ids) = state.channel_connection_ids(channel_id) {
 988                connection_ids = ids;
 989            } else {
 990                return Ok(());
 991            }
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            self.peer.respond_with_error(
 998                receipt,
 999                proto::Error {
1000                    message: "message is too long".to_string(),
1001                },
1002            )?;
1003            return Ok(());
1004        }
1005        if body.is_empty() {
1006            self.peer.respond_with_error(
1007                receipt,
1008                proto::Error {
1009                    message: "message can't be blank".to_string(),
1010                },
1011            )?;
1012            return Ok(());
1013        }
1014
1015        let timestamp = OffsetDateTime::now_utc();
1016        let nonce = if let Some(nonce) = request.payload.nonce {
1017            nonce
1018        } else {
1019            self.peer.respond_with_error(
1020                receipt,
1021                proto::Error {
1022                    message: "nonce can't be blank".to_string(),
1023                },
1024            )?;
1025            return Ok(());
1026        };
1027
1028        let message_id = self
1029            .app_state
1030            .db
1031            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1032            .await?
1033            .to_proto();
1034        let message = proto::ChannelMessage {
1035            sender_id: user_id.to_proto(),
1036            id: message_id,
1037            body,
1038            timestamp: timestamp.unix_timestamp() as u64,
1039            nonce: Some(nonce),
1040        };
1041        broadcast(request.sender_id, connection_ids, |conn_id| {
1042            self.peer.send(
1043                conn_id,
1044                proto::ChannelMessageSent {
1045                    channel_id: channel_id.to_proto(),
1046                    message: Some(message.clone()),
1047                },
1048            )
1049        })?;
1050        self.peer.respond(
1051            receipt,
1052            proto::SendChannelMessageResponse {
1053                message: Some(message),
1054            },
1055        )?;
1056        Ok(())
1057    }
1058
1059    async fn get_channel_messages(
1060        self: Arc<Self>,
1061        request: TypedEnvelope<proto::GetChannelMessages>,
1062    ) -> tide::Result<()> {
1063        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1064        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1065        if !self
1066            .app_state
1067            .db
1068            .can_user_access_channel(user_id, channel_id)
1069            .await?
1070        {
1071            Err(anyhow!("access denied"))?;
1072        }
1073
1074        let messages = self
1075            .app_state
1076            .db
1077            .get_channel_messages(
1078                channel_id,
1079                MESSAGE_COUNT_PER_PAGE,
1080                Some(MessageId::from_proto(request.payload.before_message_id)),
1081            )
1082            .await?
1083            .into_iter()
1084            .map(|msg| proto::ChannelMessage {
1085                id: msg.id.to_proto(),
1086                body: msg.body,
1087                timestamp: msg.sent_at.unix_timestamp() as u64,
1088                sender_id: msg.sender_id.to_proto(),
1089                nonce: Some(msg.nonce.as_u128().into()),
1090            })
1091            .collect::<Vec<_>>();
1092        self.peer.respond(
1093            request.receipt(),
1094            proto::GetChannelMessagesResponse {
1095                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1096                messages,
1097            },
1098        )?;
1099        Ok(())
1100    }
1101
1102    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1103        self.store.read()
1104    }
1105
1106    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1107        self.store.write()
1108    }
1109}
1110
1111fn broadcast<F>(
1112    sender_id: ConnectionId,
1113    receiver_ids: Vec<ConnectionId>,
1114    mut f: F,
1115) -> anyhow::Result<()>
1116where
1117    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1118{
1119    let mut result = Ok(());
1120    for receiver_id in receiver_ids {
1121        if receiver_id != sender_id {
1122            if let Err(error) = f(receiver_id) {
1123                if result.is_ok() {
1124                    result = Err(error);
1125                }
1126            }
1127        }
1128    }
1129    result
1130}
1131
/// Registers the `GET /rpc` endpoint, which upgrades an HTTP request to a
/// WebSocket connection and hands it to a newly created [`Server`].
///
/// The endpoint:
/// 1. answers `UpgradeRequired` unless the request asks for a WebSocket
///    upgrade and its `X-Zed-Protocol-Version` header parses to
///    `rpc::PROTOCOL_VERSION`;
/// 2. fails if the `Sec-Websocket-Key` header is missing;
/// 3. authenticates the request with `process_auth_header`;
/// 4. replies `SwitchingProtocols` with the computed `Sec-Websocket-Accept`
///    value and spawns a task that runs `handle_connection` on the upgraded
///    stream.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        // Each request gets its own handle to the shared server.
        let server = server.clone();
        async move {
            // Fixed GUID that RFC 6455 appends to the client's key when
            // computing the `Sec-WebSocket-Accept` value.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // `None` when the header is absent or is not a valid `u32`.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            // Authentication failures abort the handshake via `?`.
            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value = base64(SHA-1(client key followed by the GUID)).
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // Serve the connection on its own task; the HTTP response below is
            // returned without waiting for it.
            task::spawn(async move {
                // If the receiver yields no stream, nothing is served.
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1186
1187fn header_contains_ignore_case<T>(
1188    request: &tide::Request<T>,
1189    header_name: HeaderName,
1190    value: &str,
1191) -> bool {
1192    request
1193        .header(header_name)
1194        .map(|h| {
1195            h.as_str()
1196                .split(',')
1197                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1198        })
1199        .unwrap_or(false)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::*;
1205    use crate::{
1206        auth,
1207        db::{tests::TestDb, UserId},
1208        github, AppState, Config,
1209    };
1210    use ::rpc::Peer;
1211    use async_std::task;
1212    use gpui::{executor, ModelHandle, TestAppContext};
1213    use parking_lot::Mutex;
1214    use postage::{mpsc, watch};
1215    use rand::prelude::*;
1216    use rpc::PeerId;
1217    use serde_json::json;
1218    use sqlx::types::time::OffsetDateTime;
1219    use std::{
1220        ops::Deref,
1221        path::Path,
1222        rc::Rc,
1223        sync::{
1224            atomic::{AtomicBool, Ordering::SeqCst},
1225            Arc,
1226        },
1227        time::Duration,
1228    };
1229    use zed::{
1230        client::{
1231            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1232            EstablishConnectionError, UserStore,
1233        },
1234        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1235        fs::{FakeFs, Fs as _},
1236        language::{
1237            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1238            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1239        },
1240        lsp,
1241        project::{DiagnosticSummary, Project, ProjectPath},
1242    };
1243
1244    #[cfg(test)]
1245    #[ctor::ctor]
1246    fn init_logger() {
1247        if std::env::var("RUST_LOG").is_ok() {
1248            env_logger::init();
1249        }
1250    }
1251
    // End-to-end check of project sharing between a host (client A) and a
    // guest (client B):
    //   * B can join the project A shares, and both see each other as
    //     collaborators;
    //   * a buffer opened by B is also open in A's project;
    //   * text typed by B shows up in A's copy of the buffer;
    //   * dropping A's buffer handle closes the buffer in A's project;
    //   * dropping B's project removes B from A's collaborators.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A. The `.zed.toml` in the tree names
        // `user_b` as a collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must see A as a collaborator right after joining.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // A learns about B asynchronously, so wait until B appears with the
        // expected replica id and login.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // B opening the file must already have opened it in A's project,
        // before A opens it explicitly below.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1390
    // Checks the unshare / re-share cycle of a project:
    //   * after the host unshares, the guest's project becomes read-only and
    //     the host's worktree is no longer shared;
    //   * after the host shares again, a guest can join under the same
    //     project id and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A. The `.zed.toml` in the tree names
        // `user_b` as a collaborator.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        // Opening a buffer proves the guest has working access while shared.
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest is notified asynchronously, so wait for it to turn read-only.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Client B joins again, using the same project id as before.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1487
    // Checks, with one host (A) and two guests (B, C), that:
    //   * concurrent edits from all three replicas converge to the same text;
    //   * a save requested by a guest writes the host's file, including an
    //     edit the host made concurrently, and leaves no replica dirty;
    //   * renames and new files on the host's file system are reflected in
    //     every replica's worktree and in the open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A. The `.zed.toml` in the tree names both
        // guests as collaborators.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        // Both guests insert at offset 0 without waiting for each other.
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Wait until both guests' inserts have reached the host; the test
        // expects C's text to be ordered before B's.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The saved file is expected to contain the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        // A and B are asserted clean immediately; C is awaited.
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected: `.zed.toml`, the two renamed files and `file4`.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1677
1678    #[gpui::test(iterations = 10)]
        /// Checks that a guest's buffer is clean and conflict-free once its save has been observed,
        /// and that a further edit makes it dirty again without flagging a conflict.
1679    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1680        cx_a.foreground().forbid_parking();
1681        let lang_registry = Arc::new(LanguageRegistry::new());
1682        let fs = Arc::new(FakeFs::new(cx_a.background()));
1683
1684        // Connect to a server as 2 clients.
1685        let mut server = TestServer::start(cx_a.foreground()).await;
1686        let client_a = server.create_client(&mut cx_a, "user_a").await;
1687        let client_b = server.create_client(&mut cx_b, "user_b").await;
1688
1689        // Share a project as client A
            // NOTE(review): `.zed.toml` also lists "user_c", but this test only creates clients
            // for user_a and user_b.
1690        fs.insert_tree(
1691            "/dir",
1692            json!({
1693                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1694                "a.txt": "a-contents",
1695            }),
1696        )
1697        .await;
1698
1699        let project_a = cx_a.update(|cx| {
1700            Project::local(
1701                client_a.clone(),
1702                client_a.user_store.clone(),
1703                lang_registry.clone(),
1704                fs.clone(),
1705                cx,
1706            )
1707        });
1708        let (worktree_a, _) = project_a
1709            .update(&mut cx_a, |p, cx| {
1710                p.find_or_create_local_worktree("/dir", false, cx)
1711            })
1712            .await
1713            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
1714        worktree_a
1715            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1716            .await;
1717        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1718        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1719        project_a
1720            .update(&mut cx_a, |p, cx| p.share(cx))
1721            .await
1722            .unwrap();
1723
1724        // Join that project as client B
1725        let project_b = Project::remote(
1726            project_id,
1727            client_b.clone(),
1728            client_b.user_store.clone(),
1729            lang_registry.clone(),
1730            fs.clone(),
1731            &mut cx_b.to_async(),
1732        )
1733        .await
1734        .unwrap();
1735        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1736
1737        // Open a buffer as client B
1738        let buffer_b = project_b
1739            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1740            .await
1741            .unwrap();
            // Remember the file's mtime as the guest sees it before saving; it is compared against
            // the post-save mtime below.
1742        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1743
            // An unsaved edit must mark the buffer dirty but not conflicted.
1744        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1745        buffer_b.read_with(&cx_b, |buf, _| {
1746            assert!(buf.is_dirty());
1747            assert!(!buf.has_conflict());
1748        });
1749
            // Save as the guest, then wait until the guest sees a different mtime on the buffer's
            // file than the one recorded above.
1750        buffer_b
1751            .update(&mut cx_b, |buf, cx| buf.save(cx))
1752            .await
1753            .unwrap();
1754        worktree_b
1755            .condition(&cx_b, |_, cx| {
1756                buffer_b.read(cx).file().unwrap().mtime() != mtime
1757            })
1758            .await;
            // Once the new mtime is visible, the buffer must be clean and conflict-free.
1759        buffer_b.read_with(&cx_b, |buf, _| {
1760            assert!(!buf.is_dirty());
1761            assert!(!buf.has_conflict());
1762        });
1763
            // Editing again after the save dirties the buffer without reporting a conflict.
1764        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1765        buffer_b.read_with(&cx_b, |buf, _| {
1766            assert!(buf.is_dirty());
1767            assert!(!buf.has_conflict());
1768        });
1769    }
1770
1771    #[gpui::test(iterations = 10)]
        /// Checks that when the file behind a guest's unmodified buffer is rewritten through the
        /// fake file system, the guest's buffer ends up with the new text, stays clean, and reports
        /// no conflict.
1772    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1773        cx_a.foreground().forbid_parking();
1774        let lang_registry = Arc::new(LanguageRegistry::new());
1775        let fs = Arc::new(FakeFs::new(cx_a.background()));
1776
1777        // Connect to a server as 2 clients.
1778        let mut server = TestServer::start(cx_a.foreground()).await;
1779        let client_a = server.create_client(&mut cx_a, "user_a").await;
1780        let client_b = server.create_client(&mut cx_b, "user_b").await;
1781
1782        // Share a project as client A
            // NOTE(review): `.zed.toml` also lists "user_c", but this test only creates clients
            // for user_a and user_b.
1783        fs.insert_tree(
1784            "/dir",
1785            json!({
1786                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1787                "a.txt": "a-contents",
1788            }),
1789        )
1790        .await;
1791
1792        let project_a = cx_a.update(|cx| {
1793            Project::local(
1794                client_a.clone(),
1795                client_a.user_store.clone(),
1796                lang_registry.clone(),
1797                fs.clone(),
1798                cx,
1799            )
1800        });
1801        let (worktree_a, _) = project_a
1802            .update(&mut cx_a, |p, cx| {
1803                p.find_or_create_local_worktree("/dir", false, cx)
1804            })
1805            .await
1806            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
1807        worktree_a
1808            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1809            .await;
1810        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1811        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1812        project_a
1813            .update(&mut cx_a, |p, cx| p.share(cx))
1814            .await
1815            .unwrap();
1816
1817        // Join that project as client B
1818        let project_b = Project::remote(
1819            project_id,
1820            client_b.clone(),
1821            client_b.user_store.clone(),
1822            lang_registry.clone(),
1823            fs.clone(),
1824            &mut cx_b.to_async(),
1825        )
1826        .await
1827        .unwrap();
            // The guest's worktree handle is bound here but not used again in this test.
1828        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1829
1830        // Open a buffer as client B
1831        let buffer_b = project_b
1832            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1833            .await
1834            .unwrap();
            // A freshly opened buffer starts out clean and without a conflict.
1835        buffer_b.read_with(&cx_b, |buf, _| {
1836            assert!(!buf.is_dirty());
1837            assert!(!buf.has_conflict());
1838        });
1839
            // Overwrite the file directly through the fake file system (not through a buffer), then
            // wait for the guest's buffer to show the new text while remaining clean.
1840        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1841            .await
1842            .unwrap();
1843        buffer_b
1844            .condition(&cx_b, |buf, _| {
1845                buf.text() == "new contents" && !buf.is_dirty()
1846            })
1847            .await;
            // Picking up the new contents must not be reported as a conflict.
1848        buffer_b.read_with(&cx_b, |buf, _| {
1849            assert!(!buf.has_conflict());
1850        });
1851    }
1852
1853    #[gpui::test(iterations = 100)]
        /// Checks that an edit the host makes while the guest's `open_buffer` call is still pending
        /// ends up in the guest's copy of the buffer.
        ///
        /// Runs with 100 iterations (the neighbouring tests use 10) — presumably to cover more
        /// interleavings of the race; confirm against the `gpui::test` macro.
1854    async fn test_editing_while_guest_opens_buffer(
1855        mut cx_a: TestAppContext,
1856        mut cx_b: TestAppContext,
1857    ) {
1858        cx_a.foreground().forbid_parking();
1859        let lang_registry = Arc::new(LanguageRegistry::new());
1860        let fs = Arc::new(FakeFs::new(cx_a.background()));
1861
1862        // Connect to a server as 2 clients.
1863        let mut server = TestServer::start(cx_a.foreground()).await;
1864        let client_a = server.create_client(&mut cx_a, "user_a").await;
1865        let client_b = server.create_client(&mut cx_b, "user_b").await;
1866
1867        // Share a project as client A
1868        fs.insert_tree(
1869            "/dir",
1870            json!({
1871                ".zed.toml": r#"collaborators = ["user_b"]"#,
1872                "a.txt": "a-contents",
1873            }),
1874        )
1875        .await;
1876        let project_a = cx_a.update(|cx| {
1877            Project::local(
1878                client_a.clone(),
1879                client_a.user_store.clone(),
1880                lang_registry.clone(),
1881                fs.clone(),
1882                cx,
1883            )
1884        });
1885        let (worktree_a, _) = project_a
1886            .update(&mut cx_a, |p, cx| {
1887                p.find_or_create_local_worktree("/dir", false, cx)
1888            })
1889            .await
1890            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
1891        worktree_a
1892            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1893            .await;
1894        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1895        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1896        project_a
1897            .update(&mut cx_a, |p, cx| p.share(cx))
1898            .await
1899            .unwrap();
1900
1901        // Join that project as client B
1902        let project_b = Project::remote(
1903            project_id,
1904            client_b.clone(),
1905            client_b.user_store.clone(),
1906            lang_registry.clone(),
1907            fs.clone(),
1908            &mut cx_b.to_async(),
1909        )
1910        .await
1911        .unwrap();
1912
1913        // Open a buffer as client A
1914        let buffer_a = project_a
1915            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1916            .await
1917            .unwrap();
1918
1919        // Start opening the same buffer as client B
            // The open is spawned on the background executor and deliberately not awaited yet, so
            // the host's edit below happens while it is still pending.
1920        let buffer_b = cx_b
1921            .background()
1922            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
            // Yield once before editing — presumably to give the spawned open request a chance to
            // start; confirm against the executor's scheduling.
1923        task::yield_now().await;
1924
1925        // Edit the buffer as client A while client B is still opening it.
1926        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1927
            // Capture the host's text after the edit, finish the guest's open, and wait until the
            // guest's buffer text matches the host's.
1928        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1929        let buffer_b = buffer_b.await.unwrap();
1930        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1931    }
1932
1933    #[gpui::test(iterations = 10)]
        /// Checks that the host sees its collaborator count return to zero when the guest drops its
        /// project handle (and the pending open task) while an `open_buffer` call is still in flight.
1934    async fn test_leaving_worktree_while_opening_buffer(
1935        mut cx_a: TestAppContext,
1936        mut cx_b: TestAppContext,
1937    ) {
1938        cx_a.foreground().forbid_parking();
1939        let lang_registry = Arc::new(LanguageRegistry::new());
1940        let fs = Arc::new(FakeFs::new(cx_a.background()));
1941
1942        // Connect to a server as 2 clients.
1943        let mut server = TestServer::start(cx_a.foreground()).await;
1944        let client_a = server.create_client(&mut cx_a, "user_a").await;
1945        let client_b = server.create_client(&mut cx_b, "user_b").await;
1946
1947        // Share a project as client A
1948        fs.insert_tree(
1949            "/dir",
1950            json!({
1951                ".zed.toml": r#"collaborators = ["user_b"]"#,
1952                "a.txt": "a-contents",
1953            }),
1954        )
1955        .await;
1956        let project_a = cx_a.update(|cx| {
1957            Project::local(
1958                client_a.clone(),
1959                client_a.user_store.clone(),
1960                lang_registry.clone(),
1961                fs.clone(),
1962                cx,
1963            )
1964        });
1965        let (worktree_a, _) = project_a
1966            .update(&mut cx_a, |p, cx| {
1967                p.find_or_create_local_worktree("/dir", false, cx)
1968            })
1969            .await
1970            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
1971        worktree_a
1972            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1973            .await;
1974        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1975        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1976        project_a
1977            .update(&mut cx_a, |p, cx| p.share(cx))
1978            .await
1979            .unwrap();
1980
1981        // Join that project as client B
1982        let project_b = Project::remote(
1983            project_id,
1984            client_b.clone(),
1985            client_b.user_store.clone(),
1986            lang_registry.clone(),
1987            fs.clone(),
1988            &mut cx_b.to_async(),
1989        )
1990        .await
1991        .unwrap();
1992
1993        // See that a guest has joined as client A.
1994        project_a
1995            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1996            .await;
1997
1998        // Begin opening a buffer as client B, but leave the project before the open completes.
            // The spawned task is never awaited: the project handle is dropped inside `cx_b.update`,
            // and then the task handle itself is dropped.
1999        let buffer_b = cx_b
2000            .background()
2001            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2002        cx_b.update(|_| drop(project_b));
2003        drop(buffer_b);
2004
2005        // See that the guest has left.
2006        project_a
2007            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2008            .await;
2009    }
2010
2011    #[gpui::test(iterations = 10)]
        /// Checks that the host's collaborator count drops from one to zero after the guest's client
        /// is disconnected.
2012    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2013        cx_a.foreground().forbid_parking();
2014        let lang_registry = Arc::new(LanguageRegistry::new());
2015        let fs = Arc::new(FakeFs::new(cx_a.background()));
2016
2017        // Connect to a server as 2 clients.
2018        let mut server = TestServer::start(cx_a.foreground()).await;
2019        let client_a = server.create_client(&mut cx_a, "user_a").await;
2020        let client_b = server.create_client(&mut cx_b, "user_b").await;
2021
2022        // Share a project as client A
2023        fs.insert_tree(
2024            "/a",
2025            json!({
2026                ".zed.toml": r#"collaborators = ["user_b"]"#,
2027                "a.txt": "a-contents",
2028                "b.txt": "b-contents",
2029            }),
2030        )
2031        .await;
2032        let project_a = cx_a.update(|cx| {
2033            Project::local(
2034                client_a.clone(),
2035                client_a.user_store.clone(),
2036                lang_registry.clone(),
2037                fs.clone(),
2038                cx,
2039            )
2040        });
2041        let (worktree_a, _) = project_a
2042            .update(&mut cx_a, |p, cx| {
2043                p.find_or_create_local_worktree("/a", false, cx)
2044            })
2045            .await
2046            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
2047        worktree_a
2048            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2049            .await;
2050        let project_id = project_a
2051            .update(&mut cx_a, |project, _| project.next_remote_id())
2052            .await;
2053        project_a
2054            .update(&mut cx_a, |project, cx| project.share(cx))
2055            .await
2056            .unwrap();
2057
2058        // Join that project as client B
            // The handle stays bound for the rest of the test — presumably so that the guest leaves
            // because of the disconnect below rather than because the project was dropped; confirm.
2059        let _project_b = Project::remote(
2060            project_id,
2061            client_b.clone(),
2062            client_b.user_store.clone(),
2063            lang_registry.clone(),
2064            fs.clone(),
2065            &mut cx_b.to_async(),
2066        )
2067        .await
2068        .unwrap();
2069
2070        // See that a guest has joined as client A.
2071        project_a
2072            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2073            .await;
2074
2075        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2076        client_b.disconnect(&cx_b.to_async()).unwrap();
2077        project_a
2078            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2079            .await;
2080    }
2081
2082    #[gpui::test(iterations = 10)]
        /// Checks that diagnostics published by the host's fake language server reach a guest in
        /// three forms: as summaries available right after joining, as updated summaries after a
        /// second publish, and as diagnostic entries in a buffer the guest opens afterwards.
2083    async fn test_collaborating_with_diagnostics(
2084        mut cx_a: TestAppContext,
2085        mut cx_b: TestAppContext,
2086    ) {
2087        cx_a.foreground().forbid_parking();
2088        let mut lang_registry = Arc::new(LanguageRegistry::new());
2089        let fs = Arc::new(FakeFs::new(cx_a.background()));
2090
2091        // Set up a fake language server.
            // `Arc::get_mut` only succeeds while this is the sole handle to the registry, so the
            // language is registered here, before `lang_registry` is cloned further down.
2092        let (language_server_config, mut fake_language_server) =
2093            LanguageServerConfig::fake(&cx_a).await;
2094        Arc::get_mut(&mut lang_registry)
2095            .unwrap()
2096            .add(Arc::new(Language::new(
2097                LanguageConfig {
2098                    name: "Rust".to_string(),
2099                    path_suffixes: vec!["rs".to_string()],
2100                    language_server: Some(language_server_config),
2101                    ..Default::default()
2102                },
2103                Some(tree_sitter_rust::language()),
2104            )));
2105
2106        // Connect to a server as 2 clients.
2107        let mut server = TestServer::start(cx_a.foreground()).await;
2108        let client_a = server.create_client(&mut cx_a, "user_a").await;
2109        let client_b = server.create_client(&mut cx_b, "user_b").await;
2110
2111        // Share a project as client A
2112        fs.insert_tree(
2113            "/a",
2114            json!({
2115                ".zed.toml": r#"collaborators = ["user_b"]"#,
2116                "a.rs": "let one = two",
2117                "other.rs": "",
2118            }),
2119        )
2120        .await;
2121        let project_a = cx_a.update(|cx| {
2122            Project::local(
2123                client_a.clone(),
2124                client_a.user_store.clone(),
2125                lang_registry.clone(),
2126                fs.clone(),
2127                cx,
2128            )
2129        });
2130        let (worktree_a, _) = project_a
2131            .update(&mut cx_a, |p, cx| {
2132                p.find_or_create_local_worktree("/a", false, cx)
2133            })
2134            .await
2135            .unwrap();
            // Wait for the local worktree's `scan_complete()` future before sharing the project.
2136        worktree_a
2137            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2138            .await;
2139        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2140        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2141        project_a
2142            .update(&mut cx_a, |p, cx| p.share(cx))
2143            .await
2144            .unwrap();
2145
2146        // Cause the language server to start.
            // This opens `other.rs` on the host, not `a.rs`, the file the diagnostics below refer
            // to; the returned buffer is discarded.
2147        let _ = cx_a
2148            .background()
2149            .spawn(project_a.update(&mut cx_a, |project, cx| {
2150                project.open_buffer(
2151                    ProjectPath {
2152                        worktree_id,
2153                        path: Path::new("other.rs").into(),
2154                    },
2155                    cx,
2156                )
2157            }))
2158            .await
2159            .unwrap();
2160
2161        // Simulate a language server reporting errors for a file.
            // First publish: a single ERROR covering columns 4..7 of line 0 in `/a/a.rs`.
2162        fake_language_server
2163            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2164                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2165                version: None,
2166                diagnostics: vec![lsp::Diagnostic {
2167                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2168                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2169                    message: "message 1".to_string(),
2170                    ..Default::default()
2171                }],
2172            })
2173            .await;
2174
2175        // Wait for server to see the diagnostics update.
            // The condition inspects the server's store directly: it holds once the shared worktree's
            // `diagnostic_summaries` is non-empty, so the guest only joins after that point.
2176        server
2177            .condition(|store| {
2178                let worktree = store
2179                    .project(project_id)
2180                    .unwrap()
2181                    .worktrees
2182                    .get(&worktree_id.to_proto())
2183                    .unwrap();
2184
2185                !worktree
2186                    .share
2187                    .as_ref()
2188                    .unwrap()
2189                    .diagnostic_summaries
2190                    .is_empty()
2191            })
2192            .await;
2193
2194        // Join the worktree as client B.
2195        let project_b = Project::remote(
2196            project_id,
2197            client_b.clone(),
2198            client_b.user_store.clone(),
2199            lang_registry.clone(),
2200            fs.clone(),
2201            &mut cx_b.to_async(),
2202        )
2203        .await
2204        .unwrap();
2205
            // Asserted immediately after joining, with no intermediate wait: the guest must already
            // have the summary for `a.rs` (one error, no warnings).
2206        project_b.read_with(&cx_b, |project, cx| {
2207            assert_eq!(
2208                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2209                &[(
2210                    ProjectPath {
2211                        worktree_id,
2212                        path: Arc::from(Path::new("a.rs")),
2213                    },
2214                    DiagnosticSummary {
2215                        error_count: 1,
2216                        warning_count: 0,
2217                        ..Default::default()
2218                    },
2219                )]
2220            )
2221        });
2222
2223        // Simulate a language server reporting more errors for a file.
            // Second publish: the same ERROR plus a WARNING covering columns 10..13 of line 0.
2224        fake_language_server
2225            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2226                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2227                version: None,
2228                diagnostics: vec![
2229                    lsp::Diagnostic {
2230                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2231                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2232                        message: "message 1".to_string(),
2233                        ..Default::default()
2234                    },
2235                    lsp::Diagnostic {
2236                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2237                        range: lsp::Range::new(
2238                            lsp::Position::new(0, 10),
2239                            lsp::Position::new(0, 13),
2240                        ),
2241                        message: "message 2".to_string(),
2242                        ..Default::default()
2243                    },
2244                ],
2245            })
2246            .await;
2247
2248        // Client B receives the updated summaries (one error and one warning for `a.rs`).
2249        project_b
2250            .condition(&cx_b, |project, cx| {
2251                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2252                    == &[(
2253                        ProjectPath {
2254                            worktree_id,
2255                            path: Arc::from(Path::new("a.rs")),
2256                        },
2257                        DiagnosticSummary {
2258                            error_count: 1,
2259                            warning_count: 1,
2260                            ..Default::default()
2261                        },
2262                    )]
2263            })
2264            .await;
2265
2266        // Open the file with the errors on client B. They should be present.
2267        let buffer_b = cx_b
2268            .background()
2269            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2270            .await
2271            .unwrap();
2272
            // The guest's buffer must contain both entries over the whole buffer range, each marked
            // primary, with group ids 0 and 1 respectively.
2273        buffer_b.read_with(&cx_b, |buffer, _| {
2274            assert_eq!(
2275                buffer
2276                    .snapshot()
2277                    .diagnostics_in_range::<_, Point>(0..buffer.len())
                            // NOTE(review): this `map` returns each entry unchanged; it looks like a
                            // no-op that could be removed — confirm it is not needed for inference.
2278                    .map(|entry| entry)
2279                    .collect::<Vec<_>>(),
2280                &[
2281                    DiagnosticEntry {
2282                        range: Point::new(0, 4)..Point::new(0, 7),
2283                        diagnostic: Diagnostic {
2284                            group_id: 0,
2285                            message: "message 1".to_string(),
2286                            severity: lsp::DiagnosticSeverity::ERROR,
2287                            is_primary: true,
2288                            ..Default::default()
2289                        }
2290                    },
2291                    DiagnosticEntry {
2292                        range: Point::new(0, 10)..Point::new(0, 13),
2293                        diagnostic: Diagnostic {
2294                            group_id: 1,
2295                            severity: lsp::DiagnosticSeverity::WARNING,
2296                            message: "message 2".to_string(),
2297                            is_primary: true,
2298                            ..Default::default()
2299                        }
2300                    }
2301                ]
2302            );
2303        });
2304    }
2305
2306    #[gpui::test(iterations = 10)]
2307    async fn test_collaborating_with_completion(
2308        mut cx_a: TestAppContext,
2309        mut cx_b: TestAppContext,
2310    ) {
2311        cx_a.foreground().forbid_parking();
2312        let mut lang_registry = Arc::new(LanguageRegistry::new());
2313        let fs = Arc::new(FakeFs::new(cx_a.background()));
2314
2315        // Set up a fake language server.
2316        let (language_server_config, mut fake_language_server) =
2317            LanguageServerConfig::fake_with_capabilities(
2318                lsp::ServerCapabilities {
2319                    completion_provider: Some(lsp::CompletionOptions {
2320                        trigger_characters: Some(vec![".".to_string()]),
2321                        ..Default::default()
2322                    }),
2323                    ..Default::default()
2324                },
2325                &cx_a,
2326            )
2327            .await;
2328        Arc::get_mut(&mut lang_registry)
2329            .unwrap()
2330            .add(Arc::new(Language::new(
2331                LanguageConfig {
2332                    name: "Rust".to_string(),
2333                    path_suffixes: vec!["rs".to_string()],
2334                    language_server: Some(language_server_config),
2335                    ..Default::default()
2336                },
2337                Some(tree_sitter_rust::language()),
2338            )));
2339
2340        // Connect to a server as 2 clients.
2341        let mut server = TestServer::start(cx_a.foreground()).await;
2342        let client_a = server.create_client(&mut cx_a, "user_a").await;
2343        let client_b = server.create_client(&mut cx_b, "user_b").await;
2344
2345        // Share a project as client A
2346        fs.insert_tree(
2347            "/a",
2348            json!({
2349                ".zed.toml": r#"collaborators = ["user_b"]"#,
2350                "main.rs": "fn main() { a }",
2351                "other.rs": "",
2352            }),
2353        )
2354        .await;
2355        let project_a = cx_a.update(|cx| {
2356            Project::local(
2357                client_a.clone(),
2358                client_a.user_store.clone(),
2359                lang_registry.clone(),
2360                fs.clone(),
2361                cx,
2362            )
2363        });
2364        let (worktree_a, _) = project_a
2365            .update(&mut cx_a, |p, cx| {
2366                p.find_or_create_local_worktree("/a", false, cx)
2367            })
2368            .await
2369            .unwrap();
2370        worktree_a
2371            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2372            .await;
2373        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2374        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2375        project_a
2376            .update(&mut cx_a, |p, cx| p.share(cx))
2377            .await
2378            .unwrap();
2379
2380        // Join the worktree as client B.
2381        let project_b = Project::remote(
2382            project_id,
2383            client_b.clone(),
2384            client_b.user_store.clone(),
2385            lang_registry.clone(),
2386            fs.clone(),
2387            &mut cx_b.to_async(),
2388        )
2389        .await
2390        .unwrap();
2391
2392        // Open a file in an editor as the guest.
2393        let buffer_b = project_b
2394            .update(&mut cx_b, |p, cx| {
2395                p.open_buffer((worktree_id, "main.rs"), cx)
2396            })
2397            .await
2398            .unwrap();
2399        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2400        let editor_b = cx_b.add_view(window_b, |cx| {
2401            Editor::for_buffer(
2402                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2403                Arc::new(|cx| EditorSettings::test(cx)),
2404                Some(project_b.clone()),
2405                cx,
2406            )
2407        });
2408
2409        // Type a completion trigger character as the guest.
2410        editor_b.update(&mut cx_b, |editor, cx| {
2411            editor.select_ranges([13..13], None, cx);
2412            editor.handle_input(&Input(".".into()), cx);
2413            cx.focus(&editor_b);
2414        });
2415
2416        // Receive a completion request as the host's language server.
2417        let (request_id, params) = fake_language_server
2418            .receive_request::<lsp::request::Completion>()
2419            .await;
2420        assert_eq!(
2421            params.text_document_position.text_document.uri,
2422            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2423        );
2424        assert_eq!(
2425            params.text_document_position.position,
2426            lsp::Position::new(0, 14),
2427        );
2428
2429        // Return some completions from the host's language server.
2430        fake_language_server
2431            .respond(
2432                request_id,
2433                Some(lsp::CompletionResponse::Array(vec![
2434                    lsp::CompletionItem {
2435                        label: "first_method(…)".into(),
2436                        detail: Some("fn(&mut self, B) -> C".into()),
2437                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2438                            new_text: "first_method($1)".to_string(),
2439                            range: lsp::Range::new(
2440                                lsp::Position::new(0, 14),
2441                                lsp::Position::new(0, 14),
2442                            ),
2443                        })),
2444                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2445                        ..Default::default()
2446                    },
2447                    lsp::CompletionItem {
2448                        label: "second_method(…)".into(),
2449                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2450                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2451                            new_text: "second_method()".to_string(),
2452                            range: lsp::Range::new(
2453                                lsp::Position::new(0, 14),
2454                                lsp::Position::new(0, 14),
2455                            ),
2456                        })),
2457                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2458                        ..Default::default()
2459                    },
2460                ])),
2461            )
2462            .await;
2463
2464        // Open the buffer on the host.
2465        let buffer_a = project_a
2466            .update(&mut cx_a, |p, cx| {
2467                p.open_buffer((worktree_id, "main.rs"), cx)
2468            })
2469            .await
2470            .unwrap();
2471        buffer_a
2472            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2473            .await;
2474
2475        // Confirm a completion on the guest.
2476        editor_b.next_notification(&cx_b).await;
2477        editor_b.update(&mut cx_b, |editor, cx| {
2478            assert!(editor.showing_context_menu());
2479            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2480            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2481        });
2482
2483        buffer_a
2484            .condition(&cx_a, |buffer, _| {
2485                buffer.text() == "fn main() { a.first_method() }"
2486            })
2487            .await;
2488
        // Receive a request to resolve the selected completion on the host's language server.
2490        let (request_id, params) = fake_language_server
2491            .receive_request::<lsp::request::ResolveCompletionItem>()
2492            .await;
2493        assert_eq!(params.label, "first_method(…)");
2494
2495        // Return a resolved completion from the host's language server.
2496        // The resolved completion has an additional text edit.
2497        fake_language_server
2498            .respond(
2499                request_id,
2500                lsp::CompletionItem {
2501                    label: "first_method(…)".into(),
2502                    detail: Some("fn(&mut self, B) -> C".into()),
2503                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2504                        new_text: "first_method($1)".to_string(),
2505                        range: lsp::Range::new(
2506                            lsp::Position::new(0, 14),
2507                            lsp::Position::new(0, 14),
2508                        ),
2509                    })),
2510                    additional_text_edits: Some(vec![lsp::TextEdit {
2511                        new_text: "use d::SomeTrait;\n".to_string(),
2512                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2513                    }]),
2514                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2515                    ..Default::default()
2516                },
2517            )
2518            .await;
2519
2520        // The additional edit is applied.
2521        buffer_b
2522            .condition(&cx_b, |buffer, _| {
2523                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2524            })
2525            .await;
2526        assert_eq!(
2527            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2528            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2529        );
2530    }
2531
2532    #[gpui::test(iterations = 10)]
2533    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2534        cx_a.foreground().forbid_parking();
2535        let mut lang_registry = Arc::new(LanguageRegistry::new());
2536        let fs = Arc::new(FakeFs::new(cx_a.background()));
2537
2538        // Set up a fake language server.
2539        let (language_server_config, mut fake_language_server) =
2540            LanguageServerConfig::fake(&cx_a).await;
2541        Arc::get_mut(&mut lang_registry)
2542            .unwrap()
2543            .add(Arc::new(Language::new(
2544                LanguageConfig {
2545                    name: "Rust".to_string(),
2546                    path_suffixes: vec!["rs".to_string()],
2547                    language_server: Some(language_server_config),
2548                    ..Default::default()
2549                },
2550                Some(tree_sitter_rust::language()),
2551            )));
2552
2553        // Connect to a server as 2 clients.
2554        let mut server = TestServer::start(cx_a.foreground()).await;
2555        let client_a = server.create_client(&mut cx_a, "user_a").await;
2556        let client_b = server.create_client(&mut cx_b, "user_b").await;
2557
2558        // Share a project as client A
2559        fs.insert_tree(
2560            "/a",
2561            json!({
2562                ".zed.toml": r#"collaborators = ["user_b"]"#,
2563                "a.rs": "let one = two",
2564            }),
2565        )
2566        .await;
2567        let project_a = cx_a.update(|cx| {
2568            Project::local(
2569                client_a.clone(),
2570                client_a.user_store.clone(),
2571                lang_registry.clone(),
2572                fs.clone(),
2573                cx,
2574            )
2575        });
2576        let (worktree_a, _) = project_a
2577            .update(&mut cx_a, |p, cx| {
2578                p.find_or_create_local_worktree("/a", false, cx)
2579            })
2580            .await
2581            .unwrap();
2582        worktree_a
2583            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2584            .await;
2585        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2586        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2587        project_a
2588            .update(&mut cx_a, |p, cx| p.share(cx))
2589            .await
2590            .unwrap();
2591
2592        // Join the worktree as client B.
2593        let project_b = Project::remote(
2594            project_id,
2595            client_b.clone(),
2596            client_b.user_store.clone(),
2597            lang_registry.clone(),
2598            fs.clone(),
2599            &mut cx_b.to_async(),
2600        )
2601        .await
2602        .unwrap();
2603
2604        let buffer_b = cx_b
2605            .background()
2606            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2607            .await
2608            .unwrap();
2609
2610        let format = buffer_b.update(&mut cx_b, |buffer, cx| buffer.format(cx));
2611        let (request_id, _) = fake_language_server
2612            .receive_request::<lsp::request::Formatting>()
2613            .await;
2614        fake_language_server
2615            .respond(
2616                request_id,
2617                Some(vec![
2618                    lsp::TextEdit {
2619                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2620                        new_text: "h".to_string(),
2621                    },
2622                    lsp::TextEdit {
2623                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2624                        new_text: "y".to_string(),
2625                    },
2626                ]),
2627            )
2628            .await;
2629        format.await.unwrap();
2630        assert_eq!(
2631            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2632            "let honey = two"
2633        );
2634    }
2635
    // Checks go-to-definition from a guest when the definition lives in a file outside the
    // shared worktree: the shared worktree is rooted at "/root-1" while the fake language
    // server points at "/root-2/b.rs". The test asserts that the guest's project reports a
    // second worktree while definitions are held, that two lookups resolve to the same
    // target buffer, and that the worktree count goes back to one once they are dropped.
    #[gpui::test(iterations = 10)]
    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        // "/root-1" is the directory that gets shared below.
        fs.insert_tree(
            "/root-1",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
            }),
        )
        .await;
        // "/root-2" is never opened explicitly; it is only reached through definitions.
        fs.insert_tree(
            "/root-2",
            json!({
                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
            }),
        )
        .await;

        // Set up a fake language server.
        let (language_server_config, mut fake_language_server) =
            LanguageServerConfig::fake(&cx_a).await;
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".to_string(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/root-1", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the local worktree before sharing the project.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file containing the references on client B.
        let buffer_b = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        // Request the definition at position 23 of "a.rs" (presumably an offset; if so it
        // falls inside `TWO`) and answer as the fake language server with the range of
        // `TWO` on the first line of "/root-2/b.rs".
        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
        let (request_id, _) = fake_language_server
            .receive_request::<lsp::request::GotoDefinition>()
            .await;
        fake_language_server
            .respond(
                request_id,
                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
                ))),
            )
            .await;
        let definitions_1 = definitions_1.await.unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_1.len(), 1);
            // The guest's project now reports two worktrees instead of one.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_1[0].target_buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_1[0].target_range.to_point(target_buffer),
                Point::new(0, 6)..Point::new(0, 9)
            );
        });

        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
        // the previous call to `definition`.
        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
        let (request_id, _) = fake_language_server
            .receive_request::<lsp::request::GotoDefinition>()
            .await;
        fake_language_server
            .respond(
                request_id,
                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
                ))),
            )
            .await;
        let definitions_2 = definitions_2.await.unwrap();
        cx_b.read(|cx| {
            assert_eq!(definitions_2.len(), 1);
            // Still two worktrees: the second lookup must not add another one.
            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
            let target_buffer = definitions_2[0].target_buffer.read(cx);
            assert_eq!(
                target_buffer.text(),
                "const TWO: usize = 2;\nconst THREE: usize = 3;"
            );
            assert_eq!(
                definitions_2[0].target_range.to_point(target_buffer),
                Point::new(1, 6)..Point::new(1, 11)
            );
        });
        // Both lookups must resolve to the very same buffer handle.
        assert_eq!(
            definitions_1[0].target_buffer,
            definitions_2[0].target_buffer
        );

        // Drop every definition inside an update; afterwards the project is expected to
        // report only a single worktree again.
        cx_b.update(|_| {
            drop(definitions_1);
            drop(definitions_2);
        });
        project_b
            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
            .await;
    }
2792
    // Checks that a guest ends up with a single buffer for "b.rs" when it opens that file
    // and, concurrently, requests a definition that points into it. The order in which the
    // two requests are issued is chosen with `rng`, and neither is awaited until the fake
    // language server has answered, so both are in flight at the same time.
    #[gpui::test(iterations = 10)]
    async fn test_open_buffer_while_getting_definition_pointing_to_it(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut rng: StdRng,
    ) {
        cx_a.foreground().forbid_parking();
        let mut lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        // Both the referencing file and the definition target live in the same worktree.
        fs.insert_tree(
            "/root",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.rs": "const ONE: usize = b::TWO;",
                "b.rs": "const TWO: usize = 2",
            }),
        )
        .await;

        // Set up a fake language server.
        let (language_server_config, mut fake_language_server) =
            LanguageServerConfig::fake(&cx_a).await;
        Arc::get_mut(&mut lang_registry)
            .unwrap()
            .add(Arc::new(Language::new(
                LanguageConfig {
                    name: "Rust".to_string(),
                    path_suffixes: vec!["rs".to_string()],
                    language_server: Some(language_server_config),
                    ..Default::default()
                },
                Some(tree_sitter_rust::language()),
            )));

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/root", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the local worktree before sharing the project.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join the worktree as client B.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // Open the file containing the reference on client B.
        let buffer_b1 = cx_b
            .background()
            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
            .await
            .unwrap();

        // Issue the definition request and the `open_buffer` call for "b.rs" in a random
        // order. Neither result is awaited here; both stay pending until further below.
        let definitions;
        let buffer_b2;
        if rng.gen() {
            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
            buffer_b2 =
                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
        } else {
            buffer_b2 =
                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
        }

        // Answer the definition request as the fake language server, pointing at the range
        // of `TWO` in "/root/b.rs".
        let (request_id, _) = fake_language_server
            .receive_request::<lsp::request::GotoDefinition>()
            .await;
        fake_language_server
            .respond(
                request_id,
                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
                ))),
            )
            .await;

        // Whichever request was issued first, the definition's target buffer must be the
        // same buffer that `open_buffer` returned.
        let buffer_b2 = buffer_b2.await.unwrap();
        let definitions = definitions.await.unwrap();
        assert_eq!(definitions.len(), 1);
        assert_eq!(definitions[0].target_buffer, buffer_b2);
    }
2906
2907    #[gpui::test(iterations = 10)]
2908    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2909        cx_a.foreground().forbid_parking();
2910
2911        // Connect to a server as 2 clients.
2912        let mut server = TestServer::start(cx_a.foreground()).await;
2913        let client_a = server.create_client(&mut cx_a, "user_a").await;
2914        let client_b = server.create_client(&mut cx_b, "user_b").await;
2915
2916        // Create an org that includes these 2 users.
2917        let db = &server.app_state.db;
2918        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2919        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2920            .await
2921            .unwrap();
2922        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2923            .await
2924            .unwrap();
2925
2926        // Create a channel that includes all the users.
2927        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2928        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2929            .await
2930            .unwrap();
2931        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2932            .await
2933            .unwrap();
2934        db.create_channel_message(
2935            channel_id,
2936            client_b.current_user_id(&cx_b),
2937            "hello A, it's B.",
2938            OffsetDateTime::now_utc(),
2939            1,
2940        )
2941        .await
2942        .unwrap();
2943
2944        let channels_a = cx_a
2945            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2946        channels_a
2947            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2948            .await;
2949        channels_a.read_with(&cx_a, |list, _| {
2950            assert_eq!(
2951                list.available_channels().unwrap(),
2952                &[ChannelDetails {
2953                    id: channel_id.to_proto(),
2954                    name: "test-channel".to_string()
2955                }]
2956            )
2957        });
2958        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2959            this.get_channel(channel_id.to_proto(), cx).unwrap()
2960        });
2961        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2962        channel_a
2963            .condition(&cx_a, |channel, _| {
2964                channel_messages(channel)
2965                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2966            })
2967            .await;
2968
2969        let channels_b = cx_b
2970            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2971        channels_b
2972            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2973            .await;
2974        channels_b.read_with(&cx_b, |list, _| {
2975            assert_eq!(
2976                list.available_channels().unwrap(),
2977                &[ChannelDetails {
2978                    id: channel_id.to_proto(),
2979                    name: "test-channel".to_string()
2980                }]
2981            )
2982        });
2983
2984        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2985            this.get_channel(channel_id.to_proto(), cx).unwrap()
2986        });
2987        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2988        channel_b
2989            .condition(&cx_b, |channel, _| {
2990                channel_messages(channel)
2991                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2992            })
2993            .await;
2994
2995        channel_a
2996            .update(&mut cx_a, |channel, cx| {
2997                channel
2998                    .send_message("oh, hi B.".to_string(), cx)
2999                    .unwrap()
3000                    .detach();
3001                let task = channel.send_message("sup".to_string(), cx).unwrap();
3002                assert_eq!(
3003                    channel_messages(channel),
3004                    &[
3005                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3006                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3007                        ("user_a".to_string(), "sup".to_string(), true)
3008                    ]
3009                );
3010                task
3011            })
3012            .await
3013            .unwrap();
3014
3015        channel_b
3016            .condition(&cx_b, |channel, _| {
3017                channel_messages(channel)
3018                    == [
3019                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3020                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3021                        ("user_a".to_string(), "sup".to_string(), false),
3022                    ]
3023            })
3024            .await;
3025
3026        assert_eq!(
3027            server
3028                .state()
3029                .await
3030                .channel(channel_id)
3031                .unwrap()
3032                .connection_ids
3033                .len(),
3034            2
3035        );
3036        cx_b.update(|_| drop(channel_b));
3037        server
3038            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3039            .await;
3040
3041        cx_a.update(|_| drop(channel_a));
3042        server
3043            .condition(|state| state.channel(channel_id).is_none())
3044            .await;
3045    }
3046
3047    #[gpui::test(iterations = 10)]
3048    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3049        cx_a.foreground().forbid_parking();
3050
3051        let mut server = TestServer::start(cx_a.foreground()).await;
3052        let client_a = server.create_client(&mut cx_a, "user_a").await;
3053
3054        let db = &server.app_state.db;
3055        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3056        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3057        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3058            .await
3059            .unwrap();
3060        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3061            .await
3062            .unwrap();
3063
3064        let channels_a = cx_a
3065            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3066        channels_a
3067            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3068            .await;
3069        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3070            this.get_channel(channel_id.to_proto(), cx).unwrap()
3071        });
3072
3073        // Messages aren't allowed to be too long.
3074        channel_a
3075            .update(&mut cx_a, |channel, cx| {
3076                let long_body = "this is long.\n".repeat(1024);
3077                channel.send_message(long_body, cx).unwrap()
3078            })
3079            .await
3080            .unwrap_err();
3081
3082        // Messages aren't allowed to be blank.
3083        channel_a.update(&mut cx_a, |channel, cx| {
3084            channel.send_message(String::new(), cx).unwrap_err()
3085        });
3086
3087        // Leading and trailing whitespace are trimmed.
3088        channel_a
3089            .update(&mut cx_a, |channel, cx| {
3090                channel
3091                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3092                    .unwrap()
3093            })
3094            .await
3095            .unwrap();
3096        assert_eq!(
3097            db.get_channel_messages(channel_id, 10, None)
3098                .await
3099                .unwrap()
3100                .iter()
3101                .map(|m| &m.body)
3102                .collect::<Vec<_>>(),
3103            &["surrounded by whitespace"]
3104        );
3105    }
3106
3107    #[gpui::test(iterations = 10)]
3108    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3109        cx_a.foreground().forbid_parking();
3110
3111        // Connect to a server as 2 clients.
3112        let mut server = TestServer::start(cx_a.foreground()).await;
3113        let client_a = server.create_client(&mut cx_a, "user_a").await;
3114        let client_b = server.create_client(&mut cx_b, "user_b").await;
3115        let mut status_b = client_b.status();
3116
3117        // Create an org that includes these 2 users.
3118        let db = &server.app_state.db;
3119        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3120        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3121            .await
3122            .unwrap();
3123        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3124            .await
3125            .unwrap();
3126
3127        // Create a channel that includes all the users.
3128        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3129        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3130            .await
3131            .unwrap();
3132        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3133            .await
3134            .unwrap();
3135        db.create_channel_message(
3136            channel_id,
3137            client_b.current_user_id(&cx_b),
3138            "hello A, it's B.",
3139            OffsetDateTime::now_utc(),
3140            2,
3141        )
3142        .await
3143        .unwrap();
3144
3145        let channels_a = cx_a
3146            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3147        channels_a
3148            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3149            .await;
3150
3151        channels_a.read_with(&cx_a, |list, _| {
3152            assert_eq!(
3153                list.available_channels().unwrap(),
3154                &[ChannelDetails {
3155                    id: channel_id.to_proto(),
3156                    name: "test-channel".to_string()
3157                }]
3158            )
3159        });
3160        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3161            this.get_channel(channel_id.to_proto(), cx).unwrap()
3162        });
3163        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3164        channel_a
3165            .condition(&cx_a, |channel, _| {
3166                channel_messages(channel)
3167                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3168            })
3169            .await;
3170
3171        let channels_b = cx_b
3172            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3173        channels_b
3174            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3175            .await;
3176        channels_b.read_with(&cx_b, |list, _| {
3177            assert_eq!(
3178                list.available_channels().unwrap(),
3179                &[ChannelDetails {
3180                    id: channel_id.to_proto(),
3181                    name: "test-channel".to_string()
3182                }]
3183            )
3184        });
3185
3186        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3187            this.get_channel(channel_id.to_proto(), cx).unwrap()
3188        });
3189        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3190        channel_b
3191            .condition(&cx_b, |channel, _| {
3192                channel_messages(channel)
3193                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3194            })
3195            .await;
3196
3197        // Disconnect client B, ensuring we can still access its cached channel data.
3198        server.forbid_connections();
3199        server.disconnect_client(client_b.current_user_id(&cx_b));
3200        while !matches!(
3201            status_b.next().await,
3202            Some(client::Status::ReconnectionError { .. })
3203        ) {}
3204
3205        channels_b.read_with(&cx_b, |channels, _| {
3206            assert_eq!(
3207                channels.available_channels().unwrap(),
3208                [ChannelDetails {
3209                    id: channel_id.to_proto(),
3210                    name: "test-channel".to_string()
3211                }]
3212            )
3213        });
3214        channel_b.read_with(&cx_b, |channel, _| {
3215            assert_eq!(
3216                channel_messages(channel),
3217                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3218            )
3219        });
3220
3221        // Send a message from client B while it is disconnected.
3222        channel_b
3223            .update(&mut cx_b, |channel, cx| {
3224                let task = channel
3225                    .send_message("can you see this?".to_string(), cx)
3226                    .unwrap();
3227                assert_eq!(
3228                    channel_messages(channel),
3229                    &[
3230                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3231                        ("user_b".to_string(), "can you see this?".to_string(), true)
3232                    ]
3233                );
3234                task
3235            })
3236            .await
3237            .unwrap_err();
3238
3239        // Send a message from client A while B is disconnected.
3240        channel_a
3241            .update(&mut cx_a, |channel, cx| {
3242                channel
3243                    .send_message("oh, hi B.".to_string(), cx)
3244                    .unwrap()
3245                    .detach();
3246                let task = channel.send_message("sup".to_string(), cx).unwrap();
3247                assert_eq!(
3248                    channel_messages(channel),
3249                    &[
3250                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3251                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3252                        ("user_a".to_string(), "sup".to_string(), true)
3253                    ]
3254                );
3255                task
3256            })
3257            .await
3258            .unwrap();
3259
3260        // Give client B a chance to reconnect.
3261        server.allow_connections();
3262        cx_b.foreground().advance_clock(Duration::from_secs(10));
3263
3264        // Verify that B sees the new messages upon reconnection, as well as the message client B
3265        // sent while offline.
3266        channel_b
3267            .condition(&cx_b, |channel, _| {
3268                channel_messages(channel)
3269                    == [
3270                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3271                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3272                        ("user_a".to_string(), "sup".to_string(), false),
3273                        ("user_b".to_string(), "can you see this?".to_string(), false),
3274                    ]
3275            })
3276            .await;
3277
3278        // Ensure client A and B can communicate normally after reconnection.
3279        channel_a
3280            .update(&mut cx_a, |channel, cx| {
3281                channel.send_message("you online?".to_string(), cx).unwrap()
3282            })
3283            .await
3284            .unwrap();
3285        channel_b
3286            .condition(&cx_b, |channel, _| {
3287                channel_messages(channel)
3288                    == [
3289                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3290                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3291                        ("user_a".to_string(), "sup".to_string(), false),
3292                        ("user_b".to_string(), "can you see this?".to_string(), false),
3293                        ("user_a".to_string(), "you online?".to_string(), false),
3294                    ]
3295            })
3296            .await;
3297
3298        channel_b
3299            .update(&mut cx_b, |channel, cx| {
3300                channel.send_message("yep".to_string(), cx).unwrap()
3301            })
3302            .await
3303            .unwrap();
3304        channel_a
3305            .condition(&cx_a, |channel, _| {
3306                channel_messages(channel)
3307                    == [
3308                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3309                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3310                        ("user_a".to_string(), "sup".to_string(), false),
3311                        ("user_b".to_string(), "can you see this?".to_string(), false),
3312                        ("user_a".to_string(), "you online?".to_string(), false),
3313                        ("user_b".to_string(), "yep".to_string(), false),
3314                    ]
3315            })
3316            .await;
3317    }
3318
3319    #[gpui::test(iterations = 10)]
3320    async fn test_contacts(
3321        mut cx_a: TestAppContext,
3322        mut cx_b: TestAppContext,
3323        mut cx_c: TestAppContext,
3324    ) {
3325        cx_a.foreground().forbid_parking();
3326        let lang_registry = Arc::new(LanguageRegistry::new());
3327        let fs = Arc::new(FakeFs::new(cx_a.background()));
3328
3329        // Connect to a server as 3 clients.
3330        let mut server = TestServer::start(cx_a.foreground()).await;
3331        let client_a = server.create_client(&mut cx_a, "user_a").await;
3332        let client_b = server.create_client(&mut cx_b, "user_b").await;
3333        let client_c = server.create_client(&mut cx_c, "user_c").await;
3334
3335        // Share a worktree as client A.
3336        fs.insert_tree(
3337            "/a",
3338            json!({
3339                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3340            }),
3341        )
3342        .await;
3343
3344        let project_a = cx_a.update(|cx| {
3345            Project::local(
3346                client_a.clone(),
3347                client_a.user_store.clone(),
3348                lang_registry.clone(),
3349                fs.clone(),
3350                cx,
3351            )
3352        });
3353        let (worktree_a, _) = project_a
3354            .update(&mut cx_a, |p, cx| {
3355                p.find_or_create_local_worktree("/a", false, cx)
3356            })
3357            .await
3358            .unwrap();
3359        worktree_a
3360            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3361            .await;
3362
3363        client_a
3364            .user_store
3365            .condition(&cx_a, |user_store, _| {
3366                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3367            })
3368            .await;
3369        client_b
3370            .user_store
3371            .condition(&cx_b, |user_store, _| {
3372                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3373            })
3374            .await;
3375        client_c
3376            .user_store
3377            .condition(&cx_c, |user_store, _| {
3378                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3379            })
3380            .await;
3381
3382        let project_id = project_a
3383            .update(&mut cx_a, |project, _| project.next_remote_id())
3384            .await;
3385        project_a
3386            .update(&mut cx_a, |project, cx| project.share(cx))
3387            .await
3388            .unwrap();
3389
3390        let _project_b = Project::remote(
3391            project_id,
3392            client_b.clone(),
3393            client_b.user_store.clone(),
3394            lang_registry.clone(),
3395            fs.clone(),
3396            &mut cx_b.to_async(),
3397        )
3398        .await
3399        .unwrap();
3400
3401        client_a
3402            .user_store
3403            .condition(&cx_a, |user_store, _| {
3404                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3405            })
3406            .await;
3407        client_b
3408            .user_store
3409            .condition(&cx_b, |user_store, _| {
3410                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3411            })
3412            .await;
3413        client_c
3414            .user_store
3415            .condition(&cx_c, |user_store, _| {
3416                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3417            })
3418            .await;
3419
3420        project_a
3421            .condition(&cx_a, |project, _| {
3422                project.collaborators().contains_key(&client_b.peer_id)
3423            })
3424            .await;
3425
3426        cx_a.update(move |_| drop(project_a));
3427        client_a
3428            .user_store
3429            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3430            .await;
3431        client_b
3432            .user_store
3433            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3434            .await;
3435        client_c
3436            .user_store
3437            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3438            .await;
3439
3440        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3441            user_store
3442                .contacts()
3443                .iter()
3444                .map(|contact| {
3445                    let worktrees = contact
3446                        .projects
3447                        .iter()
3448                        .map(|p| {
3449                            (
3450                                p.worktree_root_names[0].as_str(),
3451                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3452                            )
3453                        })
3454                        .collect();
3455                    (contact.user.github_login.as_str(), worktrees)
3456                })
3457                .collect()
3458        }
3459    }
3460
3461    struct TestServer {
3462        peer: Arc<Peer>,
3463        app_state: Arc<AppState>,
3464        server: Arc<Server>,
3465        foreground: Rc<executor::Foreground>,
3466        notifications: mpsc::Receiver<()>,
3467        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3468        forbid_connections: Arc<AtomicBool>,
3469        _test_db: TestDb,
3470    }
3471
3472    impl TestServer {
3473        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3474            let test_db = TestDb::new();
3475            let app_state = Self::build_app_state(&test_db).await;
3476            let peer = Peer::new();
3477            let notifications = mpsc::channel(128);
3478            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3479            Self {
3480                peer,
3481                app_state,
3482                server,
3483                foreground,
3484                notifications: notifications.1,
3485                connection_killers: Default::default(),
3486                forbid_connections: Default::default(),
3487                _test_db: test_db,
3488            }
3489        }
3490
3491        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3492            let http = FakeHttpClient::with_404_response();
3493            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3494            let client_name = name.to_string();
3495            let mut client = Client::new(http.clone());
3496            let server = self.server.clone();
3497            let connection_killers = self.connection_killers.clone();
3498            let forbid_connections = self.forbid_connections.clone();
3499            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3500
3501            Arc::get_mut(&mut client)
3502                .unwrap()
3503                .override_authenticate(move |cx| {
3504                    cx.spawn(|_| async move {
3505                        let access_token = "the-token".to_string();
3506                        Ok(Credentials {
3507                            user_id: user_id.0 as u64,
3508                            access_token,
3509                        })
3510                    })
3511                })
3512                .override_establish_connection(move |credentials, cx| {
3513                    assert_eq!(credentials.user_id, user_id.0 as u64);
3514                    assert_eq!(credentials.access_token, "the-token");
3515
3516                    let server = server.clone();
3517                    let connection_killers = connection_killers.clone();
3518                    let forbid_connections = forbid_connections.clone();
3519                    let client_name = client_name.clone();
3520                    let connection_id_tx = connection_id_tx.clone();
3521                    cx.spawn(move |cx| async move {
3522                        if forbid_connections.load(SeqCst) {
3523                            Err(EstablishConnectionError::other(anyhow!(
3524                                "server is forbidding connections"
3525                            )))
3526                        } else {
3527                            let (client_conn, server_conn, kill_conn) =
3528                                Connection::in_memory(cx.background());
3529                            connection_killers.lock().insert(user_id, kill_conn);
3530                            cx.background()
3531                                .spawn(server.handle_connection(
3532                                    server_conn,
3533                                    client_name,
3534                                    user_id,
3535                                    Some(connection_id_tx),
3536                                ))
3537                                .detach();
3538                            Ok(client_conn)
3539                        }
3540                    })
3541                });
3542
3543            client
3544                .authenticate_and_connect(&cx.to_async())
3545                .await
3546                .unwrap();
3547
3548            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3549            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3550            let mut authed_user =
3551                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3552            while authed_user.next().await.unwrap().is_none() {}
3553
3554            TestClient {
3555                client,
3556                peer_id,
3557                user_store,
3558            }
3559        }
3560
3561        fn disconnect_client(&self, user_id: UserId) {
3562            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3563                let _ = kill_conn.try_send(Some(()));
3564            }
3565        }
3566
3567        fn forbid_connections(&self) {
3568            self.forbid_connections.store(true, SeqCst);
3569        }
3570
3571        fn allow_connections(&self) {
3572            self.forbid_connections.store(false, SeqCst);
3573        }
3574
3575        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3576            let mut config = Config::default();
3577            config.session_secret = "a".repeat(32);
3578            config.database_url = test_db.url.clone();
3579            let github_client = github::AppClient::test();
3580            Arc::new(AppState {
3581                db: test_db.db().clone(),
3582                handlebars: Default::default(),
3583                auth_client: auth::build_client("", ""),
3584                repo_client: github::RepoClient::test(&github_client),
3585                github_client,
3586                config,
3587            })
3588        }
3589
3590        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3591            self.server.store.read()
3592        }
3593
3594        async fn condition<F>(&mut self, mut predicate: F)
3595        where
3596            F: FnMut(&Store) -> bool,
3597        {
3598            async_std::future::timeout(Duration::from_millis(500), async {
3599                while !(predicate)(&*self.server.store.read()) {
3600                    self.foreground.start_waiting();
3601                    self.notifications.next().await;
3602                    self.foreground.finish_waiting();
3603                }
3604            })
3605            .await
3606            .expect("condition timed out");
3607        }
3608    }
3609
3610    impl Drop for TestServer {
3611        fn drop(&mut self) {
3612            self.peer.reset();
3613        }
3614    }
3615
3616    struct TestClient {
3617        client: Arc<Client>,
3618        pub peer_id: PeerId,
3619        pub user_store: ModelHandle<UserStore>,
3620    }
3621
3622    impl Deref for TestClient {
3623        type Target = Arc<Client>;
3624
3625        fn deref(&self) -> &Self::Target {
3626            &self.client
3627        }
3628    }
3629
3630    impl TestClient {
3631        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3632            UserId::from_proto(
3633                self.user_store
3634                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3635            )
3636        }
3637    }
3638
3639    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3640        channel
3641            .messages()
3642            .cursor::<()>()
3643            .map(|m| {
3644                (
3645                    m.sender.github_login.clone(),
3646                    m.body.clone(),
3647                    m.is_pending(),
3648                )
3649            })
3650            .collect()
3651    }
3652
3653    struct EmptyView;
3654
3655    impl gpui::Entity for EmptyView {
3656        type Event = ();
3657    }
3658
3659    impl gpui::View for EmptyView {
3660        fn ui_name() -> &'static str {
3661            "empty view"
3662        }
3663
3664        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3665            gpui::Element::boxed(gpui::elements::Empty)
3666        }
3667    }
3668}