rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
/// RPC server: owns the peer used to talk to clients, the in-memory store, and the table of
/// per-message-type handlers that `handle_connection` dispatches to.
pub struct Server {
    /// RPC peer used to register connections and to send, forward and respond to messages.
    peer: Arc<Peer>,
    /// Server state behind a read/write lock.
    // NOTE(review): presumably what `state()` / `state_mut()` lock; those accessors are
    // outside this view.
    store: RwLock<Store>,
    /// Shared application state; handlers in this file reach the database through it.
    app_state: Arc<AppState>,
    /// Handler for each message type, keyed by the payload's `TypeId` (filled by `add_handler`).
    handlers: HashMap<TypeId, MessageHandler>,
    /// Optional channel that is sent `()` after each dispatched message has been handled.
    notifications: Option<mpsc::Sender<()>>,
}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffers)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_code_actions)
  89            .add_handler(Server::apply_code_action)
  90            .add_handler(Server::get_channels)
  91            .add_handler(Server::get_users)
  92            .add_handler(Server::join_channel)
  93            .add_handler(Server::leave_channel)
  94            .add_handler(Server::send_channel_message)
  95            .add_handler(Server::get_channel_messages);
  96
  97        Arc::new(server)
  98    }
  99
 100    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 101    where
 102        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 103        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 104        M: EnvelopedMessage,
 105    {
 106        let prev_handler = self.handlers.insert(
 107            TypeId::of::<M>(),
 108            Box::new(move |server, envelope| {
 109                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 110                (handler)(server, *envelope).boxed()
 111            }),
 112        );
 113        if prev_handler.is_some() {
 114            panic!("registered a handler for the same message twice");
 115        }
 116        self
 117    }
 118
 119    pub fn handle_connection(
 120        self: &Arc<Self>,
 121        connection: Connection,
 122        addr: String,
 123        user_id: UserId,
 124        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 125    ) -> impl Future<Output = ()> {
 126        let mut this = self.clone();
 127        async move {
 128            let (connection_id, handle_io, mut incoming_rx) =
 129                this.peer.add_connection(connection).await;
 130
 131            if let Some(send_connection_id) = send_connection_id.as_mut() {
 132                let _ = send_connection_id.send(connection_id).await;
 133            }
 134
 135            this.state_mut().add_connection(connection_id, user_id);
 136            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 137                log::error!("error updating contacts for {:?}: {}", user_id, err);
 138            }
 139
 140            let handle_io = handle_io.fuse();
 141            futures::pin_mut!(handle_io);
 142            loop {
 143                let next_message = incoming_rx.next().fuse();
 144                futures::pin_mut!(next_message);
 145                futures::select_biased! {
 146                    result = handle_io => {
 147                        if let Err(err) = result {
 148                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 149                        }
 150                        break;
 151                    }
 152                    message = next_message => {
 153                        if let Some(message) = message {
 154                            let start_time = Instant::now();
 155                            let type_name = message.payload_type_name();
 156                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 157                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 158                                if let Err(err) = (handler)(this.clone(), message).await {
 159                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 160                                } else {
 161                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 162                                }
 163
 164                                if let Some(mut notifications) = this.notifications.clone() {
 165                                    let _ = notifications.send(()).await;
 166                                }
 167                            } else {
 168                                log::warn!("unhandled message: {}", type_name);
 169                            }
 170                        } else {
 171                            log::info!("rpc connection closed {:?}", addr);
 172                            break;
 173                        }
 174                    }
 175                }
 176            }
 177
 178            if let Err(err) = this.sign_out(connection_id).await {
 179                log::error!("error signing out connection {:?} - {:?}", addr, err);
 180            }
 181        }
 182    }
 183
 184    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 185        self.peer.disconnect(connection_id);
 186        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 187
 188        for (project_id, project) in removed_connection.hosted_projects {
 189            if let Some(share) = project.share {
 190                broadcast(
 191                    connection_id,
 192                    share.guests.keys().copied().collect(),
 193                    |conn_id| {
 194                        self.peer
 195                            .send(conn_id, proto::UnshareProject { project_id })
 196                    },
 197                )?;
 198            }
 199        }
 200
 201        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 202            broadcast(connection_id, peer_ids, |conn_id| {
 203                self.peer.send(
 204                    conn_id,
 205                    proto::RemoveProjectCollaborator {
 206                        project_id,
 207                        peer_id: connection_id.0,
 208                    },
 209                )
 210            })?;
 211        }
 212
 213        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {})?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer.respond(
 232            request.receipt(),
 233            proto::RegisterProjectResponse { project_id },
 234        )?;
 235        Ok(())
 236    }
 237
 238    async fn unregister_project(
 239        mut self: Arc<Server>,
 240        request: TypedEnvelope<proto::UnregisterProject>,
 241    ) -> tide::Result<()> {
 242        let project = self
 243            .state_mut()
 244            .unregister_project(request.payload.project_id, request.sender_id)
 245            .ok_or_else(|| anyhow!("no such project"))?;
 246        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 247        Ok(())
 248    }
 249
 250    async fn share_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::ShareProject>,
 253    ) -> tide::Result<()> {
 254        self.state_mut()
 255            .share_project(request.payload.project_id, request.sender_id);
 256        self.peer.respond(request.receipt(), proto::Ack {})?;
 257        Ok(())
 258    }
 259
 260    async fn unshare_project(
 261        mut self: Arc<Server>,
 262        request: TypedEnvelope<proto::UnshareProject>,
 263    ) -> tide::Result<()> {
 264        let project_id = request.payload.project_id;
 265        let project = self
 266            .state_mut()
 267            .unshare_project(project_id, request.sender_id)?;
 268
 269        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 270            self.peer
 271                .send(conn_id, proto::UnshareProject { project_id })
 272        })?;
 273        self.update_contacts_for_users(&project.authorized_user_ids)?;
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                            weak: worktree.weak,
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })?;
 348                self.peer.respond(request.receipt(), response)?;
 349                self.update_contacts_for_users(&contact_user_ids)?;
 350            }
 351            Err(error) => {
 352                self.peer.respond_with_error(
 353                    request.receipt(),
 354                    proto::Error {
 355                        message: error.to_string(),
 356                    },
 357                )?;
 358            }
 359        }
 360
 361        Ok(())
 362    }
 363
 364    async fn leave_project(
 365        mut self: Arc<Server>,
 366        request: TypedEnvelope<proto::LeaveProject>,
 367    ) -> tide::Result<()> {
 368        let sender_id = request.sender_id;
 369        let project_id = request.payload.project_id;
 370        let worktree = self.state_mut().leave_project(sender_id, project_id);
 371        if let Some(worktree) = worktree {
 372            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 373                self.peer.send(
 374                    conn_id,
 375                    proto::RemoveProjectCollaborator {
 376                        project_id,
 377                        peer_id: sender_id.0,
 378                    },
 379                )
 380            })?;
 381            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 382        }
 383        Ok(())
 384    }
 385
 386    async fn register_worktree(
 387        mut self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterWorktree>,
 389    ) -> tide::Result<()> {
 390        let receipt = request.receipt();
 391        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 392
 393        let mut contact_user_ids = HashSet::default();
 394        contact_user_ids.insert(host_user_id);
 395        for github_login in request.payload.authorized_logins {
 396            match self.app_state.db.create_user(&github_login, false).await {
 397                Ok(contact_user_id) => {
 398                    contact_user_ids.insert(contact_user_id);
 399                }
 400                Err(err) => {
 401                    let message = err.to_string();
 402                    self.peer
 403                        .respond_with_error(receipt, proto::Error { message })?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417                weak: false,
 418            },
 419        );
 420
 421        if ok {
 422            self.peer.respond(receipt, proto::Ack {})?;
 423            self.update_contacts_for_users(&contact_user_ids)?;
 424        } else {
 425            self.peer.respond_with_error(
 426                receipt,
 427                proto::Error {
 428                    message: NO_SUCH_PROJECT.to_string(),
 429                },
 430            )?;
 431        }
 432
 433        Ok(())
 434    }
 435
 436    async fn unregister_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::UnregisterWorktree>,
 439    ) -> tide::Result<()> {
 440        let project_id = request.payload.project_id;
 441        let worktree_id = request.payload.worktree_id;
 442        let (worktree, guest_connection_ids) =
 443            self.state_mut()
 444                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })?;
 454        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 455        Ok(())
 456    }
 457
 458    async fn share_worktree(
 459        mut self: Arc<Server>,
 460        mut request: TypedEnvelope<proto::ShareWorktree>,
 461    ) -> tide::Result<()> {
 462        let worktree = request
 463            .payload
 464            .worktree
 465            .as_mut()
 466            .ok_or_else(|| anyhow!("missing worktree"))?;
 467        let entries = worktree
 468            .entries
 469            .iter()
 470            .map(|entry| (entry.id, entry.clone()))
 471            .collect();
 472        let diagnostic_summaries = worktree
 473            .diagnostic_summaries
 474            .iter()
 475            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 476            .collect();
 477
 478        let shared_worktree = self.state_mut().share_worktree(
 479            request.payload.project_id,
 480            worktree.id,
 481            request.sender_id,
 482            entries,
 483            diagnostic_summaries,
 484        );
 485        if let Some(shared_worktree) = shared_worktree {
 486            broadcast(
 487                request.sender_id,
 488                shared_worktree.connection_ids,
 489                |connection_id| {
 490                    self.peer.forward_send(
 491                        request.sender_id,
 492                        connection_id,
 493                        request.payload.clone(),
 494                    )
 495                },
 496            )?;
 497            self.peer.respond(request.receipt(), proto::Ack {})?;
 498            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499        } else {
 500            self.peer.respond_with_error(
 501                request.receipt(),
 502                proto::Error {
 503                    message: "no such worktree".to_string(),
 504                },
 505            )?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })?;
 555        Ok(())
 556    }
 557
 558    async fn disk_based_diagnostics_updating(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 561    ) -> tide::Result<()> {
 562        let receiver_ids = self
 563            .state()
 564            .project_connection_ids(request.payload.project_id, request.sender_id)
 565            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn disk_based_diagnostics_updated(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 576    ) -> tide::Result<()> {
 577        let receiver_ids = self
 578            .state()
 579            .project_connection_ids(request.payload.project_id, request.sender_id)
 580            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 581        broadcast(request.sender_id, receiver_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        })?;
 585        Ok(())
 586    }
 587
 588    async fn get_definition(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDefinition>,
 591    ) -> tide::Result<()> {
 592        let receipt = request.receipt();
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)
 596            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 597            .host_connection_id;
 598        let response = self
 599            .peer
 600            .forward_request(request.sender_id, host_connection_id, request.payload)
 601            .await?;
 602        self.peer.respond(receipt, response)?;
 603        Ok(())
 604    }
 605
 606    async fn open_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBuffer>,
 609    ) -> tide::Result<()> {
 610        let receipt = request.receipt();
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)
 614            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 615            .host_connection_id;
 616        let response = self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?;
 620        self.peer.respond(receipt, response)?;
 621        Ok(())
 622    }
 623
 624    async fn close_buffer(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::CloseBuffer>,
 627    ) -> tide::Result<()> {
 628        let host_connection_id = self
 629            .state()
 630            .read_project(request.payload.project_id, request.sender_id)
 631            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 632            .host_connection_id;
 633        self.peer
 634            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 635        Ok(())
 636    }
 637
 638    async fn save_buffer(
 639        self: Arc<Server>,
 640        request: TypedEnvelope<proto::SaveBuffer>,
 641    ) -> tide::Result<()> {
 642        let host;
 643        let guests;
 644        {
 645            let state = self.state();
 646            let project = state
 647                .read_project(request.payload.project_id, request.sender_id)
 648                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 649            host = project.host_connection_id;
 650            guests = project.guest_connection_ids()
 651        }
 652
 653        let sender = request.sender_id;
 654        let receipt = request.receipt();
 655        let response = self
 656            .peer
 657            .forward_request(sender, host, request.payload.clone())
 658            .await?;
 659
 660        broadcast(host, guests, |conn_id| {
 661            let response = response.clone();
 662            if conn_id == sender {
 663                self.peer.respond(receipt, response)
 664            } else {
 665                self.peer.forward_send(host, conn_id, response)
 666            }
 667        })?;
 668
 669        Ok(())
 670    }
 671
 672    async fn format_buffers(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::FormatBuffers>,
 675    ) -> tide::Result<()> {
 676        let host;
 677        {
 678            let state = self.state();
 679            let project = state
 680                .read_project(request.payload.project_id, request.sender_id)
 681                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 682            host = project.host_connection_id;
 683        }
 684
 685        let sender = request.sender_id;
 686        let receipt = request.receipt();
 687        let response = self
 688            .peer
 689            .forward_request(sender, host, request.payload.clone())
 690            .await?;
 691        self.peer.respond(receipt, response)?;
 692
 693        Ok(())
 694    }
 695
 696    async fn get_completions(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetCompletions>,
 699    ) -> tide::Result<()> {
 700        let host;
 701        {
 702            let state = self.state();
 703            let project = state
 704                .read_project(request.payload.project_id, request.sender_id)
 705                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 706            host = project.host_connection_id;
 707        }
 708
 709        let sender = request.sender_id;
 710        let receipt = request.receipt();
 711        let response = self
 712            .peer
 713            .forward_request(sender, host, request.payload.clone())
 714            .await?;
 715        self.peer.respond(receipt, response)?;
 716        Ok(())
 717    }
 718
 719    async fn apply_additional_edits_for_completion(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 722    ) -> tide::Result<()> {
 723        let host;
 724        {
 725            let state = self.state();
 726            let project = state
 727                .read_project(request.payload.project_id, request.sender_id)
 728                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 729            host = project.host_connection_id;
 730        }
 731
 732        let sender = request.sender_id;
 733        let receipt = request.receipt();
 734        let response = self
 735            .peer
 736            .forward_request(sender, host, request.payload.clone())
 737            .await?;
 738        self.peer.respond(receipt, response)?;
 739        Ok(())
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<()> {
 746        let host;
 747        {
 748            let state = self.state();
 749            let project = state
 750                .read_project(request.payload.project_id, request.sender_id)
 751                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 752            host = project.host_connection_id;
 753        }
 754
 755        let sender = request.sender_id;
 756        let receipt = request.receipt();
 757        let response = self
 758            .peer
 759            .forward_request(sender, host, request.payload.clone())
 760            .await?;
 761        self.peer.respond(receipt, response)?;
 762        Ok(())
 763    }
 764
 765    async fn apply_code_action(
 766        self: Arc<Server>,
 767        request: TypedEnvelope<proto::ApplyCodeAction>,
 768    ) -> tide::Result<()> {
 769        let host;
 770        {
 771            let state = self.state();
 772            let project = state
 773                .read_project(request.payload.project_id, request.sender_id)
 774                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775            host = project.host_connection_id;
 776        }
 777
 778        let sender = request.sender_id;
 779        let receipt = request.receipt();
 780        let response = self
 781            .peer
 782            .forward_request(sender, host, request.payload.clone())
 783            .await?;
 784        self.peer.respond(receipt, response)?;
 785        Ok(())
 786    }
 787
 788    async fn update_buffer(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateBuffer>,
 791    ) -> tide::Result<()> {
 792        let receiver_ids = self
 793            .state()
 794            .project_connection_ids(request.payload.project_id, request.sender_id)
 795            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 796        broadcast(request.sender_id, receiver_ids, |connection_id| {
 797            self.peer
 798                .forward_send(request.sender_id, connection_id, request.payload.clone())
 799        })?;
 800        self.peer.respond(request.receipt(), proto::Ack {})?;
 801        Ok(())
 802    }
 803
 804    async fn update_buffer_file(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::UpdateBufferFile>,
 807    ) -> tide::Result<()> {
 808        let receiver_ids = self
 809            .state()
 810            .project_connection_ids(request.payload.project_id, request.sender_id)
 811            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        })?;
 816        Ok(())
 817    }
 818
 819    async fn buffer_reloaded(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferReloaded>,
 822    ) -> tide::Result<()> {
 823        let receiver_ids = self
 824            .state()
 825            .project_connection_ids(request.payload.project_id, request.sender_id)
 826            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        })?;
 831        Ok(())
 832    }
 833
 834    async fn buffer_saved(
 835        self: Arc<Server>,
 836        request: TypedEnvelope<proto::BufferSaved>,
 837    ) -> tide::Result<()> {
 838        let receiver_ids = self
 839            .state()
 840            .project_connection_ids(request.payload.project_id, request.sender_id)
 841            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        })?;
 846        Ok(())
 847    }
 848
 849    async fn get_channels(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::GetChannels>,
 852    ) -> tide::Result<()> {
 853        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 854        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 855        self.peer.respond(
 856            request.receipt(),
 857            proto::GetChannelsResponse {
 858                channels: channels
 859                    .into_iter()
 860                    .map(|chan| proto::Channel {
 861                        id: chan.id.to_proto(),
 862                        name: chan.name,
 863                    })
 864                    .collect(),
 865            },
 866        )?;
 867        Ok(())
 868    }
 869
 870    async fn get_users(
 871        self: Arc<Server>,
 872        request: TypedEnvelope<proto::GetUsers>,
 873    ) -> tide::Result<()> {
 874        let receipt = request.receipt();
 875        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 876        let users = self
 877            .app_state
 878            .db
 879            .get_users_by_ids(user_ids)
 880            .await?
 881            .into_iter()
 882            .map(|user| proto::User {
 883                id: user.id.to_proto(),
 884                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 885                github_login: user.github_login,
 886            })
 887            .collect();
 888        self.peer
 889            .respond(receipt, proto::GetUsersResponse { users })?;
 890        Ok(())
 891    }
 892
 893    fn update_contacts_for_users<'a>(
 894        self: &Arc<Server>,
 895        user_ids: impl IntoIterator<Item = &'a UserId>,
 896    ) -> anyhow::Result<()> {
 897        let mut result = Ok(());
 898        let state = self.state();
 899        for user_id in user_ids {
 900            let contacts = state.contacts_for_user(*user_id);
 901            for connection_id in state.connection_ids_for_user(*user_id) {
 902                if let Err(error) = self.peer.send(
 903                    connection_id,
 904                    proto::UpdateContacts {
 905                        contacts: contacts.clone(),
 906                    },
 907                ) {
 908                    result = Err(error);
 909                }
 910            }
 911        }
 912        result
 913    }
 914
 915    async fn join_channel(
 916        mut self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> tide::Result<()> {
 919        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 920        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 921        if !self
 922            .app_state
 923            .db
 924            .can_user_access_channel(user_id, channel_id)
 925            .await?
 926        {
 927            Err(anyhow!("access denied"))?;
 928        }
 929
 930        self.state_mut().join_channel(request.sender_id, channel_id);
 931        let messages = self
 932            .app_state
 933            .db
 934            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 935            .await?
 936            .into_iter()
 937            .map(|msg| proto::ChannelMessage {
 938                id: msg.id.to_proto(),
 939                body: msg.body,
 940                timestamp: msg.sent_at.unix_timestamp() as u64,
 941                sender_id: msg.sender_id.to_proto(),
 942                nonce: Some(msg.nonce.as_u128().into()),
 943            })
 944            .collect::<Vec<_>>();
 945        self.peer.respond(
 946            request.receipt(),
 947            proto::JoinChannelResponse {
 948                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 949                messages,
 950            },
 951        )?;
 952        Ok(())
 953    }
 954
 955    async fn leave_channel(
 956        mut self: Arc<Self>,
 957        request: TypedEnvelope<proto::LeaveChannel>,
 958    ) -> tide::Result<()> {
 959        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 960        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 961        if !self
 962            .app_state
 963            .db
 964            .can_user_access_channel(user_id, channel_id)
 965            .await?
 966        {
 967            Err(anyhow!("access denied"))?;
 968        }
 969
 970        self.state_mut()
 971            .leave_channel(request.sender_id, channel_id);
 972
 973        Ok(())
 974    }
 975
 976    async fn send_channel_message(
 977        self: Arc<Self>,
 978        request: TypedEnvelope<proto::SendChannelMessage>,
 979    ) -> tide::Result<()> {
 980        let receipt = request.receipt();
 981        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 982        let user_id;
 983        let connection_ids;
 984        {
 985            let state = self.state();
 986            user_id = state.user_id_for_connection(request.sender_id)?;
 987            if let Some(ids) = state.channel_connection_ids(channel_id) {
 988                connection_ids = ids;
 989            } else {
 990                return Ok(());
 991            }
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            self.peer.respond_with_error(
 998                receipt,
 999                proto::Error {
1000                    message: "message is too long".to_string(),
1001                },
1002            )?;
1003            return Ok(());
1004        }
1005        if body.is_empty() {
1006            self.peer.respond_with_error(
1007                receipt,
1008                proto::Error {
1009                    message: "message can't be blank".to_string(),
1010                },
1011            )?;
1012            return Ok(());
1013        }
1014
1015        let timestamp = OffsetDateTime::now_utc();
1016        let nonce = if let Some(nonce) = request.payload.nonce {
1017            nonce
1018        } else {
1019            self.peer.respond_with_error(
1020                receipt,
1021                proto::Error {
1022                    message: "nonce can't be blank".to_string(),
1023                },
1024            )?;
1025            return Ok(());
1026        };
1027
1028        let message_id = self
1029            .app_state
1030            .db
1031            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1032            .await?
1033            .to_proto();
1034        let message = proto::ChannelMessage {
1035            sender_id: user_id.to_proto(),
1036            id: message_id,
1037            body,
1038            timestamp: timestamp.unix_timestamp() as u64,
1039            nonce: Some(nonce),
1040        };
1041        broadcast(request.sender_id, connection_ids, |conn_id| {
1042            self.peer.send(
1043                conn_id,
1044                proto::ChannelMessageSent {
1045                    channel_id: channel_id.to_proto(),
1046                    message: Some(message.clone()),
1047                },
1048            )
1049        })?;
1050        self.peer.respond(
1051            receipt,
1052            proto::SendChannelMessageResponse {
1053                message: Some(message),
1054            },
1055        )?;
1056        Ok(())
1057    }
1058
1059    async fn get_channel_messages(
1060        self: Arc<Self>,
1061        request: TypedEnvelope<proto::GetChannelMessages>,
1062    ) -> tide::Result<()> {
1063        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1064        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1065        if !self
1066            .app_state
1067            .db
1068            .can_user_access_channel(user_id, channel_id)
1069            .await?
1070        {
1071            Err(anyhow!("access denied"))?;
1072        }
1073
1074        let messages = self
1075            .app_state
1076            .db
1077            .get_channel_messages(
1078                channel_id,
1079                MESSAGE_COUNT_PER_PAGE,
1080                Some(MessageId::from_proto(request.payload.before_message_id)),
1081            )
1082            .await?
1083            .into_iter()
1084            .map(|msg| proto::ChannelMessage {
1085                id: msg.id.to_proto(),
1086                body: msg.body,
1087                timestamp: msg.sent_at.unix_timestamp() as u64,
1088                sender_id: msg.sender_id.to_proto(),
1089                nonce: Some(msg.nonce.as_u128().into()),
1090            })
1091            .collect::<Vec<_>>();
1092        self.peer.respond(
1093            request.receipt(),
1094            proto::GetChannelMessagesResponse {
1095                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1096                messages,
1097            },
1098        )?;
1099        Ok(())
1100    }
1101
1102    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1103        self.store.read()
1104    }
1105
1106    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1107        self.store.write()
1108    }
1109}
1110
1111fn broadcast<F>(
1112    sender_id: ConnectionId,
1113    receiver_ids: Vec<ConnectionId>,
1114    mut f: F,
1115) -> anyhow::Result<()>
1116where
1117    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1118{
1119    let mut result = Ok(());
1120    for receiver_id in receiver_ids {
1121        if receiver_id != sender_id {
1122            if let Err(error) = f(receiver_id) {
1123                if result.is_ok() {
1124                    result = Err(error);
1125                }
1126            }
1127        }
1128    }
1129    result
1130}
1131
/// Registers the `/rpc` GET route on `app`. The route upgrades an incoming
/// HTTP request to a WebSocket and hands the resulting stream to a `Server`.
///
/// One `Server` is created from the app state and `rpc`; every request works
/// with a clone of its handle.
pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
    let server = Server::new(app.state().clone(), rpc.clone(), None);
    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
        let server = server.clone();
        async move {
            // Fixed GUID that is hashed together with the client's key to
            // produce the `Sec-Websocket-Accept` value below.
            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
            // A missing or unparsable version header becomes `None`, which
            // never equals the expected version.
            let client_protocol_version: Option<u32> = request
                .header("X-Zed-Protocol-Version")
                .and_then(|v| v.as_str().parse().ok());

            // Reject anything that is not a WebSocket upgrade carrying
            // exactly this server's protocol version.
            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
                return Ok(Response::new(StatusCode::UpgradeRequired));
            }

            let header = match request.header("Sec-Websocket-Key") {
                Some(h) => h.as_str(),
                None => return Err(anyhow!("expected sec-websocket-key"))?,
            };

            // Authentication runs before any upgrade response is built; a
            // failure here is propagated as the handler's error.
            let user_id = process_auth_header(&request).await?;

            let mut response = Response::new(StatusCode::SwitchingProtocols);
            response.insert_header(UPGRADE, "websocket");
            response.insert_header(CONNECTION, "Upgrade");
            // Accept value: base64 of SHA-1 over the client key followed by
            // the GUID.
            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
            response.insert_header("Sec-Websocket-Version", "13");

            let http_res: &mut tide::http::Response = response.as_mut();
            let upgrade_receiver = http_res.recv_upgrade().await;
            let addr = request.remote().unwrap_or("unknown").to_string();
            // The connection is served on its own task so the handler can
            // return the upgrade response; nothing is done if the receiver
            // yields no stream.
            task::spawn(async move {
                if let Some(stream) = upgrade_receiver.await {
                    server
                        .handle_connection(
                            Connection::new(
                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
                            ),
                            addr,
                            user_id,
                            None,
                        )
                        .await;
                }
            });

            Ok(response)
        }
    });
}
1186
1187fn header_contains_ignore_case<T>(
1188    request: &tide::Request<T>,
1189    header_name: HeaderName,
1190    value: &str,
1191) -> bool {
1192    request
1193        .header(header_name)
1194        .map(|h| {
1195            h.as_str()
1196                .split(',')
1197                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1198        })
1199        .unwrap_or(false)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::*;
1205    use crate::{
1206        auth,
1207        db::{tests::TestDb, UserId},
1208        github, AppState, Config,
1209    };
1210    use ::rpc::Peer;
1211    use async_std::task;
1212    use gpui::{executor, ModelHandle, TestAppContext};
1213    use parking_lot::Mutex;
1214    use postage::{mpsc, watch};
1215    use rand::prelude::*;
1216    use rpc::PeerId;
1217    use serde_json::json;
1218    use sqlx::types::time::OffsetDateTime;
1219    use std::{
1220        ops::Deref,
1221        path::Path,
1222        rc::Rc,
1223        sync::{
1224            atomic::{AtomicBool, Ordering::SeqCst},
1225            Arc,
1226        },
1227        time::Duration,
1228    };
1229    use zed::{
1230        client::{
1231            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1232            EstablishConnectionError, UserStore,
1233        },
1234        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1235        fs::{FakeFs, Fs as _},
1236        language::{
1237            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1238            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1239        },
1240        lsp,
1241        project::{DiagnosticSummary, Project, ProjectPath},
1242    };
1243
1244    #[cfg(test)]
1245    #[ctor::ctor]
1246    fn init_logger() {
1247        if std::env::var("RUST_LOG").is_ok() {
1248            env_logger::init();
1249        }
1250    }
1251
    /// End-to-end check of project sharing between two clients: client A
    /// shares a project, client B joins it, both open the same file, an edit
    /// made by B shows up in A's buffer, and dropping B's project removes B
    /// from A's collaborator list.
    #[gpui::test(iterations = 10)]
    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let (window_b, _) = cx_b.add_window(|_| EmptyView);
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();

        // B must already see A as a collaborator; remember B's replica id so
        // A's view of B can be compared against it.
        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
            assert_eq!(
                project
                    .collaborators()
                    .get(&client_a.peer_id)
                    .unwrap()
                    .user
                    .github_login,
                "user_a"
            );
            project.replica_id()
        });
        // Wait until A lists B as a collaborator with the matching replica id.
        project_a
            .condition(&cx_a, |tree, _| {
                tree.collaborators()
                    .get(&client_b.peer_id)
                    .map_or(false, |collaborator| {
                        collaborator.replica_id == replica_id_b
                            && collaborator.user.github_login == "user_b"
                    })
            })
            .await;

        // Open the same file as client B and client A.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();
        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
        buffer_b.read_with(&cx_b, |buf, cx| {
            assert_eq!(buf.read(cx).text(), "b-contents")
        });
        // The host is expected to have the buffer open as soon as the guest
        // has opened it.
        project_a.read_with(&cx_a, |project, cx| {
            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
        });
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
            .await
            .unwrap();

        let editor_b = cx_b.add_view(window_b, |cx| {
            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
        });

        // TODO
        // // Create a selection set as client B and see that selection set as client A.
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
        //     .await;

        // Edit the buffer as client B and see that edit as client A.
        editor_b.update(&mut cx_b, |editor, cx| {
            editor.handle_input(&Input("ok, ".into()), cx)
        });
        buffer_a
            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
            .await;

        // TODO
        // // Remove the selection set as client B, see those selections disappear as client A.
        cx_b.update(move |_| drop(editor_b));
        // buffer_a
        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
        //     .await;

        // Close the buffer as client A, see that the buffer is closed.
        cx_a.update(move |_| drop(buffer_a));
        project_a
            .condition(&cx_a, |project, cx| {
                !project.has_open_buffer((worktree_id, "b.txt"), cx)
            })
            .await;

        // Dropping the client B's project removes client B from client A's collaborators.
        cx_b.update(move |_| drop(project_b));
        project_a
            .condition(&cx_a, |project, _| project.collaborators().is_empty())
            .await;
    }
