rpc.rs

   1mod store;
   2
   3use super::{
   4    auth::process_auth_header,
   5    db::{ChannelId, MessageId, UserId},
   6    AppState,
   7};
   8use anyhow::anyhow;
   9use async_std::task;
  10use async_tungstenite::{tungstenite::protocol::Role, WebSocketStream};
  11use collections::{HashMap, HashSet};
  12use futures::{future::BoxFuture, FutureExt, StreamExt};
  13use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
  14use postage::{mpsc, prelude::Sink as _};
  15use rpc::{
  16    proto::{self, AnyTypedEnvelope, EnvelopedMessage},
  17    Connection, ConnectionId, Peer, TypedEnvelope,
  18};
  19use sha1::{Digest as _, Sha1};
  20use std::{any::TypeId, future::Future, path::PathBuf, sync::Arc, time::Instant};
  21use store::{Store, Worktree};
  22use surf::StatusCode;
  23use tide::log;
  24use tide::{
  25    http::headers::{HeaderName, CONNECTION, UPGRADE},
  26    Request, Response,
  27};
  28use time::OffsetDateTime;
  29
  30type MessageHandler = Box<
  31    dyn Send
  32        + Sync
  33        + Fn(Arc<Server>, Box<dyn AnyTypedEnvelope>) -> BoxFuture<'static, tide::Result<()>>,
  34>;
  35
  36pub struct Server {
  37    peer: Arc<Peer>,
  38    store: RwLock<Store>,
  39    app_state: Arc<AppState>,
  40    handlers: HashMap<TypeId, MessageHandler>,
  41    notifications: Option<mpsc::Sender<()>>,
  42}
  43
  44const MESSAGE_COUNT_PER_PAGE: usize = 100;
  45const MAX_MESSAGE_LEN: usize = 1024;
  46const NO_SUCH_PROJECT: &'static str = "no such project";
  47
  48impl Server {
  49    pub fn new(
  50        app_state: Arc<AppState>,
  51        peer: Arc<Peer>,
  52        notifications: Option<mpsc::Sender<()>>,
  53    ) -> Arc<Self> {
  54        let mut server = Self {
  55            peer,
  56            app_state,
  57            store: Default::default(),
  58            handlers: Default::default(),
  59            notifications,
  60        };
  61
  62        server
  63            .add_handler(Server::ping)
  64            .add_handler(Server::register_project)
  65            .add_handler(Server::unregister_project)
  66            .add_handler(Server::share_project)
  67            .add_handler(Server::unshare_project)
  68            .add_handler(Server::join_project)
  69            .add_handler(Server::leave_project)
  70            .add_handler(Server::register_worktree)
  71            .add_handler(Server::unregister_worktree)
  72            .add_handler(Server::share_worktree)
  73            .add_handler(Server::update_worktree)
  74            .add_handler(Server::update_diagnostic_summary)
  75            .add_handler(Server::disk_based_diagnostics_updating)
  76            .add_handler(Server::disk_based_diagnostics_updated)
  77            .add_handler(Server::get_definition)
  78            .add_handler(Server::open_buffer)
  79            .add_handler(Server::close_buffer)
  80            .add_handler(Server::update_buffer)
  81            .add_handler(Server::update_buffer_file)
  82            .add_handler(Server::buffer_reloaded)
  83            .add_handler(Server::buffer_saved)
  84            .add_handler(Server::save_buffer)
  85            .add_handler(Server::format_buffers)
  86            .add_handler(Server::get_completions)
  87            .add_handler(Server::apply_additional_edits_for_completion)
  88            .add_handler(Server::get_code_actions)
  89            .add_handler(Server::apply_code_action)
  90            .add_handler(Server::get_channels)
  91            .add_handler(Server::get_users)
  92            .add_handler(Server::join_channel)
  93            .add_handler(Server::leave_channel)
  94            .add_handler(Server::send_channel_message)
  95            .add_handler(Server::get_channel_messages);
  96
  97        Arc::new(server)
  98    }
  99
 100    fn add_handler<F, Fut, M>(&mut self, handler: F) -> &mut Self
 101    where
 102        F: 'static + Send + Sync + Fn(Arc<Self>, TypedEnvelope<M>) -> Fut,
 103        Fut: 'static + Send + Future<Output = tide::Result<()>>,
 104        M: EnvelopedMessage,
 105    {
 106        let prev_handler = self.handlers.insert(
 107            TypeId::of::<M>(),
 108            Box::new(move |server, envelope| {
 109                let envelope = envelope.into_any().downcast::<TypedEnvelope<M>>().unwrap();
 110                (handler)(server, *envelope).boxed()
 111            }),
 112        );
 113        if prev_handler.is_some() {
 114            panic!("registered a handler for the same message twice");
 115        }
 116        self
 117    }
 118
 119    pub fn handle_connection(
 120        self: &Arc<Self>,
 121        connection: Connection,
 122        addr: String,
 123        user_id: UserId,
 124        mut send_connection_id: Option<postage::mpsc::Sender<ConnectionId>>,
 125    ) -> impl Future<Output = ()> {
 126        let mut this = self.clone();
 127        async move {
 128            let (connection_id, handle_io, mut incoming_rx) =
 129                this.peer.add_connection(connection).await;
 130
 131            if let Some(send_connection_id) = send_connection_id.as_mut() {
 132                let _ = send_connection_id.send(connection_id).await;
 133            }
 134
 135            this.state_mut().add_connection(connection_id, user_id);
 136            if let Err(err) = this.update_contacts_for_users(&[user_id]) {
 137                log::error!("error updating contacts for {:?}: {}", user_id, err);
 138            }
 139
 140            let handle_io = handle_io.fuse();
 141            futures::pin_mut!(handle_io);
 142            loop {
 143                let next_message = incoming_rx.next().fuse();
 144                futures::pin_mut!(next_message);
 145                futures::select_biased! {
 146                    result = handle_io => {
 147                        if let Err(err) = result {
 148                            log::error!("error handling rpc connection {:?} - {:?}", addr, err);
 149                        }
 150                        break;
 151                    }
 152                    message = next_message => {
 153                        if let Some(message) = message {
 154                            let start_time = Instant::now();
 155                            let type_name = message.payload_type_name();
 156                            log::info!("rpc message received. connection:{}, type:{}", connection_id, type_name);
 157                            if let Some(handler) = this.handlers.get(&message.payload_type_id()) {
 158                                if let Err(err) = (handler)(this.clone(), message).await {
 159                                    log::error!("rpc message error. connection:{}, type:{}, error:{:?}", connection_id, type_name, err);
 160                                } else {
 161                                    log::info!("rpc message handled. connection:{}, type:{}, duration:{:?}", connection_id, type_name, start_time.elapsed());
 162                                }
 163
 164                                if let Some(mut notifications) = this.notifications.clone() {
 165                                    let _ = notifications.send(()).await;
 166                                }
 167                            } else {
 168                                log::warn!("unhandled message: {}", type_name);
 169                            }
 170                        } else {
 171                            log::info!("rpc connection closed {:?}", addr);
 172                            break;
 173                        }
 174                    }
 175                }
 176            }
 177
 178            if let Err(err) = this.sign_out(connection_id).await {
 179                log::error!("error signing out connection {:?} - {:?}", addr, err);
 180            }
 181        }
 182    }
 183
 184    async fn sign_out(self: &mut Arc<Self>, connection_id: ConnectionId) -> tide::Result<()> {
 185        self.peer.disconnect(connection_id);
 186        let removed_connection = self.state_mut().remove_connection(connection_id)?;
 187
 188        for (project_id, project) in removed_connection.hosted_projects {
 189            if let Some(share) = project.share {
 190                broadcast(
 191                    connection_id,
 192                    share.guests.keys().copied().collect(),
 193                    |conn_id| {
 194                        self.peer
 195                            .send(conn_id, proto::UnshareProject { project_id })
 196                    },
 197                )?;
 198            }
 199        }
 200
 201        for (project_id, peer_ids) in removed_connection.guest_project_ids {
 202            broadcast(connection_id, peer_ids, |conn_id| {
 203                self.peer.send(
 204                    conn_id,
 205                    proto::RemoveProjectCollaborator {
 206                        project_id,
 207                        peer_id: connection_id.0,
 208                    },
 209                )
 210            })?;
 211        }
 212
 213        self.update_contacts_for_users(removed_connection.contact_ids.iter())?;
 214        Ok(())
 215    }
 216
 217    async fn ping(self: Arc<Server>, request: TypedEnvelope<proto::Ping>) -> tide::Result<()> {
 218        self.peer.respond(request.receipt(), proto::Ack {})?;
 219        Ok(())
 220    }
 221
 222    async fn register_project(
 223        mut self: Arc<Server>,
 224        request: TypedEnvelope<proto::RegisterProject>,
 225    ) -> tide::Result<()> {
 226        let project_id = {
 227            let mut state = self.state_mut();
 228            let user_id = state.user_id_for_connection(request.sender_id)?;
 229            state.register_project(request.sender_id, user_id)
 230        };
 231        self.peer.respond(
 232            request.receipt(),
 233            proto::RegisterProjectResponse { project_id },
 234        )?;
 235        Ok(())
 236    }
 237
 238    async fn unregister_project(
 239        mut self: Arc<Server>,
 240        request: TypedEnvelope<proto::UnregisterProject>,
 241    ) -> tide::Result<()> {
 242        let project = self
 243            .state_mut()
 244            .unregister_project(request.payload.project_id, request.sender_id)
 245            .ok_or_else(|| anyhow!("no such project"))?;
 246        self.update_contacts_for_users(project.authorized_user_ids().iter())?;
 247        Ok(())
 248    }
 249
 250    async fn share_project(
 251        mut self: Arc<Server>,
 252        request: TypedEnvelope<proto::ShareProject>,
 253    ) -> tide::Result<()> {
 254        self.state_mut()
 255            .share_project(request.payload.project_id, request.sender_id);
 256        self.peer.respond(request.receipt(), proto::Ack {})?;
 257        Ok(())
 258    }
 259
 260    async fn unshare_project(
 261        mut self: Arc<Server>,
 262        request: TypedEnvelope<proto::UnshareProject>,
 263    ) -> tide::Result<()> {
 264        let project_id = request.payload.project_id;
 265        let project = self
 266            .state_mut()
 267            .unshare_project(project_id, request.sender_id)?;
 268
 269        broadcast(request.sender_id, project.connection_ids, |conn_id| {
 270            self.peer
 271                .send(conn_id, proto::UnshareProject { project_id })
 272        })?;
 273        self.update_contacts_for_users(&project.authorized_user_ids)?;
 274        Ok(())
 275    }
 276
 277    async fn join_project(
 278        mut self: Arc<Server>,
 279        request: TypedEnvelope<proto::JoinProject>,
 280    ) -> tide::Result<()> {
 281        let project_id = request.payload.project_id;
 282
 283        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 284        let response_data = self
 285            .state_mut()
 286            .join_project(request.sender_id, user_id, project_id)
 287            .and_then(|joined| {
 288                let share = joined.project.share()?;
 289                let peer_count = share.guests.len();
 290                let mut collaborators = Vec::with_capacity(peer_count);
 291                collaborators.push(proto::Collaborator {
 292                    peer_id: joined.project.host_connection_id.0,
 293                    replica_id: 0,
 294                    user_id: joined.project.host_user_id.to_proto(),
 295                });
 296                let worktrees = joined
 297                    .project
 298                    .worktrees
 299                    .iter()
 300                    .filter_map(|(id, worktree)| {
 301                        worktree.share.as_ref().map(|share| proto::Worktree {
 302                            id: *id,
 303                            root_name: worktree.root_name.clone(),
 304                            entries: share.entries.values().cloned().collect(),
 305                            diagnostic_summaries: share
 306                                .diagnostic_summaries
 307                                .values()
 308                                .cloned()
 309                                .collect(),
 310                            weak: worktree.weak,
 311                        })
 312                    })
 313                    .collect();
 314                for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests {
 315                    if *peer_conn_id != request.sender_id {
 316                        collaborators.push(proto::Collaborator {
 317                            peer_id: peer_conn_id.0,
 318                            replica_id: *peer_replica_id as u32,
 319                            user_id: peer_user_id.to_proto(),
 320                        });
 321                    }
 322                }
 323                let response = proto::JoinProjectResponse {
 324                    worktrees,
 325                    replica_id: joined.replica_id as u32,
 326                    collaborators,
 327                };
 328                let connection_ids = joined.project.connection_ids();
 329                let contact_user_ids = joined.project.authorized_user_ids();
 330                Ok((response, connection_ids, contact_user_ids))
 331            });
 332
 333        match response_data {
 334            Ok((response, connection_ids, contact_user_ids)) => {
 335                broadcast(request.sender_id, connection_ids, |conn_id| {
 336                    self.peer.send(
 337                        conn_id,
 338                        proto::AddProjectCollaborator {
 339                            project_id,
 340                            collaborator: Some(proto::Collaborator {
 341                                peer_id: request.sender_id.0,
 342                                replica_id: response.replica_id,
 343                                user_id: user_id.to_proto(),
 344                            }),
 345                        },
 346                    )
 347                })?;
 348                self.peer.respond(request.receipt(), response)?;
 349                self.update_contacts_for_users(&contact_user_ids)?;
 350            }
 351            Err(error) => {
 352                self.peer.respond_with_error(
 353                    request.receipt(),
 354                    proto::Error {
 355                        message: error.to_string(),
 356                    },
 357                )?;
 358            }
 359        }
 360
 361        Ok(())
 362    }
 363
 364    async fn leave_project(
 365        mut self: Arc<Server>,
 366        request: TypedEnvelope<proto::LeaveProject>,
 367    ) -> tide::Result<()> {
 368        let sender_id = request.sender_id;
 369        let project_id = request.payload.project_id;
 370        let worktree = self.state_mut().leave_project(sender_id, project_id);
 371        if let Some(worktree) = worktree {
 372            broadcast(sender_id, worktree.connection_ids, |conn_id| {
 373                self.peer.send(
 374                    conn_id,
 375                    proto::RemoveProjectCollaborator {
 376                        project_id,
 377                        peer_id: sender_id.0,
 378                    },
 379                )
 380            })?;
 381            self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 382        }
 383        Ok(())
 384    }
 385
 386    async fn register_worktree(
 387        mut self: Arc<Server>,
 388        request: TypedEnvelope<proto::RegisterWorktree>,
 389    ) -> tide::Result<()> {
 390        let receipt = request.receipt();
 391        let host_user_id = self.state().user_id_for_connection(request.sender_id)?;
 392
 393        let mut contact_user_ids = HashSet::default();
 394        contact_user_ids.insert(host_user_id);
 395        for github_login in request.payload.authorized_logins {
 396            match self.app_state.db.create_user(&github_login, false).await {
 397                Ok(contact_user_id) => {
 398                    contact_user_ids.insert(contact_user_id);
 399                }
 400                Err(err) => {
 401                    let message = err.to_string();
 402                    self.peer
 403                        .respond_with_error(receipt, proto::Error { message })?;
 404                    return Ok(());
 405                }
 406            }
 407        }
 408
 409        let contact_user_ids = contact_user_ids.into_iter().collect::<Vec<_>>();
 410        let ok = self.state_mut().register_worktree(
 411            request.payload.project_id,
 412            request.payload.worktree_id,
 413            Worktree {
 414                authorized_user_ids: contact_user_ids.clone(),
 415                root_name: request.payload.root_name,
 416                share: None,
 417                weak: false,
 418            },
 419        );
 420
 421        if ok {
 422            self.peer.respond(receipt, proto::Ack {})?;
 423            self.update_contacts_for_users(&contact_user_ids)?;
 424        } else {
 425            self.peer.respond_with_error(
 426                receipt,
 427                proto::Error {
 428                    message: NO_SUCH_PROJECT.to_string(),
 429                },
 430            )?;
 431        }
 432
 433        Ok(())
 434    }
 435
 436    async fn unregister_worktree(
 437        mut self: Arc<Server>,
 438        request: TypedEnvelope<proto::UnregisterWorktree>,
 439    ) -> tide::Result<()> {
 440        let project_id = request.payload.project_id;
 441        let worktree_id = request.payload.worktree_id;
 442        let (worktree, guest_connection_ids) =
 443            self.state_mut()
 444                .unregister_worktree(project_id, worktree_id, request.sender_id)?;
 445        broadcast(request.sender_id, guest_connection_ids, |conn_id| {
 446            self.peer.send(
 447                conn_id,
 448                proto::UnregisterWorktree {
 449                    project_id,
 450                    worktree_id,
 451                },
 452            )
 453        })?;
 454        self.update_contacts_for_users(&worktree.authorized_user_ids)?;
 455        Ok(())
 456    }
 457
 458    async fn share_worktree(
 459        mut self: Arc<Server>,
 460        mut request: TypedEnvelope<proto::ShareWorktree>,
 461    ) -> tide::Result<()> {
 462        let worktree = request
 463            .payload
 464            .worktree
 465            .as_mut()
 466            .ok_or_else(|| anyhow!("missing worktree"))?;
 467        let entries = worktree
 468            .entries
 469            .iter()
 470            .map(|entry| (entry.id, entry.clone()))
 471            .collect();
 472        let diagnostic_summaries = worktree
 473            .diagnostic_summaries
 474            .iter()
 475            .map(|summary| (PathBuf::from(summary.path.clone()), summary.clone()))
 476            .collect();
 477
 478        let shared_worktree = self.state_mut().share_worktree(
 479            request.payload.project_id,
 480            worktree.id,
 481            request.sender_id,
 482            entries,
 483            diagnostic_summaries,
 484        );
 485        if let Some(shared_worktree) = shared_worktree {
 486            broadcast(
 487                request.sender_id,
 488                shared_worktree.connection_ids,
 489                |connection_id| {
 490                    self.peer.forward_send(
 491                        request.sender_id,
 492                        connection_id,
 493                        request.payload.clone(),
 494                    )
 495                },
 496            )?;
 497            self.peer.respond(request.receipt(), proto::Ack {})?;
 498            self.update_contacts_for_users(&shared_worktree.authorized_user_ids)?;
 499        } else {
 500            self.peer.respond_with_error(
 501                request.receipt(),
 502                proto::Error {
 503                    message: "no such worktree".to_string(),
 504                },
 505            )?;
 506        }
 507        Ok(())
 508    }
 509
 510    async fn update_worktree(
 511        mut self: Arc<Server>,
 512        request: TypedEnvelope<proto::UpdateWorktree>,
 513    ) -> tide::Result<()> {
 514        let connection_ids = self
 515            .state_mut()
 516            .update_worktree(
 517                request.sender_id,
 518                request.payload.project_id,
 519                request.payload.worktree_id,
 520                &request.payload.removed_entries,
 521                &request.payload.updated_entries,
 522            )
 523            .ok_or_else(|| anyhow!("no such worktree"))?;
 524
 525        broadcast(request.sender_id, connection_ids, |connection_id| {
 526            self.peer
 527                .forward_send(request.sender_id, connection_id, request.payload.clone())
 528        })?;
 529
 530        Ok(())
 531    }
 532
 533    async fn update_diagnostic_summary(
 534        mut self: Arc<Server>,
 535        request: TypedEnvelope<proto::UpdateDiagnosticSummary>,
 536    ) -> tide::Result<()> {
 537        let receiver_ids = request
 538            .payload
 539            .summary
 540            .clone()
 541            .and_then(|summary| {
 542                self.state_mut().update_diagnostic_summary(
 543                    request.payload.project_id,
 544                    request.payload.worktree_id,
 545                    request.sender_id,
 546                    summary,
 547                )
 548            })
 549            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 550
 551        broadcast(request.sender_id, receiver_ids, |connection_id| {
 552            self.peer
 553                .forward_send(request.sender_id, connection_id, request.payload.clone())
 554        })?;
 555        Ok(())
 556    }
 557
 558    async fn disk_based_diagnostics_updating(
 559        self: Arc<Server>,
 560        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdating>,
 561    ) -> tide::Result<()> {
 562        let receiver_ids = self
 563            .state()
 564            .project_connection_ids(request.payload.project_id, request.sender_id)
 565            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 566        broadcast(request.sender_id, receiver_ids, |connection_id| {
 567            self.peer
 568                .forward_send(request.sender_id, connection_id, request.payload.clone())
 569        })?;
 570        Ok(())
 571    }
 572
 573    async fn disk_based_diagnostics_updated(
 574        self: Arc<Server>,
 575        request: TypedEnvelope<proto::DiskBasedDiagnosticsUpdated>,
 576    ) -> tide::Result<()> {
 577        let receiver_ids = self
 578            .state()
 579            .project_connection_ids(request.payload.project_id, request.sender_id)
 580            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 581        broadcast(request.sender_id, receiver_ids, |connection_id| {
 582            self.peer
 583                .forward_send(request.sender_id, connection_id, request.payload.clone())
 584        })?;
 585        Ok(())
 586    }
 587
 588    async fn get_definition(
 589        self: Arc<Server>,
 590        request: TypedEnvelope<proto::GetDefinition>,
 591    ) -> tide::Result<()> {
 592        let receipt = request.receipt();
 593        let host_connection_id = self
 594            .state()
 595            .read_project(request.payload.project_id, request.sender_id)
 596            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 597            .host_connection_id;
 598        let response = self
 599            .peer
 600            .forward_request(request.sender_id, host_connection_id, request.payload)
 601            .await?;
 602        self.peer.respond(receipt, response)?;
 603        Ok(())
 604    }
 605
 606    async fn open_buffer(
 607        self: Arc<Server>,
 608        request: TypedEnvelope<proto::OpenBuffer>,
 609    ) -> tide::Result<()> {
 610        let receipt = request.receipt();
 611        let host_connection_id = self
 612            .state()
 613            .read_project(request.payload.project_id, request.sender_id)
 614            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 615            .host_connection_id;
 616        let response = self
 617            .peer
 618            .forward_request(request.sender_id, host_connection_id, request.payload)
 619            .await?;
 620        self.peer.respond(receipt, response)?;
 621        Ok(())
 622    }
 623
 624    async fn close_buffer(
 625        self: Arc<Server>,
 626        request: TypedEnvelope<proto::CloseBuffer>,
 627    ) -> tide::Result<()> {
 628        let host_connection_id = self
 629            .state()
 630            .read_project(request.payload.project_id, request.sender_id)
 631            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?
 632            .host_connection_id;
 633        self.peer
 634            .forward_send(request.sender_id, host_connection_id, request.payload)?;
 635        Ok(())
 636    }
 637
 638    async fn save_buffer(
 639        self: Arc<Server>,
 640        request: TypedEnvelope<proto::SaveBuffer>,
 641    ) -> tide::Result<()> {
 642        let host;
 643        let guests;
 644        {
 645            let state = self.state();
 646            let project = state
 647                .read_project(request.payload.project_id, request.sender_id)
 648                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 649            host = project.host_connection_id;
 650            guests = project.guest_connection_ids()
 651        }
 652
 653        let sender = request.sender_id;
 654        let receipt = request.receipt();
 655        let response = self
 656            .peer
 657            .forward_request(sender, host, request.payload.clone())
 658            .await?;
 659
 660        broadcast(host, guests, |conn_id| {
 661            let response = response.clone();
 662            if conn_id == sender {
 663                self.peer.respond(receipt, response)
 664            } else {
 665                self.peer.forward_send(host, conn_id, response)
 666            }
 667        })?;
 668
 669        Ok(())
 670    }
 671
 672    async fn format_buffers(
 673        self: Arc<Server>,
 674        request: TypedEnvelope<proto::FormatBuffers>,
 675    ) -> tide::Result<()> {
 676        let host;
 677        {
 678            let state = self.state();
 679            let project = state
 680                .read_project(request.payload.project_id, request.sender_id)
 681                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 682            host = project.host_connection_id;
 683        }
 684
 685        let sender = request.sender_id;
 686        let receipt = request.receipt();
 687        let response = self
 688            .peer
 689            .forward_request(sender, host, request.payload.clone())
 690            .await?;
 691        self.peer.respond(receipt, response)?;
 692
 693        Ok(())
 694    }
 695
 696    async fn get_completions(
 697        self: Arc<Server>,
 698        request: TypedEnvelope<proto::GetCompletions>,
 699    ) -> tide::Result<()> {
 700        let host;
 701        {
 702            let state = self.state();
 703            let project = state
 704                .read_project(request.payload.project_id, request.sender_id)
 705                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 706            host = project.host_connection_id;
 707        }
 708
 709        let sender = request.sender_id;
 710        let receipt = request.receipt();
 711        let response = self
 712            .peer
 713            .forward_request(sender, host, request.payload.clone())
 714            .await?;
 715        self.peer.respond(receipt, response)?;
 716        Ok(())
 717    }
 718
 719    async fn apply_additional_edits_for_completion(
 720        self: Arc<Server>,
 721        request: TypedEnvelope<proto::ApplyCompletionAdditionalEdits>,
 722    ) -> tide::Result<()> {
 723        let host;
 724        {
 725            let state = self.state();
 726            let project = state
 727                .read_project(request.payload.project_id, request.sender_id)
 728                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 729            host = project.host_connection_id;
 730        }
 731
 732        let sender = request.sender_id;
 733        let receipt = request.receipt();
 734        let response = self
 735            .peer
 736            .forward_request(sender, host, request.payload.clone())
 737            .await?;
 738        self.peer.respond(receipt, response)?;
 739        Ok(())
 740    }
 741
 742    async fn get_code_actions(
 743        self: Arc<Server>,
 744        request: TypedEnvelope<proto::GetCodeActions>,
 745    ) -> tide::Result<()> {
 746        let host;
 747        {
 748            let state = self.state();
 749            let project = state
 750                .read_project(request.payload.project_id, request.sender_id)
 751                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 752            host = project.host_connection_id;
 753        }
 754
 755        let sender = request.sender_id;
 756        let receipt = request.receipt();
 757        let response = self
 758            .peer
 759            .forward_request(sender, host, request.payload.clone())
 760            .await?;
 761        self.peer.respond(receipt, response)?;
 762        Ok(())
 763    }
 764
 765    async fn apply_code_action(
 766        self: Arc<Server>,
 767        request: TypedEnvelope<proto::ApplyCodeAction>,
 768    ) -> tide::Result<()> {
 769        let host;
 770        {
 771            let state = self.state();
 772            let project = state
 773                .read_project(request.payload.project_id, request.sender_id)
 774                .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 775            host = project.host_connection_id;
 776        }
 777
 778        let sender = request.sender_id;
 779        let receipt = request.receipt();
 780        let response = self
 781            .peer
 782            .forward_request(sender, host, request.payload.clone())
 783            .await?;
 784        self.peer.respond(receipt, response)?;
 785        Ok(())
 786    }
 787
 788    async fn update_buffer(
 789        self: Arc<Server>,
 790        request: TypedEnvelope<proto::UpdateBuffer>,
 791    ) -> tide::Result<()> {
 792        let receiver_ids = self
 793            .state()
 794            .project_connection_ids(request.payload.project_id, request.sender_id)
 795            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 796        broadcast(request.sender_id, receiver_ids, |connection_id| {
 797            self.peer
 798                .forward_send(request.sender_id, connection_id, request.payload.clone())
 799        })?;
 800        self.peer.respond(request.receipt(), proto::Ack {})?;
 801        Ok(())
 802    }
 803
 804    async fn update_buffer_file(
 805        self: Arc<Server>,
 806        request: TypedEnvelope<proto::UpdateBufferFile>,
 807    ) -> tide::Result<()> {
 808        let receiver_ids = self
 809            .state()
 810            .project_connection_ids(request.payload.project_id, request.sender_id)
 811            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 812        broadcast(request.sender_id, receiver_ids, |connection_id| {
 813            self.peer
 814                .forward_send(request.sender_id, connection_id, request.payload.clone())
 815        })?;
 816        Ok(())
 817    }
 818
 819    async fn buffer_reloaded(
 820        self: Arc<Server>,
 821        request: TypedEnvelope<proto::BufferReloaded>,
 822    ) -> tide::Result<()> {
 823        let receiver_ids = self
 824            .state()
 825            .project_connection_ids(request.payload.project_id, request.sender_id)
 826            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 827        broadcast(request.sender_id, receiver_ids, |connection_id| {
 828            self.peer
 829                .forward_send(request.sender_id, connection_id, request.payload.clone())
 830        })?;
 831        Ok(())
 832    }
 833
 834    async fn buffer_saved(
 835        self: Arc<Server>,
 836        request: TypedEnvelope<proto::BufferSaved>,
 837    ) -> tide::Result<()> {
 838        let receiver_ids = self
 839            .state()
 840            .project_connection_ids(request.payload.project_id, request.sender_id)
 841            .ok_or_else(|| anyhow!(NO_SUCH_PROJECT))?;
 842        broadcast(request.sender_id, receiver_ids, |connection_id| {
 843            self.peer
 844                .forward_send(request.sender_id, connection_id, request.payload.clone())
 845        })?;
 846        Ok(())
 847    }
 848
 849    async fn get_channels(
 850        self: Arc<Server>,
 851        request: TypedEnvelope<proto::GetChannels>,
 852    ) -> tide::Result<()> {
 853        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 854        let channels = self.app_state.db.get_accessible_channels(user_id).await?;
 855        self.peer.respond(
 856            request.receipt(),
 857            proto::GetChannelsResponse {
 858                channels: channels
 859                    .into_iter()
 860                    .map(|chan| proto::Channel {
 861                        id: chan.id.to_proto(),
 862                        name: chan.name,
 863                    })
 864                    .collect(),
 865            },
 866        )?;
 867        Ok(())
 868    }
 869
 870    async fn get_users(
 871        self: Arc<Server>,
 872        request: TypedEnvelope<proto::GetUsers>,
 873    ) -> tide::Result<()> {
 874        let receipt = request.receipt();
 875        let user_ids = request.payload.user_ids.into_iter().map(UserId::from_proto);
 876        let users = self
 877            .app_state
 878            .db
 879            .get_users_by_ids(user_ids)
 880            .await?
 881            .into_iter()
 882            .map(|user| proto::User {
 883                id: user.id.to_proto(),
 884                avatar_url: format!("https://github.com/{}.png?size=128", user.github_login),
 885                github_login: user.github_login,
 886            })
 887            .collect();
 888        self.peer
 889            .respond(receipt, proto::GetUsersResponse { users })?;
 890        Ok(())
 891    }
 892
 893    fn update_contacts_for_users<'a>(
 894        self: &Arc<Server>,
 895        user_ids: impl IntoIterator<Item = &'a UserId>,
 896    ) -> anyhow::Result<()> {
 897        let mut result = Ok(());
 898        let state = self.state();
 899        for user_id in user_ids {
 900            let contacts = state.contacts_for_user(*user_id);
 901            for connection_id in state.connection_ids_for_user(*user_id) {
 902                if let Err(error) = self.peer.send(
 903                    connection_id,
 904                    proto::UpdateContacts {
 905                        contacts: contacts.clone(),
 906                    },
 907                ) {
 908                    result = Err(error);
 909                }
 910            }
 911        }
 912        result
 913    }
 914
 915    async fn join_channel(
 916        mut self: Arc<Self>,
 917        request: TypedEnvelope<proto::JoinChannel>,
 918    ) -> tide::Result<()> {
 919        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 920        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 921        if !self
 922            .app_state
 923            .db
 924            .can_user_access_channel(user_id, channel_id)
 925            .await?
 926        {
 927            Err(anyhow!("access denied"))?;
 928        }
 929
 930        self.state_mut().join_channel(request.sender_id, channel_id);
 931        let messages = self
 932            .app_state
 933            .db
 934            .get_channel_messages(channel_id, MESSAGE_COUNT_PER_PAGE, None)
 935            .await?
 936            .into_iter()
 937            .map(|msg| proto::ChannelMessage {
 938                id: msg.id.to_proto(),
 939                body: msg.body,
 940                timestamp: msg.sent_at.unix_timestamp() as u64,
 941                sender_id: msg.sender_id.to_proto(),
 942                nonce: Some(msg.nonce.as_u128().into()),
 943            })
 944            .collect::<Vec<_>>();
 945        self.peer.respond(
 946            request.receipt(),
 947            proto::JoinChannelResponse {
 948                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
 949                messages,
 950            },
 951        )?;
 952        Ok(())
 953    }
 954
 955    async fn leave_channel(
 956        mut self: Arc<Self>,
 957        request: TypedEnvelope<proto::LeaveChannel>,
 958    ) -> tide::Result<()> {
 959        let user_id = self.state().user_id_for_connection(request.sender_id)?;
 960        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 961        if !self
 962            .app_state
 963            .db
 964            .can_user_access_channel(user_id, channel_id)
 965            .await?
 966        {
 967            Err(anyhow!("access denied"))?;
 968        }
 969
 970        self.state_mut()
 971            .leave_channel(request.sender_id, channel_id);
 972
 973        Ok(())
 974    }
 975
 976    async fn send_channel_message(
 977        self: Arc<Self>,
 978        request: TypedEnvelope<proto::SendChannelMessage>,
 979    ) -> tide::Result<()> {
 980        let receipt = request.receipt();
 981        let channel_id = ChannelId::from_proto(request.payload.channel_id);
 982        let user_id;
 983        let connection_ids;
 984        {
 985            let state = self.state();
 986            user_id = state.user_id_for_connection(request.sender_id)?;
 987            if let Some(ids) = state.channel_connection_ids(channel_id) {
 988                connection_ids = ids;
 989            } else {
 990                return Ok(());
 991            }
 992        }
 993
 994        // Validate the message body.
 995        let body = request.payload.body.trim().to_string();
 996        if body.len() > MAX_MESSAGE_LEN {
 997            self.peer.respond_with_error(
 998                receipt,
 999                proto::Error {
1000                    message: "message is too long".to_string(),
1001                },
1002            )?;
1003            return Ok(());
1004        }
1005        if body.is_empty() {
1006            self.peer.respond_with_error(
1007                receipt,
1008                proto::Error {
1009                    message: "message can't be blank".to_string(),
1010                },
1011            )?;
1012            return Ok(());
1013        }
1014
1015        let timestamp = OffsetDateTime::now_utc();
1016        let nonce = if let Some(nonce) = request.payload.nonce {
1017            nonce
1018        } else {
1019            self.peer.respond_with_error(
1020                receipt,
1021                proto::Error {
1022                    message: "nonce can't be blank".to_string(),
1023                },
1024            )?;
1025            return Ok(());
1026        };
1027
1028        let message_id = self
1029            .app_state
1030            .db
1031            .create_channel_message(channel_id, user_id, &body, timestamp, nonce.clone().into())
1032            .await?
1033            .to_proto();
1034        let message = proto::ChannelMessage {
1035            sender_id: user_id.to_proto(),
1036            id: message_id,
1037            body,
1038            timestamp: timestamp.unix_timestamp() as u64,
1039            nonce: Some(nonce),
1040        };
1041        broadcast(request.sender_id, connection_ids, |conn_id| {
1042            self.peer.send(
1043                conn_id,
1044                proto::ChannelMessageSent {
1045                    channel_id: channel_id.to_proto(),
1046                    message: Some(message.clone()),
1047                },
1048            )
1049        })?;
1050        self.peer.respond(
1051            receipt,
1052            proto::SendChannelMessageResponse {
1053                message: Some(message),
1054            },
1055        )?;
1056        Ok(())
1057    }
1058
1059    async fn get_channel_messages(
1060        self: Arc<Self>,
1061        request: TypedEnvelope<proto::GetChannelMessages>,
1062    ) -> tide::Result<()> {
1063        let user_id = self.state().user_id_for_connection(request.sender_id)?;
1064        let channel_id = ChannelId::from_proto(request.payload.channel_id);
1065        if !self
1066            .app_state
1067            .db
1068            .can_user_access_channel(user_id, channel_id)
1069            .await?
1070        {
1071            Err(anyhow!("access denied"))?;
1072        }
1073
1074        let messages = self
1075            .app_state
1076            .db
1077            .get_channel_messages(
1078                channel_id,
1079                MESSAGE_COUNT_PER_PAGE,
1080                Some(MessageId::from_proto(request.payload.before_message_id)),
1081            )
1082            .await?
1083            .into_iter()
1084            .map(|msg| proto::ChannelMessage {
1085                id: msg.id.to_proto(),
1086                body: msg.body,
1087                timestamp: msg.sent_at.unix_timestamp() as u64,
1088                sender_id: msg.sender_id.to_proto(),
1089                nonce: Some(msg.nonce.as_u128().into()),
1090            })
1091            .collect::<Vec<_>>();
1092        self.peer.respond(
1093            request.receipt(),
1094            proto::GetChannelMessagesResponse {
1095                done: messages.len() < MESSAGE_COUNT_PER_PAGE,
1096                messages,
1097            },
1098        )?;
1099        Ok(())
1100    }
1101
1102    fn state<'a>(self: &'a Arc<Self>) -> RwLockReadGuard<'a, Store> {
1103        self.store.read()
1104    }
1105
1106    fn state_mut<'a>(self: &'a mut Arc<Self>) -> RwLockWriteGuard<'a, Store> {
1107        self.store.write()
1108    }
1109}
1110
1111fn broadcast<F>(
1112    sender_id: ConnectionId,
1113    receiver_ids: Vec<ConnectionId>,
1114    mut f: F,
1115) -> anyhow::Result<()>
1116where
1117    F: FnMut(ConnectionId) -> anyhow::Result<()>,
1118{
1119    let mut result = Ok(());
1120    for receiver_id in receiver_ids {
1121        if receiver_id != sender_id {
1122            if let Err(error) = f(receiver_id) {
1123                if result.is_ok() {
1124                    result = Err(error);
1125                }
1126            }
1127        }
1128    }
1129    result
1130}
1131
1132pub fn add_routes(app: &mut tide::Server<Arc<AppState>>, rpc: &Arc<Peer>) {
1133    let server = Server::new(app.state().clone(), rpc.clone(), None);
1134    app.at("/rpc").get(move |request: Request<Arc<AppState>>| {
1135        let server = server.clone();
1136        async move {
1137            const WEBSOCKET_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1138
1139            let connection_upgrade = header_contains_ignore_case(&request, CONNECTION, "upgrade");
1140            let upgrade_to_websocket = header_contains_ignore_case(&request, UPGRADE, "websocket");
1141            let upgrade_requested = connection_upgrade && upgrade_to_websocket;
1142            let client_protocol_version: Option<u32> = request
1143                .header("X-Zed-Protocol-Version")
1144                .and_then(|v| v.as_str().parse().ok());
1145
1146            if !upgrade_requested || client_protocol_version != Some(rpc::PROTOCOL_VERSION) {
1147                return Ok(Response::new(StatusCode::UpgradeRequired));
1148            }
1149
1150            let header = match request.header("Sec-Websocket-Key") {
1151                Some(h) => h.as_str(),
1152                None => return Err(anyhow!("expected sec-websocket-key"))?,
1153            };
1154
1155            let user_id = process_auth_header(&request).await?;
1156
1157            let mut response = Response::new(StatusCode::SwitchingProtocols);
1158            response.insert_header(UPGRADE, "websocket");
1159            response.insert_header(CONNECTION, "Upgrade");
1160            let hash = Sha1::new().chain(header).chain(WEBSOCKET_GUID).finalize();
1161            response.insert_header("Sec-Websocket-Accept", base64::encode(&hash[..]));
1162            response.insert_header("Sec-Websocket-Version", "13");
1163
1164            let http_res: &mut tide::http::Response = response.as_mut();
1165            let upgrade_receiver = http_res.recv_upgrade().await;
1166            let addr = request.remote().unwrap_or("unknown").to_string();
1167            task::spawn(async move {
1168                if let Some(stream) = upgrade_receiver.await {
1169                    server
1170                        .handle_connection(
1171                            Connection::new(
1172                                WebSocketStream::from_raw_socket(stream, Role::Server, None).await,
1173                            ),
1174                            addr,
1175                            user_id,
1176                            None,
1177                        )
1178                        .await;
1179                }
1180            });
1181
1182            Ok(response)
1183        }
1184    });
1185}
1186
1187fn header_contains_ignore_case<T>(
1188    request: &tide::Request<T>,
1189    header_name: HeaderName,
1190    value: &str,
1191) -> bool {
1192    request
1193        .header(header_name)
1194        .map(|h| {
1195            h.as_str()
1196                .split(',')
1197                .any(|s| s.trim().eq_ignore_ascii_case(value.trim()))
1198        })
1199        .unwrap_or(false)
1200}
1201
1202#[cfg(test)]
1203mod tests {
1204    use super::*;
1205    use crate::{
1206        auth,
1207        db::{tests::TestDb, UserId},
1208        github, AppState, Config,
1209    };
1210    use ::rpc::Peer;
1211    use async_std::task;
1212    use gpui::{executor, ModelHandle, TestAppContext};
1213    use parking_lot::Mutex;
1214    use postage::{mpsc, watch};
1215    use rand::prelude::*;
1216    use rpc::PeerId;
1217    use serde_json::json;
1218    use sqlx::types::time::OffsetDateTime;
1219    use std::{
1220        ops::Deref,
1221        path::Path,
1222        rc::Rc,
1223        sync::{
1224            atomic::{AtomicBool, Ordering::SeqCst},
1225            Arc,
1226        },
1227        time::Duration,
1228    };
1229    use zed::{
1230        client::{
1231            self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials,
1232            EstablishConnectionError, UserStore,
1233        },
1234        editor::{ConfirmCompletion, Editor, EditorSettings, Input, MultiBuffer},
1235        fs::{FakeFs, Fs as _},
1236        language::{
1237            tree_sitter_rust, AnchorRangeExt, Diagnostic, DiagnosticEntry, Language,
1238            LanguageConfig, LanguageRegistry, LanguageServerConfig, Point,
1239        },
1240        lsp,
1241        project::{worktree::WorktreeHandle, DiagnosticSummary, Project, ProjectPath},
1242    };
1243
1244    #[cfg(test)]
1245    #[ctor::ctor]
1246    fn init_logger() {
1247        if std::env::var("RUST_LOG").is_ok() {
1248            env_logger::init();
1249        }
1250    }
1251
1252    #[gpui::test(iterations = 10)]
1253    async fn test_share_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1254        let (window_b, _) = cx_b.add_window(|_| EmptyView);
1255        let lang_registry = Arc::new(LanguageRegistry::new());
1256        let fs = Arc::new(FakeFs::new(cx_a.background()));
1257        cx_a.foreground().forbid_parking();
1258
1259        // Connect to a server as 2 clients.
1260        let mut server = TestServer::start(cx_a.foreground()).await;
1261        let client_a = server.create_client(&mut cx_a, "user_a").await;
1262        let client_b = server.create_client(&mut cx_b, "user_b").await;
1263
1264        // Share a project as client A
1265        fs.insert_tree(
1266            "/a",
1267            json!({
1268                ".zed.toml": r#"collaborators = ["user_b"]"#,
1269                "a.txt": "a-contents",
1270                "b.txt": "b-contents",
1271            }),
1272        )
1273        .await;
1274        let project_a = cx_a.update(|cx| {
1275            Project::local(
1276                client_a.clone(),
1277                client_a.user_store.clone(),
1278                lang_registry.clone(),
1279                fs.clone(),
1280                cx,
1281            )
1282        });
1283        let (worktree_a, _) = project_a
1284            .update(&mut cx_a, |p, cx| {
1285                p.find_or_create_local_worktree("/a", false, cx)
1286            })
1287            .await
1288            .unwrap();
1289        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1290        worktree_a
1291            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1292            .await;
1293        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1294        project_a
1295            .update(&mut cx_a, |p, cx| p.share(cx))
1296            .await
1297            .unwrap();
1298
1299        // Join that project as client B
1300        let project_b = Project::remote(
1301            project_id,
1302            client_b.clone(),
1303            client_b.user_store.clone(),
1304            lang_registry.clone(),
1305            fs.clone(),
1306            &mut cx_b.to_async(),
1307        )
1308        .await
1309        .unwrap();
1310
1311        let replica_id_b = project_b.read_with(&cx_b, |project, _| {
1312            assert_eq!(
1313                project
1314                    .collaborators()
1315                    .get(&client_a.peer_id)
1316                    .unwrap()
1317                    .user
1318                    .github_login,
1319                "user_a"
1320            );
1321            project.replica_id()
1322        });
1323        project_a
1324            .condition(&cx_a, |tree, _| {
1325                tree.collaborators()
1326                    .get(&client_b.peer_id)
1327                    .map_or(false, |collaborator| {
1328                        collaborator.replica_id == replica_id_b
1329                            && collaborator.user.github_login == "user_b"
1330                    })
1331            })
1332            .await;
1333
1334        // Open the same file as client B and client A.
1335        let buffer_b = project_b
1336            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1337            .await
1338            .unwrap();
1339        let buffer_b = cx_b.add_model(|cx| MultiBuffer::singleton(buffer_b, cx));
1340        buffer_b.read_with(&cx_b, |buf, cx| {
1341            assert_eq!(buf.read(cx).text(), "b-contents")
1342        });
1343        project_a.read_with(&cx_a, |project, cx| {
1344            assert!(project.has_open_buffer((worktree_id, "b.txt"), cx))
1345        });
1346        let buffer_a = project_a
1347            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "b.txt"), cx))
1348            .await
1349            .unwrap();
1350
1351        let editor_b = cx_b.add_view(window_b, |cx| {
1352            Editor::for_buffer(buffer_b, Arc::new(|cx| EditorSettings::test(cx)), None, cx)
1353        });
1354
1355        // TODO
1356        // // Create a selection set as client B and see that selection set as client A.
1357        // buffer_a
1358        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 1)
1359        //     .await;
1360
1361        // Edit the buffer as client B and see that edit as client A.
1362        editor_b.update(&mut cx_b, |editor, cx| {
1363            editor.handle_input(&Input("ok, ".into()), cx)
1364        });
1365        buffer_a
1366            .condition(&cx_a, |buffer, _| buffer.text() == "ok, b-contents")
1367            .await;
1368
1369        // TODO
1370        // // Remove the selection set as client B, see those selections disappear as client A.
1371        cx_b.update(move |_| drop(editor_b));
1372        // buffer_a
1373        //     .condition(&cx_a, |buffer, _| buffer.selection_sets().count() == 0)
1374        //     .await;
1375
1376        // Close the buffer as client A, see that the buffer is closed.
1377        cx_a.update(move |_| drop(buffer_a));
1378        project_a
1379            .condition(&cx_a, |project, cx| {
1380                !project.has_open_buffer((worktree_id, "b.txt"), cx)
1381            })
1382            .await;
1383
1384        // Dropping the client B's project removes client B from client A's collaborators.
1385        cx_b.update(move |_| drop(project_b));
1386        project_a
1387            .condition(&cx_a, |project, _| project.collaborators().is_empty())
1388            .await;
1389    }
1390
1391    #[gpui::test(iterations = 10)]
1392    async fn test_unshare_project(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1393        let lang_registry = Arc::new(LanguageRegistry::new());
1394        let fs = Arc::new(FakeFs::new(cx_a.background()));
1395        cx_a.foreground().forbid_parking();
1396
1397        // Connect to a server as 2 clients.
1398        let mut server = TestServer::start(cx_a.foreground()).await;
1399        let client_a = server.create_client(&mut cx_a, "user_a").await;
1400        let client_b = server.create_client(&mut cx_b, "user_b").await;
1401
1402        // Share a project as client A
1403        fs.insert_tree(
1404            "/a",
1405            json!({
1406                ".zed.toml": r#"collaborators = ["user_b"]"#,
1407                "a.txt": "a-contents",
1408                "b.txt": "b-contents",
1409            }),
1410        )
1411        .await;
1412        let project_a = cx_a.update(|cx| {
1413            Project::local(
1414                client_a.clone(),
1415                client_a.user_store.clone(),
1416                lang_registry.clone(),
1417                fs.clone(),
1418                cx,
1419            )
1420        });
1421        let (worktree_a, _) = project_a
1422            .update(&mut cx_a, |p, cx| {
1423                p.find_or_create_local_worktree("/a", false, cx)
1424            })
1425            .await
1426            .unwrap();
1427        worktree_a
1428            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1429            .await;
1430        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1431        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1432        project_a
1433            .update(&mut cx_a, |p, cx| p.share(cx))
1434            .await
1435            .unwrap();
1436        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1437
1438        // Join that project as client B
1439        let project_b = Project::remote(
1440            project_id,
1441            client_b.clone(),
1442            client_b.user_store.clone(),
1443            lang_registry.clone(),
1444            fs.clone(),
1445            &mut cx_b.to_async(),
1446        )
1447        .await
1448        .unwrap();
1449        project_b
1450            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1451            .await
1452            .unwrap();
1453
1454        // Unshare the project as client A
1455        project_a
1456            .update(&mut cx_a, |project, cx| project.unshare(cx))
1457            .await
1458            .unwrap();
1459        project_b
1460            .condition(&mut cx_b, |project, _| project.is_read_only())
1461            .await;
1462        assert!(worktree_a.read_with(&cx_a, |tree, _| !tree.as_local().unwrap().is_shared()));
1463        drop(project_b);
1464
1465        // Share the project again and ensure guests can still join.
1466        project_a
1467            .update(&mut cx_a, |project, cx| project.share(cx))
1468            .await
1469            .unwrap();
1470        assert!(worktree_a.read_with(&cx_a, |tree, _| tree.as_local().unwrap().is_shared()));
1471
1472        let project_c = Project::remote(
1473            project_id,
1474            client_b.clone(),
1475            client_b.user_store.clone(),
1476            lang_registry.clone(),
1477            fs.clone(),
1478            &mut cx_b.to_async(),
1479        )
1480        .await
1481        .unwrap();
1482        project_c
1483            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1484            .await
1485            .unwrap();
1486    }
1487
1488    #[gpui::test(iterations = 10)]
1489    async fn test_propagate_saves_and_fs_changes(
1490        mut cx_a: TestAppContext,
1491        mut cx_b: TestAppContext,
1492        mut cx_c: TestAppContext,
1493    ) {
1494        let lang_registry = Arc::new(LanguageRegistry::new());
1495        let fs = Arc::new(FakeFs::new(cx_a.background()));
1496        cx_a.foreground().forbid_parking();
1497
1498        // Connect to a server as 3 clients.
1499        let mut server = TestServer::start(cx_a.foreground()).await;
1500        let client_a = server.create_client(&mut cx_a, "user_a").await;
1501        let client_b = server.create_client(&mut cx_b, "user_b").await;
1502        let client_c = server.create_client(&mut cx_c, "user_c").await;
1503
1504        // Share a worktree as client A.
1505        fs.insert_tree(
1506            "/a",
1507            json!({
1508                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1509                "file1": "",
1510                "file2": ""
1511            }),
1512        )
1513        .await;
1514        let project_a = cx_a.update(|cx| {
1515            Project::local(
1516                client_a.clone(),
1517                client_a.user_store.clone(),
1518                lang_registry.clone(),
1519                fs.clone(),
1520                cx,
1521            )
1522        });
1523        let (worktree_a, _) = project_a
1524            .update(&mut cx_a, |p, cx| {
1525                p.find_or_create_local_worktree("/a", false, cx)
1526            })
1527            .await
1528            .unwrap();
1529        worktree_a
1530            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1531            .await;
1532        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1533        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1534        project_a
1535            .update(&mut cx_a, |p, cx| p.share(cx))
1536            .await
1537            .unwrap();
1538
1539        // Join that worktree as clients B and C.
1540        let project_b = Project::remote(
1541            project_id,
1542            client_b.clone(),
1543            client_b.user_store.clone(),
1544            lang_registry.clone(),
1545            fs.clone(),
1546            &mut cx_b.to_async(),
1547        )
1548        .await
1549        .unwrap();
1550        let project_c = Project::remote(
1551            project_id,
1552            client_c.clone(),
1553            client_c.user_store.clone(),
1554            lang_registry.clone(),
1555            fs.clone(),
1556            &mut cx_c.to_async(),
1557        )
1558        .await
1559        .unwrap();
1560        let worktree_b = project_b.read_with(&cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1561        let worktree_c = project_c.read_with(&cx_c, |p, cx| p.worktrees(cx).next().unwrap());
1562
1563        // Open and edit a buffer as both guests B and C.
1564        let buffer_b = project_b
1565            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1566            .await
1567            .unwrap();
1568        let buffer_c = project_c
1569            .update(&mut cx_c, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1570            .await
1571            .unwrap();
1572        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "i-am-b, ", cx));
1573        buffer_c.update(&mut cx_c, |buf, cx| buf.edit([0..0], "i-am-c, ", cx));
1574
1575        // Open and edit that buffer as the host.
1576        let buffer_a = project_a
1577            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "file1"), cx))
1578            .await
1579            .unwrap();
1580
1581        buffer_a
1582            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, ")
1583            .await;
1584        buffer_a.update(&mut cx_a, |buf, cx| {
1585            buf.edit([buf.len()..buf.len()], "i-am-a", cx)
1586        });
1587
1588        // Wait for edits to propagate
1589        buffer_a
1590            .condition(&mut cx_a, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1591            .await;
1592        buffer_b
1593            .condition(&mut cx_b, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1594            .await;
1595        buffer_c
1596            .condition(&mut cx_c, |buf, _| buf.text() == "i-am-c, i-am-b, i-am-a")
1597            .await;
1598
1599        // Edit the buffer as the host and concurrently save as guest B.
1600        let save_b = buffer_b.update(&mut cx_b, |buf, cx| buf.save(cx));
1601        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "hi-a, ", cx));
1602        save_b.await.unwrap();
1603        assert_eq!(
1604            fs.load("/a/file1".as_ref()).await.unwrap(),
1605            "hi-a, i-am-c, i-am-b, i-am-a"
1606        );
1607        buffer_a.read_with(&cx_a, |buf, _| assert!(!buf.is_dirty()));
1608        buffer_b.read_with(&cx_b, |buf, _| assert!(!buf.is_dirty()));
1609        buffer_c.condition(&cx_c, |buf, _| !buf.is_dirty()).await;
1610
1611        // Ensure worktree observes a/file1's change event *before* the rename occurs, otherwise
1612        // when interpreting the change event it will mistakenly think that the file has been
1613        // deleted (because its path has changed) and will subsequently fail to detect the rename.
1614        worktree_a.flush_fs_events(&cx_a).await;
1615
1616        // Make changes on host's file system, see those changes on guest worktrees.
1617        fs.rename(
1618            "/a/file1".as_ref(),
1619            "/a/file1-renamed".as_ref(),
1620            Default::default(),
1621        )
1622        .await
1623        .unwrap();
1624        fs.rename("/a/file2".as_ref(), "/a/file3".as_ref(), Default::default())
1625            .await
1626            .unwrap();
1627        fs.insert_file(Path::new("/a/file4"), "4".into())
1628            .await
1629            .unwrap();
1630
1631        worktree_a
1632            .condition(&cx_a, |tree, _| {
1633                tree.paths()
1634                    .map(|p| p.to_string_lossy())
1635                    .collect::<Vec<_>>()
1636                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1637            })
1638            .await;
1639        worktree_b
1640            .condition(&cx_b, |tree, _| {
1641                tree.paths()
1642                    .map(|p| p.to_string_lossy())
1643                    .collect::<Vec<_>>()
1644                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1645            })
1646            .await;
1647        worktree_c
1648            .condition(&cx_c, |tree, _| {
1649                tree.paths()
1650                    .map(|p| p.to_string_lossy())
1651                    .collect::<Vec<_>>()
1652                    == [".zed.toml", "file1-renamed", "file3", "file4"]
1653            })
1654            .await;
1655
1656        // Ensure buffer files are updated as well.
1657        buffer_a
1658            .condition(&cx_a, |buf, _| {
1659                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1660            })
1661            .await;
1662        buffer_b
1663            .condition(&cx_b, |buf, _| {
1664                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1665            })
1666            .await;
1667        buffer_c
1668            .condition(&cx_c, |buf, _| {
1669                buf.file().unwrap().path().to_str() == Some("file1-renamed")
1670            })
1671            .await;
1672    }
1673
1674    #[gpui::test(iterations = 10)]
1675    async fn test_buffer_conflict_after_save(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1676        cx_a.foreground().forbid_parking();
1677        let lang_registry = Arc::new(LanguageRegistry::new());
1678        let fs = Arc::new(FakeFs::new(cx_a.background()));
1679
1680        // Connect to a server as 2 clients.
1681        let mut server = TestServer::start(cx_a.foreground()).await;
1682        let client_a = server.create_client(&mut cx_a, "user_a").await;
1683        let client_b = server.create_client(&mut cx_b, "user_b").await;
1684
1685        // Share a project as client A
1686        fs.insert_tree(
1687            "/dir",
1688            json!({
1689                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1690                "a.txt": "a-contents",
1691            }),
1692        )
1693        .await;
1694
1695        let project_a = cx_a.update(|cx| {
1696            Project::local(
1697                client_a.clone(),
1698                client_a.user_store.clone(),
1699                lang_registry.clone(),
1700                fs.clone(),
1701                cx,
1702            )
1703        });
1704        let (worktree_a, _) = project_a
1705            .update(&mut cx_a, |p, cx| {
1706                p.find_or_create_local_worktree("/dir", false, cx)
1707            })
1708            .await
1709            .unwrap();
1710        worktree_a
1711            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1712            .await;
1713        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1714        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1715        project_a
1716            .update(&mut cx_a, |p, cx| p.share(cx))
1717            .await
1718            .unwrap();
1719
1720        // Join that project as client B
1721        let project_b = Project::remote(
1722            project_id,
1723            client_b.clone(),
1724            client_b.user_store.clone(),
1725            lang_registry.clone(),
1726            fs.clone(),
1727            &mut cx_b.to_async(),
1728        )
1729        .await
1730        .unwrap();
1731        let worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1732
1733        // Open a buffer as client B
1734        let buffer_b = project_b
1735            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1736            .await
1737            .unwrap();
1738        let mtime = buffer_b.read_with(&cx_b, |buf, _| buf.file().unwrap().mtime());
1739
1740        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "world ", cx));
1741        buffer_b.read_with(&cx_b, |buf, _| {
1742            assert!(buf.is_dirty());
1743            assert!(!buf.has_conflict());
1744        });
1745
1746        buffer_b
1747            .update(&mut cx_b, |buf, cx| buf.save(cx))
1748            .await
1749            .unwrap();
1750        worktree_b
1751            .condition(&cx_b, |_, cx| {
1752                buffer_b.read(cx).file().unwrap().mtime() != mtime
1753            })
1754            .await;
1755        buffer_b.read_with(&cx_b, |buf, _| {
1756            assert!(!buf.is_dirty());
1757            assert!(!buf.has_conflict());
1758        });
1759
1760        buffer_b.update(&mut cx_b, |buf, cx| buf.edit([0..0], "hello ", cx));
1761        buffer_b.read_with(&cx_b, |buf, _| {
1762            assert!(buf.is_dirty());
1763            assert!(!buf.has_conflict());
1764        });
1765    }
1766
1767    #[gpui::test(iterations = 10)]
1768    async fn test_buffer_reloading(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
1769        cx_a.foreground().forbid_parking();
1770        let lang_registry = Arc::new(LanguageRegistry::new());
1771        let fs = Arc::new(FakeFs::new(cx_a.background()));
1772
1773        // Connect to a server as 2 clients.
1774        let mut server = TestServer::start(cx_a.foreground()).await;
1775        let client_a = server.create_client(&mut cx_a, "user_a").await;
1776        let client_b = server.create_client(&mut cx_b, "user_b").await;
1777
1778        // Share a project as client A
1779        fs.insert_tree(
1780            "/dir",
1781            json!({
1782                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
1783                "a.txt": "a-contents",
1784            }),
1785        )
1786        .await;
1787
1788        let project_a = cx_a.update(|cx| {
1789            Project::local(
1790                client_a.clone(),
1791                client_a.user_store.clone(),
1792                lang_registry.clone(),
1793                fs.clone(),
1794                cx,
1795            )
1796        });
1797        let (worktree_a, _) = project_a
1798            .update(&mut cx_a, |p, cx| {
1799                p.find_or_create_local_worktree("/dir", false, cx)
1800            })
1801            .await
1802            .unwrap();
1803        worktree_a
1804            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1805            .await;
1806        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1807        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1808        project_a
1809            .update(&mut cx_a, |p, cx| p.share(cx))
1810            .await
1811            .unwrap();
1812
1813        // Join that project as client B
1814        let project_b = Project::remote(
1815            project_id,
1816            client_b.clone(),
1817            client_b.user_store.clone(),
1818            lang_registry.clone(),
1819            fs.clone(),
1820            &mut cx_b.to_async(),
1821        )
1822        .await
1823        .unwrap();
1824        let _worktree_b = project_b.update(&mut cx_b, |p, cx| p.worktrees(cx).next().unwrap());
1825
1826        // Open a buffer as client B
1827        let buffer_b = project_b
1828            .update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1829            .await
1830            .unwrap();
1831        buffer_b.read_with(&cx_b, |buf, _| {
1832            assert!(!buf.is_dirty());
1833            assert!(!buf.has_conflict());
1834        });
1835
1836        fs.save(Path::new("/dir/a.txt"), &"new contents".into())
1837            .await
1838            .unwrap();
1839        buffer_b
1840            .condition(&cx_b, |buf, _| {
1841                buf.text() == "new contents" && !buf.is_dirty()
1842            })
1843            .await;
1844        buffer_b.read_with(&cx_b, |buf, _| {
1845            assert!(!buf.has_conflict());
1846        });
1847    }
1848
1849    #[gpui::test(iterations = 10)]
1850    async fn test_editing_while_guest_opens_buffer(
1851        mut cx_a: TestAppContext,
1852        mut cx_b: TestAppContext,
1853    ) {
1854        cx_a.foreground().forbid_parking();
1855        let lang_registry = Arc::new(LanguageRegistry::new());
1856        let fs = Arc::new(FakeFs::new(cx_a.background()));
1857
1858        // Connect to a server as 2 clients.
1859        let mut server = TestServer::start(cx_a.foreground()).await;
1860        let client_a = server.create_client(&mut cx_a, "user_a").await;
1861        let client_b = server.create_client(&mut cx_b, "user_b").await;
1862
1863        // Share a project as client A
1864        fs.insert_tree(
1865            "/dir",
1866            json!({
1867                ".zed.toml": r#"collaborators = ["user_b"]"#,
1868                "a.txt": "a-contents",
1869            }),
1870        )
1871        .await;
1872        let project_a = cx_a.update(|cx| {
1873            Project::local(
1874                client_a.clone(),
1875                client_a.user_store.clone(),
1876                lang_registry.clone(),
1877                fs.clone(),
1878                cx,
1879            )
1880        });
1881        let (worktree_a, _) = project_a
1882            .update(&mut cx_a, |p, cx| {
1883                p.find_or_create_local_worktree("/dir", false, cx)
1884            })
1885            .await
1886            .unwrap();
1887        worktree_a
1888            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1889            .await;
1890        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1891        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1892        project_a
1893            .update(&mut cx_a, |p, cx| p.share(cx))
1894            .await
1895            .unwrap();
1896
1897        // Join that project as client B
1898        let project_b = Project::remote(
1899            project_id,
1900            client_b.clone(),
1901            client_b.user_store.clone(),
1902            lang_registry.clone(),
1903            fs.clone(),
1904            &mut cx_b.to_async(),
1905        )
1906        .await
1907        .unwrap();
1908
1909        // Open a buffer as client A
1910        let buffer_a = project_a
1911            .update(&mut cx_a, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx))
1912            .await
1913            .unwrap();
1914
1915        // Start opening the same buffer as client B
1916        let buffer_b = cx_b
1917            .background()
1918            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1919        task::yield_now().await;
1920
1921        // Edit the buffer as client A while client B is still opening it.
1922        buffer_a.update(&mut cx_a, |buf, cx| buf.edit([0..0], "z", cx));
1923
1924        let text = buffer_a.read_with(&cx_a, |buf, _| buf.text());
1925        let buffer_b = buffer_b.await.unwrap();
1926        buffer_b.condition(&cx_b, |buf, _| buf.text() == text).await;
1927    }
1928
1929    #[gpui::test(iterations = 10)]
1930    async fn test_leaving_worktree_while_opening_buffer(
1931        mut cx_a: TestAppContext,
1932        mut cx_b: TestAppContext,
1933    ) {
1934        cx_a.foreground().forbid_parking();
1935        let lang_registry = Arc::new(LanguageRegistry::new());
1936        let fs = Arc::new(FakeFs::new(cx_a.background()));
1937
1938        // Connect to a server as 2 clients.
1939        let mut server = TestServer::start(cx_a.foreground()).await;
1940        let client_a = server.create_client(&mut cx_a, "user_a").await;
1941        let client_b = server.create_client(&mut cx_b, "user_b").await;
1942
1943        // Share a project as client A
1944        fs.insert_tree(
1945            "/dir",
1946            json!({
1947                ".zed.toml": r#"collaborators = ["user_b"]"#,
1948                "a.txt": "a-contents",
1949            }),
1950        )
1951        .await;
1952        let project_a = cx_a.update(|cx| {
1953            Project::local(
1954                client_a.clone(),
1955                client_a.user_store.clone(),
1956                lang_registry.clone(),
1957                fs.clone(),
1958                cx,
1959            )
1960        });
1961        let (worktree_a, _) = project_a
1962            .update(&mut cx_a, |p, cx| {
1963                p.find_or_create_local_worktree("/dir", false, cx)
1964            })
1965            .await
1966            .unwrap();
1967        worktree_a
1968            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
1969            .await;
1970        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
1971        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
1972        project_a
1973            .update(&mut cx_a, |p, cx| p.share(cx))
1974            .await
1975            .unwrap();
1976
1977        // Join that project as client B
1978        let project_b = Project::remote(
1979            project_id,
1980            client_b.clone(),
1981            client_b.user_store.clone(),
1982            lang_registry.clone(),
1983            fs.clone(),
1984            &mut cx_b.to_async(),
1985        )
1986        .await
1987        .unwrap();
1988
1989        // See that a guest has joined as client A.
1990        project_a
1991            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
1992            .await;
1993
1994        // Begin opening a buffer as client B, but leave the project before the open completes.
1995        let buffer_b = cx_b
1996            .background()
1997            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.txt"), cx)));
1998        cx_b.update(|_| drop(project_b));
1999        drop(buffer_b);
2000
2001        // See that the guest has left.
2002        project_a
2003            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2004            .await;
2005    }
2006
2007    #[gpui::test(iterations = 10)]
2008    async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2009        cx_a.foreground().forbid_parking();
2010        let lang_registry = Arc::new(LanguageRegistry::new());
2011        let fs = Arc::new(FakeFs::new(cx_a.background()));
2012
2013        // Connect to a server as 2 clients.
2014        let mut server = TestServer::start(cx_a.foreground()).await;
2015        let client_a = server.create_client(&mut cx_a, "user_a").await;
2016        let client_b = server.create_client(&mut cx_b, "user_b").await;
2017
2018        // Share a project as client A
2019        fs.insert_tree(
2020            "/a",
2021            json!({
2022                ".zed.toml": r#"collaborators = ["user_b"]"#,
2023                "a.txt": "a-contents",
2024                "b.txt": "b-contents",
2025            }),
2026        )
2027        .await;
2028        let project_a = cx_a.update(|cx| {
2029            Project::local(
2030                client_a.clone(),
2031                client_a.user_store.clone(),
2032                lang_registry.clone(),
2033                fs.clone(),
2034                cx,
2035            )
2036        });
2037        let (worktree_a, _) = project_a
2038            .update(&mut cx_a, |p, cx| {
2039                p.find_or_create_local_worktree("/a", false, cx)
2040            })
2041            .await
2042            .unwrap();
2043        worktree_a
2044            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2045            .await;
2046        let project_id = project_a
2047            .update(&mut cx_a, |project, _| project.next_remote_id())
2048            .await;
2049        project_a
2050            .update(&mut cx_a, |project, cx| project.share(cx))
2051            .await
2052            .unwrap();
2053
2054        // Join that project as client B
2055        let _project_b = Project::remote(
2056            project_id,
2057            client_b.clone(),
2058            client_b.user_store.clone(),
2059            lang_registry.clone(),
2060            fs.clone(),
2061            &mut cx_b.to_async(),
2062        )
2063        .await
2064        .unwrap();
2065
2066        // See that a guest has joined as client A.
2067        project_a
2068            .condition(&cx_a, |p, _| p.collaborators().len() == 1)
2069            .await;
2070
2071        // Drop client B's connection and ensure client A observes client B leaving the worktree.
2072        client_b.disconnect(&cx_b.to_async()).unwrap();
2073        project_a
2074            .condition(&cx_a, |p, _| p.collaborators().len() == 0)
2075            .await;
2076    }
2077
2078    #[gpui::test(iterations = 10)]
2079    async fn test_collaborating_with_diagnostics(
2080        mut cx_a: TestAppContext,
2081        mut cx_b: TestAppContext,
2082    ) {
2083        cx_a.foreground().forbid_parking();
2084        let mut lang_registry = Arc::new(LanguageRegistry::new());
2085        let fs = Arc::new(FakeFs::new(cx_a.background()));
2086
2087        // Set up a fake language server.
2088        let (language_server_config, mut fake_language_server) =
2089            LanguageServerConfig::fake(&cx_a).await;
2090        Arc::get_mut(&mut lang_registry)
2091            .unwrap()
2092            .add(Arc::new(Language::new(
2093                LanguageConfig {
2094                    name: "Rust".to_string(),
2095                    path_suffixes: vec!["rs".to_string()],
2096                    language_server: Some(language_server_config),
2097                    ..Default::default()
2098                },
2099                Some(tree_sitter_rust::language()),
2100            )));
2101
2102        // Connect to a server as 2 clients.
2103        let mut server = TestServer::start(cx_a.foreground()).await;
2104        let client_a = server.create_client(&mut cx_a, "user_a").await;
2105        let client_b = server.create_client(&mut cx_b, "user_b").await;
2106
2107        // Share a project as client A
2108        fs.insert_tree(
2109            "/a",
2110            json!({
2111                ".zed.toml": r#"collaborators = ["user_b"]"#,
2112                "a.rs": "let one = two",
2113                "other.rs": "",
2114            }),
2115        )
2116        .await;
2117        let project_a = cx_a.update(|cx| {
2118            Project::local(
2119                client_a.clone(),
2120                client_a.user_store.clone(),
2121                lang_registry.clone(),
2122                fs.clone(),
2123                cx,
2124            )
2125        });
2126        let (worktree_a, _) = project_a
2127            .update(&mut cx_a, |p, cx| {
2128                p.find_or_create_local_worktree("/a", false, cx)
2129            })
2130            .await
2131            .unwrap();
2132        worktree_a
2133            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2134            .await;
2135        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2136        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2137        project_a
2138            .update(&mut cx_a, |p, cx| p.share(cx))
2139            .await
2140            .unwrap();
2141
2142        // Cause the language server to start.
2143        let _ = cx_a
2144            .background()
2145            .spawn(project_a.update(&mut cx_a, |project, cx| {
2146                project.open_buffer(
2147                    ProjectPath {
2148                        worktree_id,
2149                        path: Path::new("other.rs").into(),
2150                    },
2151                    cx,
2152                )
2153            }))
2154            .await
2155            .unwrap();
2156
2157        // Simulate a language server reporting errors for a file.
2158        fake_language_server
2159            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2160                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2161                version: None,
2162                diagnostics: vec![lsp::Diagnostic {
2163                    severity: Some(lsp::DiagnosticSeverity::ERROR),
2164                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2165                    message: "message 1".to_string(),
2166                    ..Default::default()
2167                }],
2168            })
2169            .await;
2170
2171        // Wait for server to see the diagnostics update.
2172        server
2173            .condition(|store| {
2174                let worktree = store
2175                    .project(project_id)
2176                    .unwrap()
2177                    .worktrees
2178                    .get(&worktree_id.to_proto())
2179                    .unwrap();
2180
2181                !worktree
2182                    .share
2183                    .as_ref()
2184                    .unwrap()
2185                    .diagnostic_summaries
2186                    .is_empty()
2187            })
2188            .await;
2189
2190        // Join the worktree as client B.
2191        let project_b = Project::remote(
2192            project_id,
2193            client_b.clone(),
2194            client_b.user_store.clone(),
2195            lang_registry.clone(),
2196            fs.clone(),
2197            &mut cx_b.to_async(),
2198        )
2199        .await
2200        .unwrap();
2201
2202        project_b.read_with(&cx_b, |project, cx| {
2203            assert_eq!(
2204                project.diagnostic_summaries(cx).collect::<Vec<_>>(),
2205                &[(
2206                    ProjectPath {
2207                        worktree_id,
2208                        path: Arc::from(Path::new("a.rs")),
2209                    },
2210                    DiagnosticSummary {
2211                        error_count: 1,
2212                        warning_count: 0,
2213                        ..Default::default()
2214                    },
2215                )]
2216            )
2217        });
2218
2219        // Simulate a language server reporting more errors for a file.
2220        fake_language_server
2221            .notify::<lsp::notification::PublishDiagnostics>(lsp::PublishDiagnosticsParams {
2222                uri: lsp::Url::from_file_path("/a/a.rs").unwrap(),
2223                version: None,
2224                diagnostics: vec![
2225                    lsp::Diagnostic {
2226                        severity: Some(lsp::DiagnosticSeverity::ERROR),
2227                        range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 7)),
2228                        message: "message 1".to_string(),
2229                        ..Default::default()
2230                    },
2231                    lsp::Diagnostic {
2232                        severity: Some(lsp::DiagnosticSeverity::WARNING),
2233                        range: lsp::Range::new(
2234                            lsp::Position::new(0, 10),
2235                            lsp::Position::new(0, 13),
2236                        ),
2237                        message: "message 2".to_string(),
2238                        ..Default::default()
2239                    },
2240                ],
2241            })
2242            .await;
2243
2244        // Client b gets the updated summaries
2245        project_b
2246            .condition(&cx_b, |project, cx| {
2247                project.diagnostic_summaries(cx).collect::<Vec<_>>()
2248                    == &[(
2249                        ProjectPath {
2250                            worktree_id,
2251                            path: Arc::from(Path::new("a.rs")),
2252                        },
2253                        DiagnosticSummary {
2254                            error_count: 1,
2255                            warning_count: 1,
2256                            ..Default::default()
2257                        },
2258                    )]
2259            })
2260            .await;
2261
2262        // Open the file with the errors on client B. They should be present.
2263        let buffer_b = cx_b
2264            .background()
2265            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2266            .await
2267            .unwrap();
2268
2269        buffer_b.read_with(&cx_b, |buffer, _| {
2270            assert_eq!(
2271                buffer
2272                    .snapshot()
2273                    .diagnostics_in_range::<_, Point>(0..buffer.len())
2274                    .map(|entry| entry)
2275                    .collect::<Vec<_>>(),
2276                &[
2277                    DiagnosticEntry {
2278                        range: Point::new(0, 4)..Point::new(0, 7),
2279                        diagnostic: Diagnostic {
2280                            group_id: 0,
2281                            message: "message 1".to_string(),
2282                            severity: lsp::DiagnosticSeverity::ERROR,
2283                            is_primary: true,
2284                            ..Default::default()
2285                        }
2286                    },
2287                    DiagnosticEntry {
2288                        range: Point::new(0, 10)..Point::new(0, 13),
2289                        diagnostic: Diagnostic {
2290                            group_id: 1,
2291                            severity: lsp::DiagnosticSeverity::WARNING,
2292                            message: "message 2".to_string(),
2293                            is_primary: true,
2294                            ..Default::default()
2295                        }
2296                    }
2297                ]
2298            );
2299        });
2300    }
2301
2302    #[gpui::test(iterations = 10)]
2303    async fn test_collaborating_with_completion(
2304        mut cx_a: TestAppContext,
2305        mut cx_b: TestAppContext,
2306    ) {
2307        cx_a.foreground().forbid_parking();
2308        let mut lang_registry = Arc::new(LanguageRegistry::new());
2309        let fs = Arc::new(FakeFs::new(cx_a.background()));
2310
2311        // Set up a fake language server.
2312        let (language_server_config, mut fake_language_server) =
2313            LanguageServerConfig::fake_with_capabilities(
2314                lsp::ServerCapabilities {
2315                    completion_provider: Some(lsp::CompletionOptions {
2316                        trigger_characters: Some(vec![".".to_string()]),
2317                        ..Default::default()
2318                    }),
2319                    ..Default::default()
2320                },
2321                &cx_a,
2322            )
2323            .await;
2324        Arc::get_mut(&mut lang_registry)
2325            .unwrap()
2326            .add(Arc::new(Language::new(
2327                LanguageConfig {
2328                    name: "Rust".to_string(),
2329                    path_suffixes: vec!["rs".to_string()],
2330                    language_server: Some(language_server_config),
2331                    ..Default::default()
2332                },
2333                Some(tree_sitter_rust::language()),
2334            )));
2335
2336        // Connect to a server as 2 clients.
2337        let mut server = TestServer::start(cx_a.foreground()).await;
2338        let client_a = server.create_client(&mut cx_a, "user_a").await;
2339        let client_b = server.create_client(&mut cx_b, "user_b").await;
2340
2341        // Share a project as client A
2342        fs.insert_tree(
2343            "/a",
2344            json!({
2345                ".zed.toml": r#"collaborators = ["user_b"]"#,
2346                "main.rs": "fn main() { a }",
2347                "other.rs": "",
2348            }),
2349        )
2350        .await;
2351        let project_a = cx_a.update(|cx| {
2352            Project::local(
2353                client_a.clone(),
2354                client_a.user_store.clone(),
2355                lang_registry.clone(),
2356                fs.clone(),
2357                cx,
2358            )
2359        });
2360        let (worktree_a, _) = project_a
2361            .update(&mut cx_a, |p, cx| {
2362                p.find_or_create_local_worktree("/a", false, cx)
2363            })
2364            .await
2365            .unwrap();
2366        worktree_a
2367            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2368            .await;
2369        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2370        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2371        project_a
2372            .update(&mut cx_a, |p, cx| p.share(cx))
2373            .await
2374            .unwrap();
2375
2376        // Join the worktree as client B.
2377        let project_b = Project::remote(
2378            project_id,
2379            client_b.clone(),
2380            client_b.user_store.clone(),
2381            lang_registry.clone(),
2382            fs.clone(),
2383            &mut cx_b.to_async(),
2384        )
2385        .await
2386        .unwrap();
2387
2388        // Open a file in an editor as the guest.
2389        let buffer_b = project_b
2390            .update(&mut cx_b, |p, cx| {
2391                p.open_buffer((worktree_id, "main.rs"), cx)
2392            })
2393            .await
2394            .unwrap();
2395        let (window_b, _) = cx_b.add_window(|_| EmptyView);
2396        let editor_b = cx_b.add_view(window_b, |cx| {
2397            Editor::for_buffer(
2398                cx.add_model(|cx| MultiBuffer::singleton(buffer_b.clone(), cx)),
2399                Arc::new(|cx| EditorSettings::test(cx)),
2400                Some(project_b.clone()),
2401                cx,
2402            )
2403        });
2404
2405        // Type a completion trigger character as the guest.
2406        editor_b.update(&mut cx_b, |editor, cx| {
2407            editor.select_ranges([13..13], None, cx);
2408            editor.handle_input(&Input(".".into()), cx);
2409            cx.focus(&editor_b);
2410        });
2411
2412        // Receive a completion request as the host's language server.
2413        // Return some completions from the host's language server.
2414        fake_language_server.handle_request::<lsp::request::Completion, _>(|params| {
2415            assert_eq!(
2416                params.text_document_position.text_document.uri,
2417                lsp::Url::from_file_path("/a/main.rs").unwrap(),
2418            );
2419            assert_eq!(
2420                params.text_document_position.position,
2421                lsp::Position::new(0, 14),
2422            );
2423
2424            Some(lsp::CompletionResponse::Array(vec![
2425                lsp::CompletionItem {
2426                    label: "first_method(…)".into(),
2427                    detail: Some("fn(&mut self, B) -> C".into()),
2428                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2429                        new_text: "first_method($1)".to_string(),
2430                        range: lsp::Range::new(
2431                            lsp::Position::new(0, 14),
2432                            lsp::Position::new(0, 14),
2433                        ),
2434                    })),
2435                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2436                    ..Default::default()
2437                },
2438                lsp::CompletionItem {
2439                    label: "second_method(…)".into(),
2440                    detail: Some("fn(&mut self, C) -> D<E>".into()),
2441                    text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2442                        new_text: "second_method()".to_string(),
2443                        range: lsp::Range::new(
2444                            lsp::Position::new(0, 14),
2445                            lsp::Position::new(0, 14),
2446                        ),
2447                    })),
2448                    insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2449                    ..Default::default()
2450                },
2451            ]))
2452        });
2453
2454        // Open the buffer on the host.
2455        let buffer_a = project_a
2456            .update(&mut cx_a, |p, cx| {
2457                p.open_buffer((worktree_id, "main.rs"), cx)
2458            })
2459            .await
2460            .unwrap();
2461        buffer_a
2462            .condition(&cx_a, |buffer, _| buffer.text() == "fn main() { a. }")
2463            .await;
2464
2465        // Confirm a completion on the guest.
2466        editor_b.next_notification(&cx_b).await;
2467        editor_b.update(&mut cx_b, |editor, cx| {
2468            assert!(editor.context_menu_visible());
2469            editor.confirm_completion(&ConfirmCompletion(Some(0)), cx);
2470            assert_eq!(editor.text(cx), "fn main() { a.first_method() }");
2471        });
2472
2473        // Return a resolved completion from the host's language server.
2474        // The resolved completion has an additional text edit.
2475        fake_language_server.handle_request::<lsp::request::ResolveCompletionItem, _>(|params| {
2476            assert_eq!(params.label, "first_method(…)");
2477            lsp::CompletionItem {
2478                label: "first_method(…)".into(),
2479                detail: Some("fn(&mut self, B) -> C".into()),
2480                text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit {
2481                    new_text: "first_method($1)".to_string(),
2482                    range: lsp::Range::new(lsp::Position::new(0, 14), lsp::Position::new(0, 14)),
2483                })),
2484                additional_text_edits: Some(vec![lsp::TextEdit {
2485                    new_text: "use d::SomeTrait;\n".to_string(),
2486                    range: lsp::Range::new(lsp::Position::new(0, 0), lsp::Position::new(0, 0)),
2487                }]),
2488                insert_text_format: Some(lsp::InsertTextFormat::SNIPPET),
2489                ..Default::default()
2490            }
2491        });
2492
2493        buffer_a
2494            .condition(&cx_a, |buffer, _| {
2495                buffer.text() == "fn main() { a.first_method() }"
2496            })
2497            .await;
2498
2499        // The additional edit is applied.
2500        buffer_b
2501            .condition(&cx_b, |buffer, _| {
2502                buffer.text() == "use d::SomeTrait;\nfn main() { a.first_method() }"
2503            })
2504            .await;
2505        assert_eq!(
2506            buffer_a.read_with(&cx_a, |buffer, _| buffer.text()),
2507            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2508        );
2509    }
2510
2511    #[gpui::test(iterations = 10)]
2512    async fn test_formatting_buffer(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2513        cx_a.foreground().forbid_parking();
2514        let mut lang_registry = Arc::new(LanguageRegistry::new());
2515        let fs = Arc::new(FakeFs::new(cx_a.background()));
2516
2517        // Set up a fake language server.
2518        let (language_server_config, mut fake_language_server) =
2519            LanguageServerConfig::fake(&cx_a).await;
2520        Arc::get_mut(&mut lang_registry)
2521            .unwrap()
2522            .add(Arc::new(Language::new(
2523                LanguageConfig {
2524                    name: "Rust".to_string(),
2525                    path_suffixes: vec!["rs".to_string()],
2526                    language_server: Some(language_server_config),
2527                    ..Default::default()
2528                },
2529                Some(tree_sitter_rust::language()),
2530            )));
2531
2532        // Connect to a server as 2 clients.
2533        let mut server = TestServer::start(cx_a.foreground()).await;
2534        let client_a = server.create_client(&mut cx_a, "user_a").await;
2535        let client_b = server.create_client(&mut cx_b, "user_b").await;
2536
2537        // Share a project as client A
2538        fs.insert_tree(
2539            "/a",
2540            json!({
2541                ".zed.toml": r#"collaborators = ["user_b"]"#,
2542                "a.rs": "let one = two",
2543            }),
2544        )
2545        .await;
2546        let project_a = cx_a.update(|cx| {
2547            Project::local(
2548                client_a.clone(),
2549                client_a.user_store.clone(),
2550                lang_registry.clone(),
2551                fs.clone(),
2552                cx,
2553            )
2554        });
2555        let (worktree_a, _) = project_a
2556            .update(&mut cx_a, |p, cx| {
2557                p.find_or_create_local_worktree("/a", false, cx)
2558            })
2559            .await
2560            .unwrap();
2561        worktree_a
2562            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2563            .await;
2564        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2565        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2566        project_a
2567            .update(&mut cx_a, |p, cx| p.share(cx))
2568            .await
2569            .unwrap();
2570
2571        // Join the worktree as client B.
2572        let project_b = Project::remote(
2573            project_id,
2574            client_b.clone(),
2575            client_b.user_store.clone(),
2576            lang_registry.clone(),
2577            fs.clone(),
2578            &mut cx_b.to_async(),
2579        )
2580        .await
2581        .unwrap();
2582
2583        let buffer_b = cx_b
2584            .background()
2585            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2586            .await
2587            .unwrap();
2588
2589        let format = project_b.update(&mut cx_b, |project, cx| {
2590            project.format(HashSet::from_iter([buffer_b.clone()]), true, cx)
2591        });
2592
2593        fake_language_server.handle_request::<lsp::request::Formatting, _>(|_| {
2594            Some(vec![
2595                lsp::TextEdit {
2596                    range: lsp::Range::new(lsp::Position::new(0, 4), lsp::Position::new(0, 4)),
2597                    new_text: "h".to_string(),
2598                },
2599                lsp::TextEdit {
2600                    range: lsp::Range::new(lsp::Position::new(0, 7), lsp::Position::new(0, 7)),
2601                    new_text: "y".to_string(),
2602                },
2603            ])
2604        });
2605
2606        format.await.unwrap();
2607        assert_eq!(
2608            buffer_b.read_with(&cx_b, |buffer, _| buffer.text()),
2609            "let honey = two"
2610        );
2611    }
2612
2613    #[gpui::test(iterations = 10)]
2614    async fn test_definition(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2615        cx_a.foreground().forbid_parking();
2616        let mut lang_registry = Arc::new(LanguageRegistry::new());
2617        let fs = Arc::new(FakeFs::new(cx_a.background()));
2618        fs.insert_tree(
2619            "/root-1",
2620            json!({
2621                ".zed.toml": r#"collaborators = ["user_b"]"#,
2622                "a.rs": "const ONE: usize = b::TWO + b::THREE;",
2623            }),
2624        )
2625        .await;
2626        fs.insert_tree(
2627            "/root-2",
2628            json!({
2629                "b.rs": "const TWO: usize = 2;\nconst THREE: usize = 3;",
2630            }),
2631        )
2632        .await;
2633
2634        // Set up a fake language server.
2635        let (language_server_config, mut fake_language_server) =
2636            LanguageServerConfig::fake(&cx_a).await;
2637        Arc::get_mut(&mut lang_registry)
2638            .unwrap()
2639            .add(Arc::new(Language::new(
2640                LanguageConfig {
2641                    name: "Rust".to_string(),
2642                    path_suffixes: vec!["rs".to_string()],
2643                    language_server: Some(language_server_config),
2644                    ..Default::default()
2645                },
2646                Some(tree_sitter_rust::language()),
2647            )));
2648
2649        // Connect to a server as 2 clients.
2650        let mut server = TestServer::start(cx_a.foreground()).await;
2651        let client_a = server.create_client(&mut cx_a, "user_a").await;
2652        let client_b = server.create_client(&mut cx_b, "user_b").await;
2653
2654        // Share a project as client A
2655        let project_a = cx_a.update(|cx| {
2656            Project::local(
2657                client_a.clone(),
2658                client_a.user_store.clone(),
2659                lang_registry.clone(),
2660                fs.clone(),
2661                cx,
2662            )
2663        });
2664        let (worktree_a, _) = project_a
2665            .update(&mut cx_a, |p, cx| {
2666                p.find_or_create_local_worktree("/root-1", false, cx)
2667            })
2668            .await
2669            .unwrap();
2670        worktree_a
2671            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2672            .await;
2673        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2674        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2675        project_a
2676            .update(&mut cx_a, |p, cx| p.share(cx))
2677            .await
2678            .unwrap();
2679
2680        // Join the worktree as client B.
2681        let project_b = Project::remote(
2682            project_id,
2683            client_b.clone(),
2684            client_b.user_store.clone(),
2685            lang_registry.clone(),
2686            fs.clone(),
2687            &mut cx_b.to_async(),
2688        )
2689        .await
2690        .unwrap();
2691
2692        // Open the file on client B.
2693        let buffer_b = cx_b
2694            .background()
2695            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2696            .await
2697            .unwrap();
2698
2699        // Request the definition of a symbol as the guest.
2700        let definitions_1 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 23, cx));
2701        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2702            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2703                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2704                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2705            )))
2706        });
2707
2708        let definitions_1 = definitions_1.await.unwrap();
2709        cx_b.read(|cx| {
2710            assert_eq!(definitions_1.len(), 1);
2711            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2712            let target_buffer = definitions_1[0].target_buffer.read(cx);
2713            assert_eq!(
2714                target_buffer.text(),
2715                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2716            );
2717            assert_eq!(
2718                definitions_1[0].target_range.to_point(target_buffer),
2719                Point::new(0, 6)..Point::new(0, 9)
2720            );
2721        });
2722
2723        // Try getting more definitions for the same buffer, ensuring the buffer gets reused from
2724        // the previous call to `definition`.
2725        let definitions_2 = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b, 33, cx));
2726        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2727            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2728                lsp::Url::from_file_path("/root-2/b.rs").unwrap(),
2729                lsp::Range::new(lsp::Position::new(1, 6), lsp::Position::new(1, 11)),
2730            )))
2731        });
2732
2733        let definitions_2 = definitions_2.await.unwrap();
2734        cx_b.read(|cx| {
2735            assert_eq!(definitions_2.len(), 1);
2736            assert_eq!(project_b.read(cx).worktrees(cx).count(), 2);
2737            let target_buffer = definitions_2[0].target_buffer.read(cx);
2738            assert_eq!(
2739                target_buffer.text(),
2740                "const TWO: usize = 2;\nconst THREE: usize = 3;"
2741            );
2742            assert_eq!(
2743                definitions_2[0].target_range.to_point(target_buffer),
2744                Point::new(1, 6)..Point::new(1, 11)
2745            );
2746        });
2747        assert_eq!(
2748            definitions_1[0].target_buffer,
2749            definitions_2[0].target_buffer
2750        );
2751
2752        cx_b.update(|_| {
2753            drop(definitions_1);
2754            drop(definitions_2);
2755        });
2756        project_b
2757            .condition(&cx_b, |proj, cx| proj.worktrees(cx).count() == 1)
2758            .await;
2759    }
2760
2761    #[gpui::test(iterations = 10)]
2762    async fn test_open_buffer_while_getting_definition_pointing_to_it(
2763        mut cx_a: TestAppContext,
2764        mut cx_b: TestAppContext,
2765        mut rng: StdRng,
2766    ) {
2767        cx_a.foreground().forbid_parking();
2768        let mut lang_registry = Arc::new(LanguageRegistry::new());
2769        let fs = Arc::new(FakeFs::new(cx_a.background()));
2770        fs.insert_tree(
2771            "/root",
2772            json!({
2773                ".zed.toml": r#"collaborators = ["user_b"]"#,
2774                "a.rs": "const ONE: usize = b::TWO;",
2775                "b.rs": "const TWO: usize = 2",
2776            }),
2777        )
2778        .await;
2779
2780        // Set up a fake language server.
2781        let (language_server_config, mut fake_language_server) =
2782            LanguageServerConfig::fake(&cx_a).await;
2783        Arc::get_mut(&mut lang_registry)
2784            .unwrap()
2785            .add(Arc::new(Language::new(
2786                LanguageConfig {
2787                    name: "Rust".to_string(),
2788                    path_suffixes: vec!["rs".to_string()],
2789                    language_server: Some(language_server_config),
2790                    ..Default::default()
2791                },
2792                Some(tree_sitter_rust::language()),
2793            )));
2794
2795        // Connect to a server as 2 clients.
2796        let mut server = TestServer::start(cx_a.foreground()).await;
2797        let client_a = server.create_client(&mut cx_a, "user_a").await;
2798        let client_b = server.create_client(&mut cx_b, "user_b").await;
2799
2800        // Share a project as client A
2801        let project_a = cx_a.update(|cx| {
2802            Project::local(
2803                client_a.clone(),
2804                client_a.user_store.clone(),
2805                lang_registry.clone(),
2806                fs.clone(),
2807                cx,
2808            )
2809        });
2810        let (worktree_a, _) = project_a
2811            .update(&mut cx_a, |p, cx| {
2812                p.find_or_create_local_worktree("/root", false, cx)
2813            })
2814            .await
2815            .unwrap();
2816        worktree_a
2817            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
2818            .await;
2819        let project_id = project_a.update(&mut cx_a, |p, _| p.next_remote_id()).await;
2820        let worktree_id = worktree_a.read_with(&cx_a, |tree, _| tree.id());
2821        project_a
2822            .update(&mut cx_a, |p, cx| p.share(cx))
2823            .await
2824            .unwrap();
2825
2826        // Join the worktree as client B.
2827        let project_b = Project::remote(
2828            project_id,
2829            client_b.clone(),
2830            client_b.user_store.clone(),
2831            lang_registry.clone(),
2832            fs.clone(),
2833            &mut cx_b.to_async(),
2834        )
2835        .await
2836        .unwrap();
2837
2838        let buffer_b1 = cx_b
2839            .background()
2840            .spawn(project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "a.rs"), cx)))
2841            .await
2842            .unwrap();
2843
2844        let definitions;
2845        let buffer_b2;
2846        if rng.gen() {
2847            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2848            buffer_b2 =
2849                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2850        } else {
2851            buffer_b2 =
2852                project_b.update(&mut cx_b, |p, cx| p.open_buffer((worktree_id, "b.rs"), cx));
2853            definitions = project_b.update(&mut cx_b, |p, cx| p.definition(&buffer_b1, 23, cx));
2854        }
2855
2856        fake_language_server.handle_request::<lsp::request::GotoDefinition, _>(|_| {
2857            Some(lsp::GotoDefinitionResponse::Scalar(lsp::Location::new(
2858                lsp::Url::from_file_path("/root/b.rs").unwrap(),
2859                lsp::Range::new(lsp::Position::new(0, 6), lsp::Position::new(0, 9)),
2860            )))
2861        });
2862
2863        let buffer_b2 = buffer_b2.await.unwrap();
2864        let definitions = definitions.await.unwrap();
2865        assert_eq!(definitions.len(), 1);
2866        assert_eq!(definitions[0].target_buffer, buffer_b2);
2867    }
2868
2869    #[gpui::test(iterations = 10)]
2870    async fn test_basic_chat(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
2871        cx_a.foreground().forbid_parking();
2872
2873        // Connect to a server as 2 clients.
2874        let mut server = TestServer::start(cx_a.foreground()).await;
2875        let client_a = server.create_client(&mut cx_a, "user_a").await;
2876        let client_b = server.create_client(&mut cx_b, "user_b").await;
2877
2878        // Create an org that includes these 2 users.
2879        let db = &server.app_state.db;
2880        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
2881        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
2882            .await
2883            .unwrap();
2884        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
2885            .await
2886            .unwrap();
2887
2888        // Create a channel that includes all the users.
2889        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
2890        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
2891            .await
2892            .unwrap();
2893        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
2894            .await
2895            .unwrap();
2896        db.create_channel_message(
2897            channel_id,
2898            client_b.current_user_id(&cx_b),
2899            "hello A, it's B.",
2900            OffsetDateTime::now_utc(),
2901            1,
2902        )
2903        .await
2904        .unwrap();
2905
2906        let channels_a = cx_a
2907            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
2908        channels_a
2909            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
2910            .await;
2911        channels_a.read_with(&cx_a, |list, _| {
2912            assert_eq!(
2913                list.available_channels().unwrap(),
2914                &[ChannelDetails {
2915                    id: channel_id.to_proto(),
2916                    name: "test-channel".to_string()
2917                }]
2918            )
2919        });
2920        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
2921            this.get_channel(channel_id.to_proto(), cx).unwrap()
2922        });
2923        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
2924        channel_a
2925            .condition(&cx_a, |channel, _| {
2926                channel_messages(channel)
2927                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2928            })
2929            .await;
2930
2931        let channels_b = cx_b
2932            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
2933        channels_b
2934            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
2935            .await;
2936        channels_b.read_with(&cx_b, |list, _| {
2937            assert_eq!(
2938                list.available_channels().unwrap(),
2939                &[ChannelDetails {
2940                    id: channel_id.to_proto(),
2941                    name: "test-channel".to_string()
2942                }]
2943            )
2944        });
2945
2946        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
2947            this.get_channel(channel_id.to_proto(), cx).unwrap()
2948        });
2949        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
2950        channel_b
2951            .condition(&cx_b, |channel, _| {
2952                channel_messages(channel)
2953                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
2954            })
2955            .await;
2956
2957        channel_a
2958            .update(&mut cx_a, |channel, cx| {
2959                channel
2960                    .send_message("oh, hi B.".to_string(), cx)
2961                    .unwrap()
2962                    .detach();
2963                let task = channel.send_message("sup".to_string(), cx).unwrap();
2964                assert_eq!(
2965                    channel_messages(channel),
2966                    &[
2967                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2968                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
2969                        ("user_a".to_string(), "sup".to_string(), true)
2970                    ]
2971                );
2972                task
2973            })
2974            .await
2975            .unwrap();
2976
2977        channel_b
2978            .condition(&cx_b, |channel, _| {
2979                channel_messages(channel)
2980                    == [
2981                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
2982                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
2983                        ("user_a".to_string(), "sup".to_string(), false),
2984                    ]
2985            })
2986            .await;
2987
2988        assert_eq!(
2989            server
2990                .state()
2991                .await
2992                .channel(channel_id)
2993                .unwrap()
2994                .connection_ids
2995                .len(),
2996            2
2997        );
2998        cx_b.update(|_| drop(channel_b));
2999        server
3000            .condition(|state| state.channel(channel_id).unwrap().connection_ids.len() == 1)
3001            .await;
3002
3003        cx_a.update(|_| drop(channel_a));
3004        server
3005            .condition(|state| state.channel(channel_id).is_none())
3006            .await;
3007    }
3008
3009    #[gpui::test(iterations = 10)]
3010    async fn test_chat_message_validation(mut cx_a: TestAppContext) {
3011        cx_a.foreground().forbid_parking();
3012
3013        let mut server = TestServer::start(cx_a.foreground()).await;
3014        let client_a = server.create_client(&mut cx_a, "user_a").await;
3015
3016        let db = &server.app_state.db;
3017        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3018        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3019        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3020            .await
3021            .unwrap();
3022        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3023            .await
3024            .unwrap();
3025
3026        let channels_a = cx_a
3027            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3028        channels_a
3029            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3030            .await;
3031        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3032            this.get_channel(channel_id.to_proto(), cx).unwrap()
3033        });
3034
3035        // Messages aren't allowed to be too long.
3036        channel_a
3037            .update(&mut cx_a, |channel, cx| {
3038                let long_body = "this is long.\n".repeat(1024);
3039                channel.send_message(long_body, cx).unwrap()
3040            })
3041            .await
3042            .unwrap_err();
3043
3044        // Messages aren't allowed to be blank.
3045        channel_a.update(&mut cx_a, |channel, cx| {
3046            channel.send_message(String::new(), cx).unwrap_err()
3047        });
3048
3049        // Leading and trailing whitespace are trimmed.
3050        channel_a
3051            .update(&mut cx_a, |channel, cx| {
3052                channel
3053                    .send_message("\n surrounded by whitespace  \n".to_string(), cx)
3054                    .unwrap()
3055            })
3056            .await
3057            .unwrap();
3058        assert_eq!(
3059            db.get_channel_messages(channel_id, 10, None)
3060                .await
3061                .unwrap()
3062                .iter()
3063                .map(|m| &m.body)
3064                .collect::<Vec<_>>(),
3065            &["surrounded by whitespace"]
3066        );
3067    }
3068
3069    #[gpui::test(iterations = 10)]
3070    async fn test_chat_reconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) {
3071        cx_a.foreground().forbid_parking();
3072
3073        // Connect to a server as 2 clients.
3074        let mut server = TestServer::start(cx_a.foreground()).await;
3075        let client_a = server.create_client(&mut cx_a, "user_a").await;
3076        let client_b = server.create_client(&mut cx_b, "user_b").await;
3077        let mut status_b = client_b.status();
3078
3079        // Create an org that includes these 2 users.
3080        let db = &server.app_state.db;
3081        let org_id = db.create_org("Test Org", "test-org").await.unwrap();
3082        db.add_org_member(org_id, client_a.current_user_id(&cx_a), false)
3083            .await
3084            .unwrap();
3085        db.add_org_member(org_id, client_b.current_user_id(&cx_b), false)
3086            .await
3087            .unwrap();
3088
3089        // Create a channel that includes all the users.
3090        let channel_id = db.create_org_channel(org_id, "test-channel").await.unwrap();
3091        db.add_channel_member(channel_id, client_a.current_user_id(&cx_a), false)
3092            .await
3093            .unwrap();
3094        db.add_channel_member(channel_id, client_b.current_user_id(&cx_b), false)
3095            .await
3096            .unwrap();
3097        db.create_channel_message(
3098            channel_id,
3099            client_b.current_user_id(&cx_b),
3100            "hello A, it's B.",
3101            OffsetDateTime::now_utc(),
3102            2,
3103        )
3104        .await
3105        .unwrap();
3106
3107        let channels_a = cx_a
3108            .add_model(|cx| ChannelList::new(client_a.user_store.clone(), client_a.clone(), cx));
3109        channels_a
3110            .condition(&mut cx_a, |list, _| list.available_channels().is_some())
3111            .await;
3112
3113        channels_a.read_with(&cx_a, |list, _| {
3114            assert_eq!(
3115                list.available_channels().unwrap(),
3116                &[ChannelDetails {
3117                    id: channel_id.to_proto(),
3118                    name: "test-channel".to_string()
3119                }]
3120            )
3121        });
3122        let channel_a = channels_a.update(&mut cx_a, |this, cx| {
3123            this.get_channel(channel_id.to_proto(), cx).unwrap()
3124        });
3125        channel_a.read_with(&cx_a, |channel, _| assert!(channel.messages().is_empty()));
3126        channel_a
3127            .condition(&cx_a, |channel, _| {
3128                channel_messages(channel)
3129                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3130            })
3131            .await;
3132
3133        let channels_b = cx_b
3134            .add_model(|cx| ChannelList::new(client_b.user_store.clone(), client_b.clone(), cx));
3135        channels_b
3136            .condition(&mut cx_b, |list, _| list.available_channels().is_some())
3137            .await;
3138        channels_b.read_with(&cx_b, |list, _| {
3139            assert_eq!(
3140                list.available_channels().unwrap(),
3141                &[ChannelDetails {
3142                    id: channel_id.to_proto(),
3143                    name: "test-channel".to_string()
3144                }]
3145            )
3146        });
3147
3148        let channel_b = channels_b.update(&mut cx_b, |this, cx| {
3149            this.get_channel(channel_id.to_proto(), cx).unwrap()
3150        });
3151        channel_b.read_with(&cx_b, |channel, _| assert!(channel.messages().is_empty()));
3152        channel_b
3153            .condition(&cx_b, |channel, _| {
3154                channel_messages(channel)
3155                    == [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3156            })
3157            .await;
3158
3159        // Disconnect client B, ensuring we can still access its cached channel data.
3160        server.forbid_connections();
3161        server.disconnect_client(client_b.current_user_id(&cx_b));
3162        while !matches!(
3163            status_b.next().await,
3164            Some(client::Status::ReconnectionError { .. })
3165        ) {}
3166
3167        channels_b.read_with(&cx_b, |channels, _| {
3168            assert_eq!(
3169                channels.available_channels().unwrap(),
3170                [ChannelDetails {
3171                    id: channel_id.to_proto(),
3172                    name: "test-channel".to_string()
3173                }]
3174            )
3175        });
3176        channel_b.read_with(&cx_b, |channel, _| {
3177            assert_eq!(
3178                channel_messages(channel),
3179                [("user_b".to_string(), "hello A, it's B.".to_string(), false)]
3180            )
3181        });
3182
3183        // Send a message from client B while it is disconnected.
3184        channel_b
3185            .update(&mut cx_b, |channel, cx| {
3186                let task = channel
3187                    .send_message("can you see this?".to_string(), cx)
3188                    .unwrap();
3189                assert_eq!(
3190                    channel_messages(channel),
3191                    &[
3192                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3193                        ("user_b".to_string(), "can you see this?".to_string(), true)
3194                    ]
3195                );
3196                task
3197            })
3198            .await
3199            .unwrap_err();
3200
3201        // Send a message from client A while B is disconnected.
3202        channel_a
3203            .update(&mut cx_a, |channel, cx| {
3204                channel
3205                    .send_message("oh, hi B.".to_string(), cx)
3206                    .unwrap()
3207                    .detach();
3208                let task = channel.send_message("sup".to_string(), cx).unwrap();
3209                assert_eq!(
3210                    channel_messages(channel),
3211                    &[
3212                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3213                        ("user_a".to_string(), "oh, hi B.".to_string(), true),
3214                        ("user_a".to_string(), "sup".to_string(), true)
3215                    ]
3216                );
3217                task
3218            })
3219            .await
3220            .unwrap();
3221
3222        // Give client B a chance to reconnect.
3223        server.allow_connections();
3224        cx_b.foreground().advance_clock(Duration::from_secs(10));
3225
3226        // Verify that B sees the new messages upon reconnection, as well as the message client B
3227        // sent while offline.
3228        channel_b
3229            .condition(&cx_b, |channel, _| {
3230                channel_messages(channel)
3231                    == [
3232                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3233                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3234                        ("user_a".to_string(), "sup".to_string(), false),
3235                        ("user_b".to_string(), "can you see this?".to_string(), false),
3236                    ]
3237            })
3238            .await;
3239
3240        // Ensure client A and B can communicate normally after reconnection.
3241        channel_a
3242            .update(&mut cx_a, |channel, cx| {
3243                channel.send_message("you online?".to_string(), cx).unwrap()
3244            })
3245            .await
3246            .unwrap();
3247        channel_b
3248            .condition(&cx_b, |channel, _| {
3249                channel_messages(channel)
3250                    == [
3251                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3252                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3253                        ("user_a".to_string(), "sup".to_string(), false),
3254                        ("user_b".to_string(), "can you see this?".to_string(), false),
3255                        ("user_a".to_string(), "you online?".to_string(), false),
3256                    ]
3257            })
3258            .await;
3259
3260        channel_b
3261            .update(&mut cx_b, |channel, cx| {
3262                channel.send_message("yep".to_string(), cx).unwrap()
3263            })
3264            .await
3265            .unwrap();
3266        channel_a
3267            .condition(&cx_a, |channel, _| {
3268                channel_messages(channel)
3269                    == [
3270                        ("user_b".to_string(), "hello A, it's B.".to_string(), false),
3271                        ("user_a".to_string(), "oh, hi B.".to_string(), false),
3272                        ("user_a".to_string(), "sup".to_string(), false),
3273                        ("user_b".to_string(), "can you see this?".to_string(), false),
3274                        ("user_a".to_string(), "you online?".to_string(), false),
3275                        ("user_b".to_string(), "yep".to_string(), false),
3276                    ]
3277            })
3278            .await;
3279    }
3280
3281    #[gpui::test(iterations = 10)]
3282    async fn test_contacts(
3283        mut cx_a: TestAppContext,
3284        mut cx_b: TestAppContext,
3285        mut cx_c: TestAppContext,
3286    ) {
3287        cx_a.foreground().forbid_parking();
3288        let lang_registry = Arc::new(LanguageRegistry::new());
3289        let fs = Arc::new(FakeFs::new(cx_a.background()));
3290
3291        // Connect to a server as 3 clients.
3292        let mut server = TestServer::start(cx_a.foreground()).await;
3293        let client_a = server.create_client(&mut cx_a, "user_a").await;
3294        let client_b = server.create_client(&mut cx_b, "user_b").await;
3295        let client_c = server.create_client(&mut cx_c, "user_c").await;
3296
3297        // Share a worktree as client A.
3298        fs.insert_tree(
3299            "/a",
3300            json!({
3301                ".zed.toml": r#"collaborators = ["user_b", "user_c"]"#,
3302            }),
3303        )
3304        .await;
3305
3306        let project_a = cx_a.update(|cx| {
3307            Project::local(
3308                client_a.clone(),
3309                client_a.user_store.clone(),
3310                lang_registry.clone(),
3311                fs.clone(),
3312                cx,
3313            )
3314        });
3315        let (worktree_a, _) = project_a
3316            .update(&mut cx_a, |p, cx| {
3317                p.find_or_create_local_worktree("/a", false, cx)
3318            })
3319            .await
3320            .unwrap();
3321        worktree_a
3322            .read_with(&cx_a, |tree, _| tree.as_local().unwrap().scan_complete())
3323            .await;
3324
3325        client_a
3326            .user_store
3327            .condition(&cx_a, |user_store, _| {
3328                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3329            })
3330            .await;
3331        client_b
3332            .user_store
3333            .condition(&cx_b, |user_store, _| {
3334                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3335            })
3336            .await;
3337        client_c
3338            .user_store
3339            .condition(&cx_c, |user_store, _| {
3340                contacts(user_store) == vec![("user_a", vec![("a", vec![])])]
3341            })
3342            .await;
3343
3344        let project_id = project_a
3345            .update(&mut cx_a, |project, _| project.next_remote_id())
3346            .await;
3347        project_a
3348            .update(&mut cx_a, |project, cx| project.share(cx))
3349            .await
3350            .unwrap();
3351
3352        let _project_b = Project::remote(
3353            project_id,
3354            client_b.clone(),
3355            client_b.user_store.clone(),
3356            lang_registry.clone(),
3357            fs.clone(),
3358            &mut cx_b.to_async(),
3359        )
3360        .await
3361        .unwrap();
3362
3363        client_a
3364            .user_store
3365            .condition(&cx_a, |user_store, _| {
3366                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3367            })
3368            .await;
3369        client_b
3370            .user_store
3371            .condition(&cx_b, |user_store, _| {
3372                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3373            })
3374            .await;
3375        client_c
3376            .user_store
3377            .condition(&cx_c, |user_store, _| {
3378                contacts(user_store) == vec![("user_a", vec![("a", vec!["user_b"])])]
3379            })
3380            .await;
3381
3382        project_a
3383            .condition(&cx_a, |project, _| {
3384                project.collaborators().contains_key(&client_b.peer_id)
3385            })
3386            .await;
3387
3388        cx_a.update(move |_| drop(project_a));
3389        client_a
3390            .user_store
3391            .condition(&cx_a, |user_store, _| contacts(user_store) == vec![])
3392            .await;
3393        client_b
3394            .user_store
3395            .condition(&cx_b, |user_store, _| contacts(user_store) == vec![])
3396            .await;
3397        client_c
3398            .user_store
3399            .condition(&cx_c, |user_store, _| contacts(user_store) == vec![])
3400            .await;
3401
3402        fn contacts(user_store: &UserStore) -> Vec<(&str, Vec<(&str, Vec<&str>)>)> {
3403            user_store
3404                .contacts()
3405                .iter()
3406                .map(|contact| {
3407                    let worktrees = contact
3408                        .projects
3409                        .iter()
3410                        .map(|p| {
3411                            (
3412                                p.worktree_root_names[0].as_str(),
3413                                p.guests.iter().map(|p| p.github_login.as_str()).collect(),
3414                            )
3415                        })
3416                        .collect();
3417                    (contact.user.github_login.as_str(), worktrees)
3418                })
3419                .collect()
3420        }
3421    }
3422
3423    struct TestServer {
3424        peer: Arc<Peer>,
3425        app_state: Arc<AppState>,
3426        server: Arc<Server>,
3427        foreground: Rc<executor::Foreground>,
3428        notifications: mpsc::Receiver<()>,
3429        connection_killers: Arc<Mutex<HashMap<UserId, watch::Sender<Option<()>>>>>,
3430        forbid_connections: Arc<AtomicBool>,
3431        _test_db: TestDb,
3432    }
3433
3434    impl TestServer {
3435        async fn start(foreground: Rc<executor::Foreground>) -> Self {
3436            let test_db = TestDb::new();
3437            let app_state = Self::build_app_state(&test_db).await;
3438            let peer = Peer::new();
3439            let notifications = mpsc::channel(128);
3440            let server = Server::new(app_state.clone(), peer.clone(), Some(notifications.0));
3441            Self {
3442                peer,
3443                app_state,
3444                server,
3445                foreground,
3446                notifications: notifications.1,
3447                connection_killers: Default::default(),
3448                forbid_connections: Default::default(),
3449                _test_db: test_db,
3450            }
3451        }
3452
3453        async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient {
3454            let http = FakeHttpClient::with_404_response();
3455            let user_id = self.app_state.db.create_user(name, false).await.unwrap();
3456            let client_name = name.to_string();
3457            let mut client = Client::new(http.clone());
3458            let server = self.server.clone();
3459            let connection_killers = self.connection_killers.clone();
3460            let forbid_connections = self.forbid_connections.clone();
3461            let (connection_id_tx, mut connection_id_rx) = postage::mpsc::channel(16);
3462
3463            Arc::get_mut(&mut client)
3464                .unwrap()
3465                .override_authenticate(move |cx| {
3466                    cx.spawn(|_| async move {
3467                        let access_token = "the-token".to_string();
3468                        Ok(Credentials {
3469                            user_id: user_id.0 as u64,
3470                            access_token,
3471                        })
3472                    })
3473                })
3474                .override_establish_connection(move |credentials, cx| {
3475                    assert_eq!(credentials.user_id, user_id.0 as u64);
3476                    assert_eq!(credentials.access_token, "the-token");
3477
3478                    let server = server.clone();
3479                    let connection_killers = connection_killers.clone();
3480                    let forbid_connections = forbid_connections.clone();
3481                    let client_name = client_name.clone();
3482                    let connection_id_tx = connection_id_tx.clone();
3483                    cx.spawn(move |cx| async move {
3484                        if forbid_connections.load(SeqCst) {
3485                            Err(EstablishConnectionError::other(anyhow!(
3486                                "server is forbidding connections"
3487                            )))
3488                        } else {
3489                            let (client_conn, server_conn, kill_conn) =
3490                                Connection::in_memory(cx.background());
3491                            connection_killers.lock().insert(user_id, kill_conn);
3492                            cx.background()
3493                                .spawn(server.handle_connection(
3494                                    server_conn,
3495                                    client_name,
3496                                    user_id,
3497                                    Some(connection_id_tx),
3498                                ))
3499                                .detach();
3500                            Ok(client_conn)
3501                        }
3502                    })
3503                });
3504
3505            client
3506                .authenticate_and_connect(&cx.to_async())
3507                .await
3508                .unwrap();
3509
3510            let peer_id = PeerId(connection_id_rx.next().await.unwrap().0);
3511            let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx));
3512            let mut authed_user =
3513                user_store.read_with(cx, |user_store, _| user_store.watch_current_user());
3514            while authed_user.next().await.unwrap().is_none() {}
3515
3516            TestClient {
3517                client,
3518                peer_id,
3519                user_store,
3520            }
3521        }
3522
3523        fn disconnect_client(&self, user_id: UserId) {
3524            if let Some(mut kill_conn) = self.connection_killers.lock().remove(&user_id) {
3525                let _ = kill_conn.try_send(Some(()));
3526            }
3527        }
3528
3529        fn forbid_connections(&self) {
3530            self.forbid_connections.store(true, SeqCst);
3531        }
3532
3533        fn allow_connections(&self) {
3534            self.forbid_connections.store(false, SeqCst);
3535        }
3536
3537        async fn build_app_state(test_db: &TestDb) -> Arc<AppState> {
3538            let mut config = Config::default();
3539            config.session_secret = "a".repeat(32);
3540            config.database_url = test_db.url.clone();
3541            let github_client = github::AppClient::test();
3542            Arc::new(AppState {
3543                db: test_db.db().clone(),
3544                handlebars: Default::default(),
3545                auth_client: auth::build_client("", ""),
3546                repo_client: github::RepoClient::test(&github_client),
3547                github_client,
3548                config,
3549            })
3550        }
3551
3552        async fn state<'a>(&'a self) -> RwLockReadGuard<'a, Store> {
3553            self.server.store.read()
3554        }
3555
3556        async fn condition<F>(&mut self, mut predicate: F)
3557        where
3558            F: FnMut(&Store) -> bool,
3559        {
3560            async_std::future::timeout(Duration::from_millis(500), async {
3561                while !(predicate)(&*self.server.store.read()) {
3562                    self.foreground.start_waiting();
3563                    self.notifications.next().await;
3564                    self.foreground.finish_waiting();
3565                }
3566            })
3567            .await
3568            .expect("condition timed out");
3569        }
3570    }
3571
3572    impl Drop for TestServer {
3573        fn drop(&mut self) {
3574            self.peer.reset();
3575        }
3576    }
3577
3578    struct TestClient {
3579        client: Arc<Client>,
3580        pub peer_id: PeerId,
3581        pub user_store: ModelHandle<UserStore>,
3582    }
3583
3584    impl Deref for TestClient {
3585        type Target = Arc<Client>;
3586
3587        fn deref(&self) -> &Self::Target {
3588            &self.client
3589        }
3590    }
3591
3592    impl TestClient {
3593        pub fn current_user_id(&self, cx: &TestAppContext) -> UserId {
3594            UserId::from_proto(
3595                self.user_store
3596                    .read_with(cx, |user_store, _| user_store.current_user().unwrap().id),
3597            )
3598        }
3599    }
3600
3601    fn channel_messages(channel: &Channel) -> Vec<(String, String, bool)> {
3602        channel
3603            .messages()
3604            .cursor::<()>()
3605            .map(|m| {
3606                (
3607                    m.sender.github_login.clone(),
3608                    m.body.clone(),
3609                    m.is_pending(),
3610                )
3611            })
3612            .collect()
3613    }
3614
3615    struct EmptyView;
3616
3617    impl gpui::Entity for EmptyView {
3618        type Event = ();
3619    }
3620
3621    impl gpui::View for EmptyView {
3622        fn ui_name() -> &'static str {
3623            "empty view"
3624        }
3625
3626        fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
3627            gpui::Element::boxed(gpui::elements::Empty)
3628        }
3629    }
3630}