1390
    /// Checks the unshare/re-share cycle: after the host unshares, the guest's
    /// project becomes read-only and the host's worktree is no longer shared;
    /// after the host shares again, the guest can join and open a buffer.
    #[gpui::test(iterations = 10)]
    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 2 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;

        // Share a project as client A
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b"]"#,
                "a.txt": "a-contents",
                "b.txt": "b-contents",
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Join that project as client B
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();

        // Unshare the project as client A
        project_a
            .update(&mut cx_a, |project, cx| project.unshare(cx))
            .await
            .unwrap();
        // The guest learns about the unshare asynchronously, so wait for its
        // project to turn read-only.
        project_b
            .condition(&mut cx_b, |project, _| project.is_read_only())
            .await;
        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
        drop(project_b);

        // Share the project again and ensure guests can still join.
        project_a
            .update(&mut cx_a, |project, cx| project.share(cx))
            .await
            .unwrap();
        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));

        // Same client and same project id as before, but a fresh remote
        // project handle.
        let project_c = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        project_c
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
            .await
            .unwrap();
    }
1487
    /// Three-client scenario (host A, guests B and C) checking that buffer
    /// edits converge on every replica, that a save requested by a guest is
    /// written to the host's file system, and that renames and new files on
    /// the host's file system show up in the guests' worktrees and in the
    /// open buffers' file paths.
    #[gpui::test(iterations = 10)]
    async fn test_propagate_saves_and_fs_changes(
        mut cx_a: TestAppContext,
        mut cx_b: TestAppContext,
        mut cx_c: TestAppContext,
    ) {
        let lang_registry = Arc::new(LanguageRegistry::new());
        let fs = Arc::new(FakeFs::new(cx_a.background()));
        cx_a.foreground().forbid_parking();

        // Connect to a server as 3 clients.
        let mut server = TestServer::start(cx_a.foreground()).await;
        let client_a = server.create_client(&mut cx_a, "user_a").await;
        let client_b = server.create_client(&mut cx_b, "user_b").await;
        let client_c = server.create_client(&mut cx_c, "user_c").await;

        // Share a worktree as client A.
        fs.insert_tree(
            "/a",
            json!({
                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
                "file1": "",
                "file2": ""
            }),
        )
        .await;
        let project_a = cx_a.update(|cx| {
            Project::local(
                client_a.clone(),
                client_a.user_store.clone(),
                lang_registry.clone(),
                fs.clone(),
                cx,
            )
        });
        let (worktree_a, _) = project_a
            .update(&mut cx_a, |p, cx| {
                p.find_or_create_local_worktree("/a", false, cx)
            })
            .await
            .unwrap();
        // Wait for the initial scan of the worktree to finish before sharing.
        worktree_a
            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
            .await;
        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
        project_a
            .update(&mut cx_a, |p, cx| p.share(cx))
            .await
            .unwrap();

        // Join that worktree as clients B and C.
        let project_b = Project::remote(
            project_id,
            client_b.clone(),
            client_b.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_b.to_async(),
        )
        .await
        .unwrap();
        let project_c = Project::remote(
            project_id,
            client_c.clone(),
            client_c.user_store.clone(),
            lang_registry.clone(),
            fs.clone(),
            &mut cx_c.to_async(),
        )
        .await
        .unwrap();
        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());

        // Open and edit a buffer as both guests B and C.
        let buffer_b = project_b
            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        let buffer_c = project_c
            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();
        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));

        // Open and edit that buffer as the host.
        let buffer_a = project_a
            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
            .await
            .unwrap();

        // Both guest insertions were made at offset 0; wait until the host
        // has received both before appending its own text at the end.
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
            .await;
        buffer_a.update(&mut cx_a, |buf, cx| {
            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
        });

        // Wait for edits to propagate
        buffer_a
            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_b
            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;
        buffer_c
            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
            .await;

        // Edit the buffer as the host and concurrently save as guest B.
        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
        save_b.await.unwrap();
        // The file on disk is expected to include the host's concurrent edit.
        assert_eq!(
            fs.load("/a/file1".as_ref()).await.unwrap(),
            "hi-a, i-am-c, i-am-b, i-am-a"
        );
        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
        // C did not take part in the save, so its clean state is awaited
        // rather than asserted immediately.
        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;

        // Make changes on host's file system, see those changes on guest worktrees.
        fs.rename(
            "/a/file1".as_ref(),
            "/a/file1-renamed".as_ref(),
            Default::default(),
        )
        .await
        .unwrap();
        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
            .await
            .unwrap();
        fs.insert_file(Path::new("/a/file4"), "4".into())
            .await
            .unwrap();

        // Four files are expected afterwards: `.zed.toml`, the two renamed
        // files and the new `file4`.
        worktree_a
            .condition(&cx_a, |tree, _| tree.file_count() == 4)
            .await;
        worktree_b
            .condition(&cx_b, |tree, _| tree.file_count() == 4)
            .await;
        worktree_c
            .condition(&cx_c, |tree, _| tree.file_count() == 4)
            .await;
        worktree_a.read_with(&cx_a, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_b.read_with(&cx_b, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });
        worktree_c.read_with(&cx_c, |tree, _| {
            assert_eq!(
                tree.paths()
                    .map(|p| p.to_string_lossy())
                    .collect::<Vec<_>>(),
                &[".zed.toml", "file1-renamed", "file3", "file4"]
            )
        });

        // Ensure buffer files are updated as well.
        buffer_a
            .condition(&cx_a, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_b
            .condition(&cx_b, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
        buffer_c
            .condition(&cx_c, |buf, _| {
                buf.file().unwrap().path().to_str() == Some("file1-renamed")
            })
            .await;
    }
1677
1678    #[gpui::test(iterations = 10)]
1679    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1680        cx_a.foreground().forbid_parking();
1681        let lang_registry = Arc::new(LanguageRegistry::new());
1682        let fs = Arc::new(FakeFs::new(cx_a.background()));
1683
1684        // Connect to a server as 2 clients.
1685        let mut server = TestServer::start(cx_a.foreground()).await;
1686        let client_a = server.create_client(&mut cx_a, "user_a").await;
1687        let client_b = server.create_client(&mut cx_b, "user_b").await;
1688
1689        // Share a project as client A
1690        fs.insert_tree(
1691            "/dir",
1692            json!({
1693                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1694                "a.txt": "a-contents",
1695            }),
1696        )
1697        .await;
1698
1699        let project_a = cx_a.update(|cx| {
1700            Project::local(
1701                client_a.clone(),
1702                client_a.user_store.clone(),
1703                lang_registry.clone(),
1704                fs.clone(),
1705                cx,
1706            )
1707        });
1708        let (worktree_a, _) = project_a
1709            .update(&mut cx_a, |p, cx| {
1710                p.find_or_create_local_worktree("/dir", false, cx)
1711            })
1712            .await
1713            .unwrap();
1714        worktree_a
1715            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1716            .await;
1717        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1718        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1719        project_a
1720            .update(&mut cx_a, |p, cx| p.share(cx))
1721            .await
1722            .unwrap();
1723
1724        // Join that project as client B
1725        let project_b = Project::remote(
1726            project_id,
1727            client_b.clone(),
1728            client_b.user_store.clone(),
1729            lang_registry.clone(),
1730            fs.clone(),
1731            &mut cx_b.to_async(),
1732        )
1733        .await
1734        .unwrap();
1735        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1736
1737        // Open a buffer as client B
1738        let buffer_b = project_b
1739            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1740            .await
1741            .unwrap();
1742        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1743
1744        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1745        buffer_b.read_with(&cx_b, |buf, _| {
1746            assert!(buf.is_dirty());
1747            assert!(!buf.has_conflict());
1748        });
1749
1750        buffer_b
1751            .update(&mut cx_b, |buf, cx| buf.save(cx))
1752            .await
1753            .unwrap();
1754        worktree_b
1755            .condition(&cx_b, |_, cx| {
1756                buffer_b.read(cx).file().unwrap().mtime() != mtime
1757            })
1758            .await;
1759        buffer_b.read_with(&cx_b, |buf, _| {
1760            assert!(!buf.is_dirty());
1761            assert!(!buf.has_conflict());
1762        });
1763
1764        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1765        buffer_b.read_with(&cx_b, |buf, _| {
1766            assert!(buf.is_dirty());
1767            assert!(!buf.has_conflict());
1768        });
1769    }
1770
1771    #[gpui::test(iterations = 10)]
1772    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1773        cx_a.foreground().forbid_parking();
1774        let lang_registry = Arc::new(LanguageRegistry::new());
1775        let fs = Arc::new(FakeFs::new(cx_a.background()));
1776
1777        // Connect to a server as 2 clients.
1778        let mut server = TestServer::start(cx_a.foreground()).await;
1779        let client_a = server.create_client(&mut cx_a, "user_a").await;
1780        let client_b = server.create_client(&mut cx_b, "user_b").await;
1781
1782        // Share a project as client A
1783        fs.insert_tree(
1784            "/dir",
1785            json!({
1786                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1787                "a.txt": "a-contents",
1788            }),
1789        )
1790        .await;
1791
1792        let project_a = cx_a.update(|cx| {
1793            Project::local(
1794                client_a.clone(),
1795                client_a.user_store.clone(),
1796                lang_registry.clone(),
1797                fs.clone(),
1798                cx,
1799            )
1800        });
1801        let (worktree_a, _) = project_a
1802            .update(&mut cx_a, |p, cx| {
1803                p.find_or_create_local_worktree("/dir", false, cx)
1804            })
1805            .await
1806            .unwrap();
1807        worktree_a
1808            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1809            .await;
1810        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1811        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1812        project_a
1813            .update(&mut cx_a, |p, cx| p.share(cx))
1814            .await
1815            .unwrap();
1816
1817        // Join that project as client B
1818        let project_b = Project::remote(
1819            project_id,
1820            client_b.clone(),
1821            client_b.user_store.clone(),
1822            lang_registry.clone(),
1823            fs.clone(),
1824            &mut cx_b.to_async(),
1825        )
1826        .await
1827        .unwrap();
1828        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1829
1830        // Open a buffer as client B
1831        let buffer_b = project_b
1832            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1833            .await
1834            .unwrap();
1835        buffer_b.read_with(&cx_b, |buf, _| {
1836            assert!(!buf.is_dirty());
1837            assert!(!buf.has_conflict());
1838        });
1839
1840        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1841            .await
1842            .unwrap();
1843        buffer_b
1844            .condition(&cx_b, |buf, _| {
1845                buf.text() == "new contents" && !buf.is_dirty()
1846            })
1847            .await;
1848        buffer_b.read_with(&cx_b, |buf, _| {
1849            assert!(!buf.has_conflict());
1850        });
1851    }
1852
1853    #[gpui::test(iterations = 100)]
1854    async fn test_editing_while_guest_opens_buffer(
1855        mut cx_a: TestAppContext,
1856        mut cx_b: TestAppContext,
1857    ) {
1858        cx_a.foreground().forbid_parking();
1859        let lang_registry = Arc::new(LanguageRegistry::new());
1860        let fs = Arc::new(FakeFs::new(cx_a.background()));
1861
1862        // Connect to a server as 2 clients.
1863        let mut server = TestServer::start(cx_a.foreground()).await;
1864        let client_a = server.create_client(&mut cx_a, "user_a").await;
1865        let client_b = server.create_client(&mut cx_b, "user_b").await;
1866
1867        // Share a project as client A
1868        fs.insert_tree(
1869            "/dir",
1870            json!({
1871                ".zed.toml": r#"collaborators = ["user_b"]"#,
1872                "a.txt": "a-contents",
1873            }),
1874        )
1875        .await;
1876        let project_a = cx_a.update(|cx| {
1877            Project::local(
1878                client_a.clone(),
1879                client_a.user_store.clone(),
1880                lang_registry.clone(),
1881                fs.clone(),
1882                cx,
1883            )
1884        });
1885        let (worktree_a, _) = project_a
1886            .update(&mut cx_a, |p, cx| {
1887                p.find_or_create_local_worktree("/dir", false, cx)
1888            })
1889            .await
1890            .unwrap();
1891        worktree_a
1892            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1893            .await;
1894        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1895        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1896        project_a
1897            .update(&mut cx_a, |p, cx| p.share(cx))
1898            .await
1899            .unwrap();
1900
1901        // Join that project as client B
1902        let project_b = Project::remote(
1903            project_id,
1904            client_b.clone(),
1905            client_b.user_store.clone(),
1906            lang_registry.clone(),
1907            fs.clone(),
1908            &mut cx_b.to_async(),
1909        )
1910        .await
1911        .unwrap();
1912
1913        // Open a buffer as client A
1914        let buffer_a = project_a
1915            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1916            .await
1917            .unwrap();
1918
1919        // Start opening the same buffer as client B
1920        let buffer_b = cx_b
1921            .background()
1922            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1923        task::yield_now().await;
1924
1925        // Edit the buffer as client A while client B is still opening it.
1926        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1927
1928        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1929        let buffer_b = buffer_b.await.unwrap();
1930        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1931    }
1932
1933    #[gpui::test(iterations = 10)]
1934    async fn test_leaving_worktree_while_opening_buffer(
1935        mut cx_a: TestAppContext,
1936        mut cx_b: TestAppContext,
1937    ) {
1938        cx_a.foreground().forbid_parking();
1939        let lang_registry = Arc::new(LanguageRegistry::new());
1940        let fs = Arc::new(FakeFs::new(cx_a.background()));
1941
1942        // Connect to a server as 2 clients.
1943        let mut server = TestServer::start(cx_a.foreground()).await;
1944        let client_a = server.create_client(&mut cx_a, "user_a").await;
1945        let client_b = server.create_client(&mut cx_b, "user_b").await;
1946
1947        // Share a project as client A
1948        fs.insert_tree(
1949            "/dir",
1950            json!({
1951                ".zed.toml": r#"collaborators = ["user_b"]"#,
1952                "a.txt": "a-contents",
1953            }),
1954        )
1955        .await;
1956        let project_a = cx_a.update(|cx| {
1957            Project::local(
1958                client_a.clone(),
1959                client_a.user_store.clone(),
1960                lang_registry.clone(),
1961                fs.clone(),
1962                cx,
1963            )
1964        });
1965        let (worktree_a, _) = project_a
1966            .update(&mut cx_a, |p, cx| {
1967                p.find_or_create_local_worktree("/dir", false, cx)
1968            })
1969            .await
1970            .unwrap();
1971        worktree_a
1972            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1973            .await;
1974        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1975        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1976        project_a
1977            .update(&mut cx_a, |p, cx| p.share(cx))
1978            .await
1979            .unwrap();
1980
1981        // Join that project as client B
1982        let project_b = Project::remote(
1983            project_id,
1984            client_b.clone(),
1985            client_b.user_store.clone(),
1986            lang_registry.clone(),
1987            fs.clone(),
1988            &mut cx_b.to_async(),
1989        )
1990        .await
1991        .unwrap();
1992
1993        // See that a guest has joined as client A.
1994        project_a
1995            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1996            .await;
1997
1998        // Begin opening a buffer as client B, but leave the project before the open completes.
1999        let buffer_b = cx_b
2000            .background()
2001            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
2002        cx_b.update(|_| drop(project_b));
2003        drop(buffer_b);
2004
2005        // See that the guest has left.
2006        project_a
2007            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2008            .await;
2009    }
2010
2011    #[gpui::test(iterations = 10)]
2012    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2013        cx_a.foreground().forbid_parking();
2014        let lang_registry = Arc::new(LanguageRegistry::new());
2015        let fs = Arc::new(FakeFs::new(cx_a.background()));
2016
2017        // Connect to a server as 2 clients.
2018        let mut server = TestServer::start(cx_a.foreground()).await;
2019        let client_a = server.create_client(&mut cx_a, "user_a").await;
2020        let client_b = server.create_client(&mut cx_b, "user_b").await;
2021
2022        // Share a project as client A
2023        fs.insert_tree(
2024            "/a",
2025            json!({
2026                ".zed.toml": r#"collaborators = ["user_b"]"#,
2027                "a.txt": "a-contents",
2028                "b.txt": "b-contents",
2029            }),
2030        )
2031        .await;
2032        let project_a = cx_a.update(|cx| {
2033            Project::local(
2034                client_a.clone(),
2035                client_a.user_store.clone(),
2036                lang_registry.clone(),
2037                fs.clone(),
2038                cx,
2039            )
2040        });
2041        let (worktree_a, _) = project_a
2042            .update(&mut cx_a, |p, cx| {
2043                p.find_or_create_local_worktree("/a", false, cx)
2044            })
2045            .await
2046            .unwrap();
2047        worktree_a
2048            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2049            .await;
2050        let project_id = project_a
2051            .update(&mut cx_a, |project, _| project.next_remote_id())
2052            .await;
2053        project_a
2054            .update(&mut cx_a, |project, cx| project.share(cx))
2055            .await
2056            .unwrap();
2057
2058        // Join that project as client B
2059        let _project_b = Project::remote(
2060            project_id,
2061            client_b.clone(),
2062            client_b.user_store.clone(),
2063            lang_registry.clone(),
2064            fs.clone(),
2065            &mut cx_b.to_async(),
2066        )
2067        .await
2068        .unwrap();
2069
2070        // See that a guest has joined as client A.
2071        project_a
2072            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2073            .await;
2074
2075        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2076        client_b.disconnect(&cx_b.to_async()).unwrap();
2077        project_a
2078            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2079            .await;
2080    }
2081
2082    #[gpui::test(iterations = 10)]
2083    async fn test_collaborating_with_diagnostics(
2084        mut cx_a: TestAppContext,
2085        mut cx_b: TestAppContext,
2086    ) {
2087        cx_a.foreground().forbid_parking();
2088        let mut lang_registry = Arc::new(LanguageRegistry::new());
2089        let fs = Arc::new(FakeFs::new(cx_a.background()));
2090
2091        // Set up a fake language server.
2092        let (language_server_config, mut fake_language_server) =
2093            LanguageServerConfig::fake(&cx_a).await;
2094        Arc::get_mut(&mut lang_registry)
2095            .unwrap()
2096            .add(Arc::new(Language::new(
2097                LanguageConfig {
2098                    name: "Rust".to_string(),
2099                    path_suffixes: vec!["rs".to_string()],
2100                    language_server: Some(language_server_config),
2101                    ..Default::default()
2102                },
2103                Some(tree_sitter_rust::language()),
2104            )));
2105
2106        // Connect to a server as 2 clients.
2107        let mut server = TestServer::start(cx_a.foreground()).await;
2108        let client_a = server.create_client(&mut cx_a, "user_a").await;
2109        let client_b = server.create_client(&mut cx_b, "user_b").await;
2110
2111        // Share a project as client A
2112        fs.insert_tree(
2113            "/a",
2114            json!({
2115                ".zed.toml": r#"collaborators = ["user_b"]"#,
2116                "a.rs": "let one = two",
2117                "other.rs": "",
2118            }),
2119        )
2120        .await;
2121        let project_a = cx_a.update(|cx| {
2122            Project::local(
2123                client_a.clone(),
2124                client_a.user_store.clone(),
2125                lang_registry.clone(),
2126                fs.clone(),
2127                cx,
2128            )
2129        });
2130        let (worktree_a, _) = project_a
2131            .update(&mut cx_a, |p, cx| {
2132                p.find_or_create_local_worktree("/a", false, cx)
2133            })
2134            .await
2135            .unwrap();
2136        worktree_a
2137            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2138            .await;
2139        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2140        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2141        project_a
2142            .update(&mut cx_a, |p, cx| p.share(cx))
2143            .await
2144            .unwrap();
2145
2146        // Cause the language server to start.
2147        let _ = cx_a
2148            .background()
2149            .spawn(project_a.update(&mut cx_a, |project, cx| {
2150                project.open_buffer(
2151                    ProjectPath {
2152                        worktree_id,
2153                        path: Path::new("other.rs").into(),
2154                    },
2155                    cx,
2156                )
2157            }))
2158            .await
2159            .unwrap();
2160
2161        // Simulate a language server reporting errors for a file.
2162        fake_language_server
2163            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2164                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2165                version: None,
2166                diagnostics: vec![lsp::Diagnostic {
2167                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2168                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2169                    message: "message 1".to_string(),
2170                    ..Default::default()
2171                }],
2172            })
2173            .await;
2174
2175        // Wait for server to see the diagnostics update.
2176        server
2177            .condition(|store| {
2178                let worktree = store
2179                    .project(project_id)
2180                    .unwrap()
2181                    .worktrees
2182                    .get(&worktree_id.to_proto())
2183                    .unwrap();
2184
2185                !worktree
2186                    .share
2187                    .as_ref()
2188                    .unwrap()
2189                    .diagnostic_summaries
2190                    .is_empty()
2191            })
2192            .await;
2193
2194        // Join the worktree as client B.
2195        let project_b = Project::remote(
2196            project_id,
2197            client_b.clone(),
2198            client_b.user_store.clone(),
2199            lang_registry.clone(),
2200            fs.clone(),
2201            &mut cx_b.to_async(),
2202        )
2203        .await
2204        .unwrap();
2205
2206        project_b.read_with(&cx_b, |project, cx| {
2207            assert_eq!(
2208                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2209                &[(
2210                    ProjectPath {
2211                        worktree_id,
2212                        path: Arc::from(Path::new("a.rs")),
2213                    },
2214                    DiagnosticSummary {
2215                        error_count: 1,
2216                        warning_count: 0,
2217                        ..Default::default()
2218                    },
2219                )]
2220            )
2221        });
2222
2223        // Simulate a language server reporting more errors for a file.
2224        fake_language_server
2225            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2226                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2227                version: None,
2228                diagnostics: vec![
2229                    lsp::Diagnostic {
2230                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2231                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2232                        message: "message 1".to_string(),
2233                        ..Default::default()
2234                    },
2235                    lsp::Diagnostic {
2236                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2237                        range: lsp::Range::new(
2238                            lsp::Position::new(0, 10),
2239                            lsp::Position::new(0, 13),
2240                        ),
2241                        message: "message 2".to_string(),
2242                        ..Default::default()
2243                    },
2244                ],
2245            })
2246            .await;
2247
2248        // Client b gets the updated summaries
2249        project_b
2250            .condition(&cx_b, |project, cx| {
2251                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2252                    == &[(
2253                        ProjectPath {
2254                            worktree_id,
2255                            path: Arc::from(Path::new("a.rs")),
2256                        },
2257                        DiagnosticSummary {
2258                            error_count: 1,
2259                            warning_count: 1,
2260                            ..Default::default()
2261                        },
2262                    )]
2263            })
2264            .await;
2265
2266        // Open the file with the errors on client B. They should be present.
2267        let buffer_b = cx_b
2268            .background()
2269            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2270            .await
2271            .unwrap();
2272
2273        buffer_b.read_with(&cx_b, |buffer, _| {
2274            assert_eq!(
2275                buffer
2276                    .snapshot()
2277                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2278                    .map(|entry| entry)
2279                    .collect::<Vec<_>>(),
2280                &[
2281                    DiagnosticEntry {
2282                        range: Point::new(0, 4)..Point::new(0, 7),
2283                        diagnostic: Diagnostic {
2284                            group_id: 0,
2285                            message: "message 1".to_string(),
2286                            severity: lsp::DiagnosticSeverity::ERROR,
2287                            is_primary: true,
2288                            ..Default::default()
2289                        }
2290                    },
2291                    DiagnosticEntry {
2292                        range: Point::new(0, 10)..Point::new(0, 13),
2293                        diagnostic: Diagnostic {
2294                            group_id: 1,
2295                            severity: lsp::DiagnosticSeverity::WARNING,
2296                            message: "message 2".to_string(),
2297                            is_primary: true,
2298                            ..Default::default()
2299                        }
2300                    }
2301                ]
2302            );
2303        });
2304    }
2305
2306    #[gpui::test(iterations = 10)]
2307    async fn test_collaborating_with_completion(
2308        mut cx_a: TestAppContext,
2309        mut cx_b: TestAppContext,
2310    ) {
2311        cx_a.foreground().forbid_parking();
2312        let mut lang_registry = Arc::new(LanguageRegistry::new());
2313        let fs = Arc::new(FakeFs::new(cx_a.background()));
2314
2315        // Set up a fake language server.
2316        let (language_server_config, mut fake_language_server) =
2317            LanguageServerConfig::fake_with_capabilities(
2318                lsp::ServerCapabilities {
2319                    completion_provider: Some(lsp::CompletionOptions {
2320                        trigger_characters: Some(vec![".".to_string()]),
2321                        ..Default::default()
2322                    }),
2323                    ..Default::default()
2324                },
2325                &cx_a,
2326            )
2327            .await;
2328        Arc::get_mut(&mut lang_registry)
2329            .unwrap()
2330            .add(Arc::new(Language::new(
2331                LanguageConfig {
2332                    name: "Rust".to_string(),
2333                    path_suffixes: vec!["rs".to_string()],
2334                    language_server: Some(language_server_config),
2335                    ..Default::default()
2336                },
2337                Some(tree_sitter_rust::language()),
2338            )));
2339
2340        // Connect to a server as 2 clients.
2341        let mut server = TestServer::start(cx_a.foreground()).await;
2342        let client_a = server.create_client(&mut cx_a, "user_a").await;
2343        let client_b = server.create_client(&mut cx_b, "user_b").await;
2344
2345        // Share a project as client A
2346        fs.insert_tree(
2347            "/a",
2348            json!({
2349                ".zed.toml": r#"collaborators = ["user_b"]"#,
2350                "main.rs": "fn main() { a }",
2351                "other.rs": "",
2352            }),
2353        )
2354        .await;
2355        let project_a = cx_a.update(|cx| {
2356            Project::local(
2357                client_a.clone(),
2358                client_a.user_store.clone(),
2359                lang_registry.clone(),
2360                fs.clone(),
2361                cx,
2362            )
2363        });
2364        let (worktree_a, _) = project_a
2365            .update(&mut cx_a, |p, cx| {
2366                p.find_or_create_local_worktree("/a", false, cx)
2367            })
2368            .await
2369            .unwrap();
2370        worktree_a
2371            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2372            .await;
2373        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2374        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2375        project_a
2376            .update(&mut cx_a, |p, cx| p.share(cx))
2377            .await
2378            .unwrap();
2379
2380        // Join the worktree as client B.
2381        let project_b = Project::remote(
2382            project_id,
2383            client_b.clone(),
2384            client_b.user_store.clone(),
2385            lang_registry.clone(),
2386            fs.clone(),
2387            &mut cx_b.to_async(),
2388        )
2389        .await
2390        .unwrap();
2391
2392        // Open a file in an editor as the guest.
2393        let buffer_b = project_b
2394            .update(&mut cx_b, |p, cx| {
2395                p.open_buffer((worktree_id, "main.rs"), cx)
2396            })
2397            .await
2398            .unwrap();
2399        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2400        let editor_b = cx_b.add_view(window_b, |cx| {
2401            Editor::for_buffer(
2402                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2403                Arc::new(|cx| EditorSettings::test(cx)),
2404                Some(project_b.clone()),
2405                cx,
2406            )
2407        });
2408
2409        // Type a completion trigger character as the guest.
2410        editor_b.update(&mut cx_b, |editor, cx| {
2411            editor.select_ranges([13..13], None, cx);
2412            editor.handle_input(&Input(".".into()), cx);
2413            cx.focus(&editor_b);
2414        });
2415
2416        // Receive a completion request as the host's language server.
2417        let (request_id, params) = fake_language_server
2418            .receive_request::<lsp::request::Completion>()
2419            .await;
2420        assert_eq!(
2421            params.text_document_position.text_document.uri,
2422            lsp::Url::from_file_path("/a/main.rs").unwrap(),
2423        );
2424        assert_eq!(
2425            params.text_document_position.position,
2426            lsp::Position::new(0, 14),
2427        );
2428
2429        // Return some completions from the host's language server.
2430        fake_language_server
2431            .respond(
2432                request_id,
2433                Some(lsp::CompletionResponse::Array(vec![
2434                    lsp::CompletionItem {
2435                        label: "first_method(…)".into(),
2436                        detail: Some("fn(&mut self, B) -> C".into()),
2437                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2438                            new_text: "first_method($1)".to_string(),
2439                            range: lsp::Range::new(
2440                                lsp::Position::new(0, 14),
2441                                lsp::Position::new(0, 14),
2442                            ),
2443                        })),
2444                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2445                        ..Default::default()
2446                    },
2447                    lsp::CompletionItem {
2448                        label: "second_method(…)".into(),
2449                        detail: Some("fn(&mut self, C) -> D<E>".into()),
2450                        text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2451                            new_text: "second_method()".to_string(),
2452                            range: lsp::Range::new(
2453                                lsp::Position::new(0, 14),
2454                                lsp::Position::new(0, 14),
2455                            ),
2456                        })),
2457                        insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2458                        ..Default::default()
2459                    },
2460                ])),
2461            )
2462            .await;
2463
2464        // Open the buffer on the host.
2465        let buffer_a = project_a
2466            .update(&mut cx_a, |p, cx| {
2467                p.open_buffer((worktree_id, "main.rs"), cx)
2468            })
2469            .await
2470            .unwrap();
2471        buffer_a
2472            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2473            .await;
2474
2475        // Confirm a completion on the guest.
2476        editor_b.next_notification(&cx_b).await;
2477        editor_b.update(&mut cx_b, |editor, cx| {
2478            assert!(editor.context_menu_visible());
2479            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2480            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2481        });
2482
2483        buffer_a
2484            .condition(&cx_a, |buffer, _| {
2485                buffer.text() == "fn main() { a.first_method() }"
2486            })
2487            .await;
2488
2489        // Receive a request resolve the selected completion on the host's language server.
2490        let (request_id, params) = fake_language_server
2491            .receive_request::<lsp::request::ResolveCompletionItem>()
2492            .await;
2493        assert_eq!(params.label, "first_method(…)");
2494
2495        // Return a resolved completion from the host's language server.
2496        // The resolved completion has an additional text edit.
2497        fake_language_server
2498            .respond(
2499                request_id,
2500                lsp::CompletionItem {
2501                    label: "first_method(…)".into(),
2502                    detail: Some("fn(&mut self, B) -> C".into()),
2503                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2504                        new_text: "first_method($1)".to_string(),
2505                        range: lsp::Range::new(
2506                            lsp::Position::new(0, 14),
2507                            lsp::Position::new(0, 14),
2508                        ),
2509                    })),
2510                    additional_text_edits: Some(vec![lsp::TextEdit {
2511                        new_text: "use d::SomeTrait;\n".to_string(),
2512                        range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2513                    }]),
2514                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2515                    ..Default::default()
2516                },
2517            )
2518            .await;
2519
2520        // The additional edit is applied.
2521        buffer_b
2522            .condition(&cx_b, |buffer, _| {
2523                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2524            })
2525            .await;
2526        assert_eq!(
2527            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2528            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2529        );
2530    }
2531
2532    #[gpui::test(iterations = 10)]
2533    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2534        cx_a.foreground().forbid_parking();
2535        let mut lang_registry = Arc::new(LanguageRegistry::new());
2536        let fs = Arc::new(FakeFs::new(cx_a.background()));
2537
2538        // Set up a fake language server.
2539        let (language_server_config, mut fake_language_server) =
2540            LanguageServerConfig::fake(&cx_a).await;
2541        Arc::get_mut(&mut lang_registry)
2542            .unwrap()
2543            .add(Arc::new(Language::new(
2544                LanguageConfig {
2545                    name: "Rust".to_string(),
2546                    path_suffixes: vec!["rs".to_string()],
2547                    language_server: Some(language_server_config),
2548                    ..Default::default()
2549                },
2550                Some(tree_sitter_rust::language()),
2551            )));
2552
2553        // Connect to a server as 2 clients.
2554        let mut server = TestServer::start(cx_a.foreground()).await;
2555        let client_a = server.create_client(&mut cx_a, "user_a").await;
2556        let client_b = server.create_client(&mut cx_b, "user_b").await;
2557
2558        // Share a project as client A
2559        fs.insert_tree(
2560            "/a",
2561            json!({
2562                ".zed.toml": r#"collaborators = ["user_b"]"#,
2563                "a.rs": "let one = two",
2564            }),
2565        )
2566        .await;
2567        let project_a = cx_a.update(|cx| {
2568            Project::local(
2569                client_a.clone(),
2570                client_a.user_store.clone(),
2571                lang_registry.clone(),
2572                fs.clone(),
2573                cx,
2574            )
2575        });
2576        let (worktree_a, _) = project_a
2577            .update(&mut cx_a, |p, cx| {
2578                p.find_or_create_local_worktree("/a", false, cx)
2579            })
2580            .await
2581            .unwrap();
2582        worktree_a
2583            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2584            .await;
2585        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2586        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2587        project_a
2588            .update(&mut cx_a, |p, cx| p.share(cx))
2589            .await
2590            .unwrap();
2591
2592        // Join the worktree as client B.
2593        let project_b = Project::remote(
2594            project_id,
2595            client_b.clone(),
2596            client_b.user_store.clone(),
2597            lang_registry.clone(),
2598            fs.clone(),
2599            &mut cx_b.to_async(),
2600        )
2601        .await
2602        .unwrap();
2603
2604        let buffer_b = cx_b
2605            .background()
2606            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2607            .await
2608            .unwrap();
2609
2610        let format = project_b.update(&mut cx_b, |project, cx| {
2611            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2612        });
2613        let (request_id, _) = fake_language_server
2614            .receive_request::<lsp::request::Formatting>()
2615            .await;
2616        fake_language_server
2617            .respond(
2618                request_id,
2619                Some(vec![
2620                    lsp::TextEdit {
2621                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2622                        new_text: "h".to_string(),
2623                    },
2624                    lsp::TextEdit {
2625                        range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2626                        new_text: "y".to_string(),
2627                    },
2628                ]),
2629            )
2630            .await;
2631        format.await.unwrap();
2632        assert_eq!(
2633            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2634            "let honey = two"
2635        );
2636    }
2637
2638    #[gpui::test(iterations = 10)]
2639    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2640        cx_a.foreground().forbid_parking();
2641        let mut lang_registry = Arc::new(LanguageRegistry::new());
2642        let fs = Arc::new(FakeFs::new(cx_a.background()));
2643        fs.insert_tree(
2644            "/root-1",
2645            json!({
2646                ".zed.toml": r#"collaborators = ["user_b"]"#,
2647                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2648            }),
2649        )
2650        .await;
2651        fs.insert_tree(
2652            "/root-2",
2653            json!({
2654                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2655            }),
2656        )
2657        .await;
2658
2659        // Set up a fake language server.
2660        let (language_server_config, mut fake_language_server) =
2661            LanguageServerConfig::fake(&cx_a).await;
2662        Arc::get_mut(&mut lang_registry)
2663            .unwrap()
2664            .add(Arc::new(Language::new(
2665                LanguageConfig {
2666                    name: "Rust".to_string(),
2667                    path_suffixes: vec!["rs".to_string()],
2668                    language_server: Some(language_server_config),
2669                    ..Default::default()
2670                },
2671                Some(tree_sitter_rust::language()),
2672            )));
2673
2674        // Connect to a server as 2 clients.
2675        let mut server = TestServer::start(cx_a.foreground()).await;
2676        let client_a = server.create_client(&mut cx_a, "user_a").await;
2677        let client_b = server.create_client(&mut cx_b, "user_b").await;
2678
2679        // Share a project as client A
2680        let project_a = cx_a.update(|cx| {
2681            Project::local(
2682                client_a.clone(),
2683                client_a.user_store.clone(),
2684                lang_registry.clone(),
2685                fs.clone(),
2686                cx,
2687            )
2688        });
2689        let (worktree_a, _) = project_a
2690            .update(&mut cx_a, |p, cx| {
2691                p.find_or_create_local_worktree("/root-1", false, cx)
2692            })
2693            .await
2694            .unwrap();
2695        worktree_a
2696            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2697            .await;
2698        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2699        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2700        project_a
2701            .update(&mut cx_a, |p, cx| p.share(cx))
2702            .await
2703            .unwrap();
2704
2705        // Join the worktree as client B.
2706        let project_b = Project::remote(
2707            project_id,
2708            client_b.clone(),
2709            client_b.user_store.clone(),
2710            lang_registry.clone(),
2711            fs.clone(),
2712            &mut cx_b.to_async(),
2713        )
2714        .await
2715        .unwrap();
2716
2717        // Open the file to be formatted on client B.
2718        let buffer_b = cx_b
2719            .background()
2720            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2721            .await
2722            .unwrap();
2723
2724        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2725        let (request_id, _) = fake_language_server
2726            .receive_request::<lsp::request::GotoDefinition>()
2727            .await;
2728        fake_language_server
2729            .respond(
2730                request_id,
2731                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2732                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2733                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2734                ))),
2735            )
2736            .await;
2737        let definitions_1 = definitions_1.await.unwrap();
2738        cx_b.read(|cx| {
2739            assert_eq!(definitions_1.len(), 1);
2740            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2741            let target_buffer = definitions_1[0].target_buffer.read(cx);
2742            assert_eq!(
2743                target_buffer.text(),
2744                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2745            );
2746            assert_eq!(
2747                definitions_1[0].target_range.to_point(target_buffer),
2748                Point::new(0, 6)..Point::new(0, 9)
2749            );
2750        });
2751
2752        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2753        // the previous call to `definition`.
2754        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2755        let (request_id, _) = fake_language_server
2756            .receive_request::<lsp::request::GotoDefinition>()
2757            .await;
2758        fake_language_server
2759            .respond(
2760                request_id,
2761                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2762                    lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2763                    lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2764                ))),
2765            )
2766            .await;
2767        let definitions_2 = definitions_2.await.unwrap();
2768        cx_b.read(|cx| {
2769            assert_eq!(definitions_2.len(), 1);
2770            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2771            let target_buffer = definitions_2[0].target_buffer.read(cx);
2772            assert_eq!(
2773                target_buffer.text(),
2774                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2775            );
2776            assert_eq!(
2777                definitions_2[0].target_range.to_point(target_buffer),
2778                Point::new(1, 6)..Point::new(1, 11)
2779            );
2780        });
2781        assert_eq!(
2782            definitions_1[0].target_buffer,
2783            definitions_2[0].target_buffer
2784        );
2785
2786        cx_b.update(|_| {
2787            drop(definitions_1);
2788            drop(definitions_2);
2789        });
2790        project_b
2791            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2792            .await;
2793    }
2794
2795    #[gpui::test(iterations = 10)]
2796    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2797        mut cx_a: TestAppContext,
2798        mut cx_b: TestAppContext,
2799        mut rng: StdRng,
2800    ) {
2801        cx_a.foreground().forbid_parking();
2802        let mut lang_registry = Arc::new(LanguageRegistry::new());
2803        let fs = Arc::new(FakeFs::new(cx_a.background()));
2804        fs.insert_tree(
2805            "/root",
2806            json!({
2807                ".zed.toml": r#"collaborators = ["user_b"]"#,
2808                "a.rs": "const ONE: usize = b::TWO;",
2809                "b.rs": "const TWO: usize = 2",
2810            }),
2811        )
2812        .await;
2813
2814        // Set up a fake language server.
2815        let (language_server_config, mut fake_language_server) =
2816            LanguageServerConfig::fake(&cx_a).await;
2817        Arc::get_mut(&mut lang_registry)
2818            .unwrap()
2819            .add(Arc::new(Language::new(
2820                LanguageConfig {
2821                    name: "Rust".to_string(),
2822                    path_suffixes: vec!["rs".to_string()],
2823                    language_server: Some(language_server_config),
2824                    ..Default::default()
2825                },
2826                Some(tree_sitter_rust::language()),
2827            )));
2828
2829        // Connect to a server as 2 clients.
2830        let mut server = TestServer::start(cx_a.foreground()).await;
2831        let client_a = server.create_client(&mut cx_a, "user_a").await;
2832        let client_b = server.create_client(&mut cx_b, "user_b").await;
2833
2834        // Share a project as client A
2835        let project_a = cx_a.update(|cx| {
2836            Project::local(
2837                client_a.clone(),
2838                client_a.user_store.clone(),
2839                lang_registry.clone(),
2840                fs.clone(),
2841                cx,
2842            )
2843        });
2844        let (worktree_a, _) = project_a
2845            .update(&mut cx_a, |p, cx| {
2846                p.find_or_create_local_worktree("/root", false, cx)
2847            })
2848            .await
2849            .unwrap();
2850        worktree_a
2851            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2852            .await;
2853        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2854        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2855        project_a
2856            .update(&mut cx_a, |p, cx| p.share(cx))
2857            .await
2858            .unwrap();
2859
2860        // Join the worktree as client B.
2861        let project_b = Project::remote(
2862            project_id,
2863            client_b.clone(),
2864            client_b.user_store.clone(),
2865            lang_registry.clone(),
2866            fs.clone(),
2867            &mut cx_b.to_async(),
2868        )
2869        .await
2870        .unwrap();
2871
2872        let buffer_b1 = cx_b
2873            .background()
2874            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2875            .await
2876            .unwrap();
2877
2878        let definitions;
2879        let buffer_b2;
2880        if rng.gen() {
2881            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2882            buffer_b2 =
2883                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2884        } else {
2885            buffer_b2 =
2886                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2887            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2888        }
2889
2890        let (request_id, _) = fake_language_server
2891            .receive_request::<lsp::request::GotoDefinition>()
2892            .await;
2893        fake_language_server
2894            .respond(
2895                request_id,
2896                Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2897                    lsp::Url::from_file_path("/root/b.rs").unwrap(),
2898                    lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2899                ))),
2900            )
2901            .await;
2902
2903        let buffer_b2 = buffer_b2.await.unwrap();
2904        let definitions = definitions.await.unwrap();
2905        assert_eq!(definitions.len(), 1);
2906        assert_eq!(definitions[0].target_buffer, buffer_b2);
2907    }
2908
2909    #[gpui::test(iterations = 10)]
2910    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2911        cx_a.foreground().forbid_parking();
2912
2913        // Connect to a server as 2 clients.
2914        let mut server = TestServer::start(cx_a.foreground()).await;
2915        let client_a = server.create_client(&mut cx_a, "user_a").await;
2916        let client_b = server.create_client(&mut cx_b, "user_b").await;
2917
2918        // Create an org that includes these 2 users.
2919        let db = &server.app_state.db;
2920        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2921        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2922            .await
2923            .unwrap();
2924        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2925            .await
2926            .unwrap();
2927
2928        // Create a channel that includes all the users.
2929        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2930        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2931            .await
2932            .unwrap();
2933        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2934            .await
2935            .unwrap();
2936        db.create_channel_message(
2937            channel_id,
2938            client_b.current_user_id(&cx_b),
2939            "hello A, it's B.",
2940            OffsetDateTime::now_utc(),
2941            1,
2942        )
2943        .await
2944        .unwrap();
2945
2946        let channels_a = cx_a
2947            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2948        channels_a
2949            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2950            .await;
2951        channels_a.read_with(&cx_a, |list, _| {
2952            assert_eq!(
2953                list.available_channels().unwrap(),
2954                &[ChannelDetails {
2955                    id: channel_id.to_proto(),
2956                    name: "test-channel".to_string()
2957                }]
2958            )
2959        });
2960        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2961            this.get_channel(channel_id.to_proto(), cx).unwrap()
2962        });
2963        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2964        channel_a
2965            .condition(&cx_a, |channel, _| {
2966                channel_messages(channel)
2967                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2968            })
2969            .await;
2970
2971        let channels_b = cx_b
2972            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2973        channels_b
2974            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2975            .await;
2976        channels_b.read_with(&cx_b, |list, _| {
2977            assert_eq!(
2978                list.available_channels().unwrap(),
2979                &[ChannelDetails {
2980                    id: channel_id.to_proto(),
2981                    name: "test-channel".to_string()
2982                }]
2983            )
2984        });
2985
2986        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2987            this.get_channel(channel_id.to_proto(), cx).unwrap()
2988        });
2989        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2990        channel_b
2991            .condition(&cx_b, |channel, _| {
2992                channel_messages(channel)
2993                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2994            })
2995            .await;
2996
2997        channel_a
2998            .update(&mut cx_a, |channel, cx| {
2999                channel
3000                    .send_message("oh, hi B.".to_string(), cx)
3001                    .unwrap()
3002                    .detach();
3003                let task = channel.send_message("sup".to_string(), cx).unwrap();
3004                assert_eq!(
3005                    channel_messages(channel),
3006                    &[
3007                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3008                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3009                        ("user_a".to_string(), "sup".to_string(), true)
3010                    ]
3011                );
3012                task
3013            })
3014            .await
3015            .unwrap();
3016
3017        channel_b
3018            .condition(&cx_b, |channel, _| {
3019                channel_messages(channel)
3020                    == [
3021                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3022                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3023                        ("user_a".to_string(), "sup".to_string(), false),
3024                    ]
3025            })
3026            .await;
3027
3028        assert_eq!(
3029            server
3030                .state()
3031                .await
3032                .channel(channel_id)
3033                .unwrap()
3034                .connection_ids
3035                .len(),
3036            2
3037        );
3038        cx_b.update(|_| drop(channel_b));
3039        server
3040            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3041            .await;
3042
3043        cx_a.update(|_| drop(channel_a));
3044        server
3045            .condition(|state| state.channel(channel_id).is_none())
3046            .await;
3047    }
3048
3049    #[gpui::test(iterations = 10)]
3050    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3051        cx_a.foreground().forbid_parking();
3052
3053        let mut server = TestServer::start(cx_a.foreground()).await;
3054        let client_a = server.create_client(&mut cx_a, "user_a").await;
3055
3056        let db = &server.app_state.db;
3057        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3058        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3059        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3060            .await
3061            .unwrap();
3062        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3063            .await
3064            .unwrap();
3065
3066        let channels_a = cx_a
3067            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3068        channels_a
3069            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3070            .await;
3071        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3072            this.get_channel(channel_id.to_proto(), cx).unwrap()
3073        });
3074
3075        // Messages aren't allowed to be too long.
3076        channel_a
3077            .update(&mut cx_a, |channel, cx| {
3078                let long_body = "this is long.\n".repeat(1024);
3079                channel.send_message(long_body, cx).unwrap()
3080            })
3081            .await
3082            .unwrap_err();
3083
3084        // Messages aren't allowed to be blank.
3085        channel_a.update(&mut cx_a, |channel, cx| {
3086            channel.send_message(String::new(), cx).unwrap_err()
3087        });
3088
3089        // Leading and trailing whitespace are trimmed.
3090        channel_a
3091            .update(&mut cx_a, |channel, cx| {
3092                channel
3093                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3094                    .unwrap()
3095            })
3096            .await
3097            .unwrap();
3098        assert_eq!(
3099            db.get_channel_messages(channel_id, 10, None)
3100                .await
3101                .unwrap()
3102                .iter()
3103                .map(|m| &m.body)
3104                .collect::<Vec<_>>(),
3105            &["surrounded by whitespace"]
3106        );
3107    }
3108
3109    #[gpui::test(iterations = 10)]
3110    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3111        cx_a.foreground().forbid_parking();
3112
3113        // Connect to a server as 2 clients.
3114        let mut server = TestServer::start(cx_a.foreground()).await;
3115        let client_a = server.create_client(&mut cx_a, "user_a").await;
3116        let client_b = server.create_client(&mut cx_b, "user_b").await;
3117        let mut status_b = client_b.status();
3118
3119        // Create an org that includes these 2 users.
3120        let db = &server.app_state.db;
3121        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3122        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3123            .await
3124            .unwrap();
3125        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3126            .await
3127            .unwrap();
3128
3129        // Create a channel that includes all the users.
3130        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3131        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3132            .await
3133            .unwrap();
3134        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3135            .await
3136            .unwrap();
3137        db.create_channel_message(
3138            channel_id,
3139            client_b.current_user_id(&cx_b),
3140            "hello A, it's B.",
3141            OffsetDateTime::now_utc(),
3142            2,
3143        )
3144        .await
3145        .unwrap();
3146
3147        let channels_a = cx_a
3148            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3149        channels_a
3150            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3151            .await;
3152
3153        channels_a.read_with(&cx_a, |list, _| {
3154            assert_eq!(
3155                list.available_channels().unwrap(),
3156                &[ChannelDetails {
3157                    id: channel_id.to_proto(),
3158                    name: "test-channel".to_string()
3159                }]
3160            )
3161        });
3162        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3163            this.get_channel(channel_id.to_proto(), cx).unwrap()
3164        });
3165        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3166        channel_a
3167            .condition(&cx_a, |channel, _| {
3168                channel_messages(channel)
3169                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3170            })
3171            .await;
3172
3173        let channels_b = cx_b
3174            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3175        channels_b
3176            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3177            .await;
3178        channels_b.read_with(&cx_b, |list, _| {
3179            assert_eq!(
3180                list.available_channels().unwrap(),
3181                &[ChannelDetails {
3182                    id: channel_id.to_proto(),
3183                    name: "test-channel".to_string()
3184                }]
3185            )
3186        });
3187
3188        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3189            this.get_channel(channel_id.to_proto(), cx).unwrap()
3190        });
3191        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3192        channel_b
3193            .condition(&cx_b, |channel, _| {
3194                channel_messages(channel)
3195                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3196            })
3197            .await;
3198
3199        // Disconnect client B, ensuring we can still access its cached channel data.
3200        server.forbid_connections();
3201        server.disconnect_client(client_b.current_user_id(&cx_b));
3202        while !matches!(
3203            status_b.next().await,
3204            Some(client::Status::ReconnectionError { .. })
3205        ) {}
3206
3207        channels_b.read_with(&cx_b, |channels, _| {
3208            assert_eq!(
3209                channels.available_channels().unwrap(),
3210                [ChannelDetails {
3211                    id: channel_id.to_proto(),
3212                    name: "test-channel".to_string()
3213                }]
3214            )
3215        });
3216        channel_b.read_with(&cx_b, |channel, _| {
3217            assert_eq!(
3218                channel_messages(channel),
3219                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3220            )
3221        });
3222
3223        // Send a message from client B while it is disconnected.
3224        channel_b
3225            .update(&mut cx_b, |channel, cx| {
3226                let task = channel
3227                    .send_message("can you see this?".to_string(), cx)
3228                    .unwrap();
3229                assert_eq!(
3230                    channel_messages(channel),
3231                    &[
3232                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3233                        ("user_b".to_string(), "can you see this?".to_string(), true)
3234                    ]
3235                );
3236                task
3237            })
3238            .await
3239            .unwrap_err();
3240
3241        // Send a message from client A while B is disconnected.
3242        channel_a
3243            .update(&mut cx_a, |channel, cx| {
3244                channel
3245                    .send_message("oh, hi B.".to_string(), cx)
3246                    .unwrap()
3247                    .detach();
3248                let task = channel.send_message("sup".to_string(), cx).unwrap();
3249                assert_eq!(
3250                    channel_messages(channel),
3251                    &[
3252                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3253                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3254                        ("user_a".to_string(), "sup".to_string(), true)
3255                    ]
3256                );
3257                task
3258            })
3259            .await
3260            .unwrap();
3261
3262        // Give client B a chance to reconnect.
3263        server.allow_connections();
3264        cx_b.foreground().advance_clock(Duration::from_secs(10));
3265
3266        // Verify that B sees the new messages upon reconnection, as well as the message client B
3267        // sent while offline.
3268        channel_b
3269            .condition(&cx_b, |channel, _| {
3270                channel_messages(channel)
3271                    == [
3272                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3273                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3274                        ("user_a".to_string(), "sup".to_string(), false),
3275                        ("user_b".to_string(), "can you see this?".to_string(), false),
3276                    ]
3277            })
3278            .await;
3279
3280        // Ensure client A and B can communicate normally after reconnection.
3281        channel_a
3282            .update(&mut cx_a, |channel, cx| {
3283                channel.send_message("you online?".to_string(), cx).unwrap()
3284            })
3285            .await
3286            .unwrap();
3287        channel_b
3288            .condition(&cx_b, |channel, _| {
3289                channel_messages(channel)
3290                    == [
3291                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3292                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3293                        ("user_a".to_string(), "sup".to_string(), false),
3294                        ("user_b".to_string(), "can you see this?".to_string(), false),
3295                        ("user_a".to_string(), "you online?".to_string(), false),
3296                    ]
3297            })
3298            .await;
3299
3300        channel_b
3301            .update(&mut cx_b, |channel, cx| {
3302                channel.send_message("yep".to_string(), cx).unwrap()
3303            })
3304            .await
3305            .unwrap();
3306        channel_a
3307            .condition(&cx_a, |channel, _| {
3308                channel_messages(channel)
3309                    == [
3310                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3311                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3312                        ("user_a".to_string(), "sup".to_string(), false),
3313                        ("user_b".to_string(), "can you see this?".to_string(), false),
3314                        ("user_a".to_string(), "you online?".to_string(), false),
3315                        ("user_b".to_string(), "yep".to_string(), false),
3316                    ]
3317            })
3318            .await;
3319    }
3320
3321    #[gpui::test(iterations = 10)]
3322    async fn test_contacts(
3323        mut cx_a: TestAppContext,
3324        mut cx_b: TestAppContext,
3325        mut cx_c: TestAppContext,
3326    ) {
3327        cx_a.foreground().forbid_parking();
3328        let lang_registry = Arc::new(LanguageRegistry::new());
3329        let fs = Arc::new(FakeFs::new(cx_a.background()));
3330
3331        // Connect to a server as 3 clients.
3332        let mut server = TestServer::start(cx_a.foreground()).await;
3333        let client_a = server.create_client(&mut cx_a, "user_a").await;
3334        let client_b = server.create_client(&mut cx_b, "user_b").await;
3335        let client_c = server.create_client(&mut cx_c, "user_c").await;
3336
3337        // Share a worktree as client A.
3338        fs.insert_tree(
3339            "/a",
3340            json!({
3341                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3342            }),
3343        )
3344        .await;
3345
3346        let project_a = cx_a.update(|cx| {
3347            Project::local(
3348                client_a.clone(),
3349                client_a.user_store.clone(),
3350                lang_registry.clone(),
3351                fs.clone(),
3352                cx,
3353            )
3354        });
3355        let (worktree_a, _) = project_a
3356            .update(&mut cx_a, |p, cx| {
3357                p.find_or_create_local_worktree("/a", false, cx)
3358            })
3359            .await
3360            .unwrap();
3361        worktree_a
3362            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3363            .await;
3364
3365        client_a
3366            .user_store
3367            .condition(&cx_a, |user_store, _| {
3368                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3369            })
3370            .await;
3371        client_b
3372            .user_store
3373            .condition(&cx_b, |user_store, _| {
3374                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3375            })
3376            .await;
3377        client_c
3378            .user_store
3379            .condition(&cx_c, |user_store, _| {
3380                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3381            })
3382            .await;
3383
3384        let project_id = project_a
3385            .update(&mut cx_a, |project, _| project.next_remote_id())
3386            .await;
3387        project_a
3388            .update(&mut cx_a, |project, cx| project.share(cx))
3389            .await
3390            .unwrap();
3391
3392        let _project_b = Project::remote(
3393            project_id,
3394            client_b.clone(),
3395            client_b.user_store.clone(),
3396            lang_registry.clone(),
3397            fs.clone(),
3398            &mut cx_b.to_async(),
3399        )
3400        .await
3401        .unwrap();
3402
3403        client_a
3404            .user_store
3405            .condition(&cx_a, |user_store, _| {
3406                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3407            })
3408            .await;
3409        client_b
3410            .user_store
3411            .condition(&cx_b, |user_store, _| {
3412                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3413            })
3414            .await;
3415        client_c
3416            .user_store
3417            .condition(&cx_c, |user_store, _| {
3418                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3419            })
3420            .await;
3421
3422        project_a
3423            .condition(&cx_a, |project, _| {
3424                project.collaborators().contains_key(&client_b.peer_id)
3425            })
3426            .await;
3427
3428        cx_a.update(move |_| drop(project_a));
3429        client_a
3430            .user_store
3431            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3432            .await;
3433        client_b
3434            .user_store
3435            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3436            .await;
3437        client_c
3438            .user_store
3439            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3440            .await;
3441
3442        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3443            user_store
3444                .contacts()
3445                .iter()
3446                .map(|contact| {
3447                    let worktrees = contact
3448                        .projects
3449                        .iter()
3450                        .map(|p| {
3451                            (
3452                                p.worktree_root_names[0].as_str(),
3453                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3454                            )
3455                        })
3456                        .collect();
3457                    (contact.user.github_login.as_str(), worktrees)
3458                })
3459                .collect()
3460        }
3461    }
3462
3463    struct TestServer {
3464        peer: Arc<Peer>,
3465        app_state: Arc<AppState>,
3466        server: Arc<Server>,
3467        foreground: Rc<executor::Foreground>,
3468        notifications: mpsc::Receiver<()>,
3469        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3470        forbid_connections: Arc<AtomicBool>,
3471        _test_db: TestDb,
3472    }
3473
3474    impl TestServer {
3475        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3476            let test_db = TestDb::new();
3477            let app_state = Self::build_app_state(&test_db).await;
3478            let peer = Peer::new();
3479            let notifications = mpsc::channel(128);
3480            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3481            Self {
3482                peer,
3483                app_state,
3484                server,
3485                foreground,
3486                notifications: notifications.1,
3487                connection_killers: Default::default(),
3488                forbid_connections: Default::default(),
3489                _test_db: test_db,
3490            }
3491        }
3492
3493        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3494            let http = FakeHttpClient::with_404_response();
3495            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3496            let client_name = name.to_string();
3497            let mut client = Client::new(http.clone());
3498            let server = self.server.clone();
3499            let connection_killers = self.connection_killers.clone();
3500            let forbid_connections = self.forbid_connections.clone();
3501            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3502
3503            Arc::get_mut(&mut client)
3504                .unwrap()
3505                .override_authenticate(move |cx| {
3506                    cx.spawn(|_| async move {
3507                        let access_token = "the-token".to_string();
3508                        Ok(Credentials {
3509                            user_id: user_id.0 as u64,
3510                            access_token,
3511                        })
3512                    })
3513                })
3514                .override_establish_connection(move |credentials, cx| {
3515                    assert_eq!(credentials.user_id, user_id.0 as u64);
3516                    assert_eq!(credentials.access_token, "the-token");
3517
3518                    let server = server.clone();
3519                    let connection_killers = connection_killers.clone();
3520                    let forbid_connections = forbid_connections.clone();
3521                    let client_name = client_name.clone();
3522                    let connection_id_tx = connection_id_tx.clone();
3523                    cx.spawn(move |cx| async move {
3524                        if forbid_connections.load(SeqCst) {
3525                            Err(EstablishConnectionError::other(anyhow!(
3526                                "server is forbidding connections"
3527                            )))
3528                        } else {
3529                            let (client_conn, server_conn, kill_conn) =
3530                                Connection::in_memory(cx.background());
3531                            connection_killers.lock().insert(user_id, kill_conn);
3532                            cx.background()
3533                                .spawn(server.handle_connection(
3534                                    server_conn,
3535                                    client_name,
3536                                    user_id,
3537                                    Some(connection_id_tx),
3538                                ))
3539                                .detach();
3540                            Ok(client_conn)
3541                        }
3542                    })
3543                });
3544
3545            client
3546                .authenticate_and_connect(&cx.to_async())
3547                .await
3548                .unwrap();
3549
3550            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3551            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3552            let mut authed_user =
3553                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3554            while authed_user.next().await.unwrap().is_none() {}
3555
3556            TestClient {
3557                client,
3558                peer_id,
3559                user_store,
3560            }
3561        }
3562
3563        fn disconnect_client(&self, user_id: UserId) {
3564            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3565                let _ = kill_conn.try_send(Some(()));
3566            }
3567        }
3568
3569        fn forbid_connections(&self) {
3570            self.forbid_connections.store(true, SeqCst);
3571        }
3572
3573        fn allow_connections(&self) {
3574            self.forbid_connections.store(false, SeqCst);
3575        }
3576
3577        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3578            let mut config = Config::default();
3579            config.session_secret = "a".repeat(32);
3580            config.database_url = test_db.url.clone();
3581            let github_client = github::AppClient::test();
3582            Arc::new(AppState {
3583                db: test_db.db().clone(),
3584                handlebars: Default::default(),
3585                auth_client: auth::build_client("", ""),
3586                repo_client: github::RepoClient::test(&github_client),
3587                github_client,
3588                config,
3589            })
3590        }
3591
3592        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3593            self.server.store.read()
3594        }
3595
3596        async fn condition<F>(&mut self, mut predicate: F)
3597        where
3598            F: FnMut(&Store) -> bool,
3599        {
3600            async_std::future::timeout(Duration::from_millis(500), async {
3601                while !(predicate)(&*self.server.store.read()) {
3602                    self.foreground.start_waiting();
3603                    self.notifications.next().await;
3604                    self.foreground.finish_waiting();
3605                }
3606            })
3607            .await
3608            .expect("condition timed out");
3609        }
3610    }
3611
3612    impl Drop for TestServer {
3613        fn drop(&mut self) {
3614            self.peer.reset();
3615        }
3616    }
3617
3618    struct TestClient {
3619        client: Arc<Client>,
3620        pub peer_id: PeerId,
3621        pub user_store: ModelHandle<UserStore>,
3622    }
3623
3624    impl Deref for TestClient {
3625        type Target = Arc<Client>;
3626
3627        fn deref(&self) -> &Self::Target {
3628            &self.client
3629        }
3630    }
3631
3632    impl TestClient {
3633        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3634            UserId::from_proto(
3635                self.user_store
3636                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3637            )
3638        }
3639    }
3640
3641    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3642        channel
3643            .messages()
3644            .cursor::<()>()
3645            .map(|m| {
3646                (
3647                    m.sender.github_login.clone(),
3648                    m.body.clone(),
3649                    m.is_pending(),
3650                )
3651            })
3652            .collect()
3653    }
3654
3655    struct EmptyView;
3656
3657    impl gpui::Entity for EmptyView {
3658        type Event = ();
3659    }
3660
3661    impl gpui::View for EmptyView {
3662        fn ui_name() -> &'static str {
3663            "empty view"
3664        }
3665
3666        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3667            gpui::Element::boxed(gpui::elements::Empty)
3668        }
3669    }
3670}